This is an automated email from the ASF dual-hosted git repository.
gangwu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-cpp.git
The following commit(s) were added to refs/heads/main by this push:
new 89262460 feat: implement delete file index (#435)
89262460 is described below
commit 89262460471b4ef8767d2f1518f37841d7f550a2
Author: Gang Wu <[email protected]>
AuthorDate: Mon Dec 29 13:38:38 2025 +0800
feat: implement delete file index (#435)
Implemented the DeleteFileIndex and Builder to manage and efficiently
filter delete files (equality deletes, position deletes, and deletion
vectors)
based on sequence numbers and partitions.
---
src/iceberg/CMakeLists.txt | 2 +
src/iceberg/delete_file_index.cc | 776 +++++++++++++++
src/iceberg/delete_file_index.h | 398 ++++++++
src/iceberg/manifest/manifest_reader.cc | 41 +-
src/iceberg/manifest/manifest_reader.h | 9 +
src/iceberg/manifest/manifest_reader_internal.h | 3 +
src/iceberg/meson.build | 3 +
src/iceberg/metadata_columns.h | 41 +-
src/iceberg/test/CMakeLists.txt | 15 +-
src/iceberg/test/delete_file_index_test.cc | 1190 +++++++++++++++++++++++
src/iceberg/test/manifest_reader_test.cc | 65 ++
src/iceberg/util/content_file_util.cc | 123 +++
src/iceberg/util/content_file_util.h | 63 ++
src/iceberg/util/meson.build | 1 +
14 files changed, 2706 insertions(+), 24 deletions(-)
diff --git a/src/iceberg/CMakeLists.txt b/src/iceberg/CMakeLists.txt
index 519757d2..27892935 100644
--- a/src/iceberg/CMakeLists.txt
+++ b/src/iceberg/CMakeLists.txt
@@ -20,6 +20,7 @@ set(ICEBERG_INCLUDES
"$<BUILD_INTERFACE:${PROJECT_BINARY_DIR}/src>"
set(ICEBERG_SOURCES
arrow_c_data_guard_internal.cc
catalog/memory/in_memory_catalog.cc
+ delete_file_index.cc
expression/aggregate.cc
expression/binder.cc
expression/evaluator.cc
@@ -80,6 +81,7 @@ set(ICEBERG_SOURCES
update/update_properties.cc
update/update_sort_order.cc
util/bucket_util.cc
+ util/content_file_util.cc
util/conversions.cc
util/decimal.cc
util/gzip_internal.cc
diff --git a/src/iceberg/delete_file_index.cc b/src/iceberg/delete_file_index.cc
new file mode 100644
index 00000000..2a96e9d3
--- /dev/null
+++ b/src/iceberg/delete_file_index.cc
@@ -0,0 +1,776 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "iceberg/delete_file_index.h"
+
+#include <algorithm>
+#include <cstdint>
+#include <iterator>
+#include <ranges>
+#include <vector>
+
+#include "iceberg/expression/expression.h"
+#include "iceberg/expression/manifest_evaluator.h"
+#include "iceberg/expression/projections.h"
+#include "iceberg/file_io.h"
+#include "iceberg/manifest/manifest_entry.h"
+#include "iceberg/manifest/manifest_list.h"
+#include "iceberg/manifest/manifest_reader.h"
+#include "iceberg/metadata_columns.h"
+#include "iceberg/partition_spec.h"
+#include "iceberg/schema.h"
+#include "iceberg/util/checked_cast.h"
+#include "iceberg/util/content_file_util.h"
+#include "iceberg/util/macros.h"
+
+namespace iceberg {
+
+namespace internal {
+
// Lazily deserialize the lower/upper bounds of this equality delete file for its
// equality field IDs. The converted literals are cached in the mutable
// lower_bounds/upper_bounds maps; subsequent calls are no-ops once
// bounds_converted is set. Returns an error if a bound cannot be deserialized.
Status EqualityDeleteFile::ConvertBoundsIfNeeded() const {
  if (bounds_converted) {
    return {};  // Already cached; nothing to do.
  }

  // Convert bounds for equality field IDs only
  for (int32_t field_id : wrapped.data_file->equality_ids) {
    ICEBERG_ASSIGN_OR_RAISE(auto field, schema->FindFieldById(field_id));
    if (!field.has_value()) {
      continue;  // Field no longer in the schema; skip it.
    }

    const auto& schema_field = field.value().get();
    if (schema_field.type()->is_nested()) {
      continue;  // Bounds are only meaningful for primitive types.
    }

    const auto primitive_type = checked_pointer_cast<PrimitiveType>(schema_field.type());

    // Convert lower bound (skip missing or empty serialized bounds)
    if (auto it = wrapped.data_file->lower_bounds.find(field_id);
        it != wrapped.data_file->lower_bounds.cend() && !it->second.empty()) {
      ICEBERG_ASSIGN_OR_RAISE(auto lower,
                              Literal::Deserialize(it->second, primitive_type));
      lower_bounds.emplace(field_id, std::move(lower));
    }

    // Convert upper bound (skip missing or empty serialized bounds)
    if (auto it = wrapped.data_file->upper_bounds.find(field_id);
        it != wrapped.data_file->upper_bounds.cend() && !it->second.empty()) {
      ICEBERG_ASSIGN_OR_RAISE(auto upper,
                              Literal::Deserialize(it->second, primitive_type));
      upper_bounds.emplace(field_id, std::move(upper));
    }
  }

  bounds_converted = true;
  return {};
}
+
// Check if an equality delete file can contain deletes for a data file.
// Uses null counts and (when available) lower/upper bounds of the equality
// columns to prove non-overlap; returns true whenever a match cannot be ruled
// out, so false positives are possible but false negatives are not.
Result<bool> CanContainEqDeletesForFile(const DataFile& data_file,
                                        const EqualityDeleteFile& delete_file) {
  // Whether to check data ranges or to assume that the ranges match. If upper/lower
  // bounds are missing, null counts may still be used to determine delete files can be
  // skipped.
  bool check_ranges = !data_file.lower_bounds.empty() &&
                      !data_file.upper_bounds.empty() &&
                      delete_file.HasLowerAndUpperBounds();

  const auto* wrapped_delete_file = delete_file.wrapped.data_file.get();

  for (int32_t field_id : wrapped_delete_file->equality_ids) {
    ICEBERG_ASSIGN_OR_RAISE(auto found_field,
                            delete_file.schema->FindFieldById(field_id));
    if (!found_field.has_value()) {
      continue;  // Unknown field; cannot prune on it.
    }

    const auto& field = found_field.value().get();
    if (field.type()->is_nested()) {
      continue;  // No stats-based pruning for nested types.
    }

    bool is_required = !field.optional();
    bool data_contains_null =
        ContainsNull(data_file.null_value_counts, field_id, is_required);
    bool delete_contains_null =
        ContainsNull(wrapped_delete_file->null_value_counts, field_id, is_required);

    if (data_contains_null && delete_contains_null) {
      // Both have nulls - delete may apply
      continue;
    }

    if (AllNull(data_file.null_value_counts, data_file.value_counts, field_id,
                is_required) &&
        AllNonNull(wrapped_delete_file->null_value_counts, field_id, is_required)) {
      return false;  // Data is all null, delete has no nulls - cannot match
    }

    if (AllNull(wrapped_delete_file->null_value_counts, wrapped_delete_file->value_counts,
                field_id, is_required) &&
        AllNonNull(data_file.null_value_counts, field_id, is_required)) {
      return false;  // Delete is all null, data has no nulls - cannot match
    }

    if (!check_ranges) {
      continue;  // No usable bounds on either side; fall through to next field.
    }

    // Check range overlap
    auto data_lower_it = data_file.lower_bounds.find(field_id);
    auto data_upper_it = data_file.upper_bounds.find(field_id);
    if (data_lower_it == data_file.lower_bounds.cend() || data_lower_it->second.empty() ||
        data_upper_it == data_file.upper_bounds.cend() || data_upper_it->second.empty()) {
      continue;  // Missing bounds, assume may match
    }

    auto delete_lower = delete_file.LowerBound(field_id);
    auto delete_upper = delete_file.UpperBound(field_id);
    if (!delete_lower.has_value() || !delete_upper.has_value()) {
      continue;  // Missing bounds, assume may match
    }

    // Convert data bounds from their serialized form before comparing.
    auto primitive_type = checked_pointer_cast<PrimitiveType>(field.type());
    ICEBERG_ASSIGN_OR_RAISE(auto data_lower,
                            Literal::Deserialize(data_lower_it->second, primitive_type));
    ICEBERG_ASSIGN_OR_RAISE(auto data_upper,
                            Literal::Deserialize(data_upper_it->second, primitive_type));

    if (!RangesOverlap(data_lower, data_upper, delete_lower->value().get(),
                       delete_upper->value().get())) {
      return false;  // Ranges don't overlap - cannot match
    }
  }

  return true;
}
+
+// PositionDeletes implementation
+
+Status PositionDeletes::Add(ManifestEntry&& entry) {
+ ICEBERG_PRECHECK(entry.sequence_number.has_value(),
+ "Missing sequence number for position delete: {}",
+ entry.data_file->file_path);
+ files_.emplace_back(std::move(entry));
+ indexed_ = false;
+ return {};
+}
+
+std::vector<std::shared_ptr<DataFile>> PositionDeletes::Filter(int64_t seq) {
+ IndexIfNeeded();
+
+ auto iter = std::ranges::lower_bound(seqs_, seq);
+ if (iter == seqs_.end()) {
+ return {};
+ }
+ return files_ | std::views::drop(iter - seqs_.begin()) |
+ std::views::transform(&ManifestEntry::data_file) |
+ std::ranges::to<std::vector<std::shared_ptr<DataFile>>>();
+}
+
+std::vector<std::shared_ptr<DataFile>>
PositionDeletes::ReferencedDeleteFiles() {
+ IndexIfNeeded();
+ return files_ | std::views::transform(&ManifestEntry::data_file) |
+ std::ranges::to<std::vector<std::shared_ptr<DataFile>>>();
+}
+
+void PositionDeletes::IndexIfNeeded() {
+ if (indexed_) {
+ return;
+ }
+
+ // Sort by data sequence number
+ std::ranges::sort(files_, std::ranges::less{},
&ManifestEntry::sequence_number);
+
+ // Build sequence number array for binary search
+ seqs_ = files_ |
+ std::views::transform([](const auto& e) { return
e.sequence_number.value(); }) |
+ std::ranges::to<std::vector<int64_t>>();
+
+ indexed_ = true;
+}
+
+// EqualityDeletes implementation
+
+Status EqualityDeletes::Add(ManifestEntry&& entry) {
+ ICEBERG_PRECHECK(entry.sequence_number.has_value(),
+ "Missing sequence number for equality delete: {}",
+ entry.data_file->file_path);
+ files_.emplace_back(&schema_, std::move(entry));
+ indexed_ = false;
+ return {};
+}
+
// Return the equality delete files that may apply to the given data file:
// files whose apply sequence number is >= seq and whose stats/bounds do not
// prove they cannot match. Propagates errors from stats evaluation.
Result<std::vector<std::shared_ptr<DataFile>>> EqualityDeletes::Filter(
    int64_t seq, const DataFile& data_file) {
  IndexIfNeeded();

  // Binary-search for the first delete file that applies at or after seq.
  auto iter = std::ranges::lower_bound(seqs_, seq);
  if (iter == seqs_.end()) {
    return {};  // All delete files apply strictly before seq.
  }
  std::vector<std::shared_ptr<DataFile>> result;
  result.reserve(seqs_.end() - iter);
  // Only candidates at/after the lower bound can apply; prune each by stats.
  for (auto& delete_file : files_ | std::views::drop(iter - seqs_.begin())) {
    ICEBERG_ASSIGN_OR_RAISE(bool may_contain,
                            CanContainEqDeletesForFile(data_file, delete_file));
    if (may_contain) {
      result.push_back(delete_file.wrapped.data_file);
    }
  }

  return result;
}
+
+std::vector<std::shared_ptr<DataFile>>
EqualityDeletes::ReferencedDeleteFiles() {
+ IndexIfNeeded();
+ return files_ |
+ std::views::transform([](const auto& f) { return f.wrapped.data_file;
}) |
+ std::ranges::to<std::vector<std::shared_ptr<DataFile>>>();
+}
+
+void EqualityDeletes::IndexIfNeeded() {
+ if (indexed_) {
+ return;
+ }
+
+ // Sort by apply sequence number
+ std::ranges::sort(files_, std::ranges::less{},
+ &EqualityDeleteFile::apply_sequence_number);
+
+ // Build sequence number array for binary search
+ seqs_ = files_ |
std::views::transform(&EqualityDeleteFile::apply_sequence_number) |
+ std::ranges::to<std::vector<int64_t>>();
+
+ indexed_ = true;
+}
+
+} // namespace internal
+
+// DeleteFileIndex implementation
+
// Construct the index from pre-built per-category containers. Any argument may
// be null, meaning that category of deletes is absent; summary flags
// (has_eq_deletes_, has_pos_deletes_, is_empty_) are derived once here.
DeleteFileIndex::DeleteFileIndex(
    std::unique_ptr<internal::EqualityDeletes> global_deletes,
    std::unique_ptr<PartitionMap<std::unique_ptr<internal::EqualityDeletes>>>
        eq_deletes_by_partition,
    std::unique_ptr<PartitionMap<std::unique_ptr<internal::PositionDeletes>>>
        pos_deletes_by_partition,
    std::unique_ptr<
        std::unordered_map<std::string, std::unique_ptr<internal::PositionDeletes>>>
        pos_deletes_by_path,
    std::unique_ptr<std::unordered_map<std::string, ManifestEntry>> dv_by_path)
    : global_deletes_(std::move(global_deletes)),
      eq_deletes_by_partition_(std::move(eq_deletes_by_partition)),
      pos_deletes_by_partition_(std::move(pos_deletes_by_partition)),
      pos_deletes_by_path_(std::move(pos_deletes_by_path)),
      dv_by_path_(std::move(dv_by_path)) {
  // Equality deletes may be global (unpartitioned) or partition-scoped.
  has_eq_deletes_ = (global_deletes_ && !global_deletes_->empty()) ||
                    (eq_deletes_by_partition_ && !eq_deletes_by_partition_->empty());
  // Position deletes may be partition-scoped, path-scoped, or deletion vectors.
  has_pos_deletes_ = (pos_deletes_by_partition_ && !pos_deletes_by_partition_->empty()) ||
                     (pos_deletes_by_path_ && !pos_deletes_by_path_->empty()) ||
                     (dv_by_path_ && !dv_by_path_->empty());
  is_empty_ = !has_eq_deletes_ && !has_pos_deletes_;
}

// Out-of-line defaulted special members; required because the header only
// forward-declares the internal container types.
DeleteFileIndex::~DeleteFileIndex() = default;
DeleteFileIndex::DeleteFileIndex(DeleteFileIndex&&) noexcept = default;
DeleteFileIndex& DeleteFileIndex::operator=(DeleteFileIndex&&) noexcept = default;

// True when the index holds no delete files of any kind.
bool DeleteFileIndex::empty() const { return is_empty_; }

// True when at least one equality delete (global or partition) is indexed.
bool DeleteFileIndex::has_equality_deletes() const { return has_eq_deletes_; }

// True when at least one position delete or deletion vector is indexed.
bool DeleteFileIndex::has_position_deletes() const { return has_pos_deletes_; }
+
+std::vector<std::shared_ptr<DataFile>>
DeleteFileIndex::ReferencedDeleteFiles() const {
+ std::vector<std::shared_ptr<DataFile>> result;
+
+ if (global_deletes_) {
+ auto files = global_deletes_->ReferencedDeleteFiles();
+ std::ranges::move(files, std::back_inserter(result));
+ }
+
+ if (eq_deletes_by_partition_) {
+ for (const auto& [_, deletes] : *eq_deletes_by_partition_) {
+ auto files = deletes->ReferencedDeleteFiles();
+ std::ranges::move(files, std::back_inserter(result));
+ }
+ }
+
+ if (pos_deletes_by_partition_) {
+ for (const auto& [_, deletes] : *pos_deletes_by_partition_) {
+ auto files = deletes->ReferencedDeleteFiles();
+ std::ranges::move(files, std::back_inserter(result));
+ }
+ }
+
+ if (pos_deletes_by_path_) {
+ for (auto& [_, deletes] : *pos_deletes_by_path_) {
+ auto files = deletes->ReferencedDeleteFiles();
+ std::ranges::move(files, std::back_inserter(result));
+ }
+ }
+
+ if (dv_by_path_) {
+ for (const auto& [_, dv] : *dv_by_path_) {
+ result.push_back(dv.data_file);
+ }
+ }
+
+ return result;
+}
+
// Convenience overload: look up the deletes applicable to a manifest entry's
// data file, using the entry's own data sequence number.
Result<std::vector<std::shared_ptr<DataFile>>> DeleteFileIndex::ForEntry(
    const ManifestEntry& entry) const {
  ICEBERG_PRECHECK(entry.data_file != nullptr, "Manifest entry has null data file");
  ICEBERG_PRECHECK(entry.sequence_number.has_value(),
                   "Missing sequence number for data file: {}",
                   entry.data_file->file_path);
  return ForDataFile(entry.sequence_number.value(), *entry.data_file);
}
+
// Find all delete files that apply to the data file with the given data
// sequence number. Result ordering: global equality deletes, then partition
// equality deletes, then either the DV (which supersedes plain position
// deletes for the file) or partition- and path-scoped position deletes.
Result<std::vector<std::shared_ptr<DataFile>>> DeleteFileIndex::ForDataFile(
    int64_t sequence_number, const DataFile& file) const {
  if (is_empty_) {
    return {};  // Fast path: no deletes indexed at all.
  }

  ICEBERG_ASSIGN_OR_RAISE(auto global, FindGlobalDeletes(sequence_number, file));
  ICEBERG_ASSIGN_OR_RAISE(auto eq_partition,
                          FindEqPartitionDeletes(sequence_number, file));
  ICEBERG_ASSIGN_OR_RAISE(auto dv, FindDV(sequence_number, file));

  // Common fast path: only a DV applies, avoid building a merged vector.
  if (dv && global.empty() && eq_partition.empty()) {
    return std::vector<std::shared_ptr<DataFile>>{std::move(dv)};
  }

  std::vector<std::shared_ptr<DataFile>> result;
  result.reserve(global.size() + eq_partition.size() + 1);

  std::ranges::move(global, std::back_inserter(result));
  std::ranges::move(eq_partition, std::back_inserter(result));

  if (dv) {
    // A DV replaces any plain position deletes for this data file.
    result.push_back(dv);
  } else {
    ICEBERG_ASSIGN_OR_RAISE(auto pos_partition,
                            FindPosPartitionDeletes(sequence_number, file));
    ICEBERG_ASSIGN_OR_RAISE(auto pos_path, FindPathDeletes(sequence_number, file));
    std::ranges::move(pos_partition, std::back_inserter(result));
    std::ranges::move(pos_path, std::back_inserter(result));
  }

  return result;
}
+
+Result<std::vector<std::shared_ptr<DataFile>>>
DeleteFileIndex::FindGlobalDeletes(
+ int64_t seq, const DataFile& data_file) const {
+ if (!global_deletes_) {
+ return {};
+ }
+ return global_deletes_->Filter(seq, data_file);
+}
+
+Result<std::vector<std::shared_ptr<DataFile>>>
DeleteFileIndex::FindEqPartitionDeletes(
+ int64_t seq, const DataFile& data_file) const {
+ if (!eq_deletes_by_partition_) {
+ return {};
+ }
+
+ auto deletes =
+ eq_deletes_by_partition_->get(data_file.partition_spec_id,
data_file.partition);
+ if (!deletes.has_value()) {
+ return {};
+ }
+ return deletes->get()->Filter(seq, data_file);
+}
+
+Result<std::vector<std::shared_ptr<DataFile>>>
DeleteFileIndex::FindPosPartitionDeletes(
+ int64_t seq, const DataFile& data_file) const {
+ if (!pos_deletes_by_partition_) {
+ return {};
+ }
+
+ auto deletes =
+ pos_deletes_by_partition_->get(data_file.partition_spec_id,
data_file.partition);
+ if (!deletes.has_value()) {
+ return {};
+ }
+
+ return deletes->get()->Filter(seq);
+}
+
+Result<std::vector<std::shared_ptr<DataFile>>>
DeleteFileIndex::FindPathDeletes(
+ int64_t seq, const DataFile& data_file) const {
+ if (!pos_deletes_by_path_) {
+ return {};
+ }
+
+ auto it = pos_deletes_by_path_->find(data_file.file_path);
+ if (it == pos_deletes_by_path_->end()) {
+ return {};
+ }
+
+ return it->second->Filter(seq);
+}
+
// Look up the deletion vector (DV) referencing this data file, if any.
// Returns nullptr when no DV exists; errors if the indexed DV's sequence
// number is older than the data file's (a DV must not predate its target).
Result<std::shared_ptr<DataFile>> DeleteFileIndex::FindDV(
    int64_t seq, const DataFile& data_file) const {
  if (!dv_by_path_) {
    return nullptr;
  }

  auto it = dv_by_path_->find(data_file.file_path);
  if (it == dv_by_path_->end()) {
    return nullptr;  // No DV references this data file.
  }

  ICEBERG_CHECK(it->second.sequence_number.value() >= seq,
                "DV data sequence number {} must be greater than or equal to data file "
                "sequence number {}",
                it->second.sequence_number.value(), seq);

  return it->second.data_file;
}
+
// Entry point for constructing an index: create a Builder over the given
// delete manifests. The FileIO is used later to read manifest contents.
Result<DeleteFileIndex::Builder> DeleteFileIndex::BuilderFor(
    std::shared_ptr<FileIO> io, std::vector<ManifestFile> delete_manifests) {
  ICEBERG_PRECHECK(io != nullptr, "FileIO cannot be null");
  return Builder(std::move(io), std::move(delete_manifests));
}

// Builder implementation

DeleteFileIndex::Builder::Builder(std::shared_ptr<FileIO> io,
                                  std::vector<ManifestFile> delete_manifests)
    : io_(std::move(io)), delete_manifests_(std::move(delete_manifests)) {}

// Out-of-line defaulted special members (header uses incomplete types).
DeleteFileIndex::Builder::~Builder() = default;
DeleteFileIndex::Builder::Builder(Builder&&) noexcept = default;
DeleteFileIndex::Builder& DeleteFileIndex::Builder::operator=(Builder&&) noexcept =
    default;
+
// Provide the partition specs (by spec ID) referenced by the delete manifests.
DeleteFileIndex::Builder& DeleteFileIndex::Builder::SpecsById(
    std::unordered_map<int32_t, std::shared_ptr<PartitionSpec>> specs_by_id) {
  specs_by_id_ = std::move(specs_by_id);
  return *this;
}

// Provide the table schema used for bound conversion and filter binding.
DeleteFileIndex::Builder& DeleteFileIndex::Builder::WithSchema(
    std::shared_ptr<Schema> schema) {
  schema_ = std::move(schema);
  return *this;
}

// Only index delete files whose data sequence number is strictly greater
// than seq.
DeleteFileIndex::Builder& DeleteFileIndex::Builder::AfterSequenceNumber(int64_t seq) {
  min_sequence_number_ = seq;
  return *this;
}

// Add a row-level filter; multiple calls are AND-ed together.
DeleteFileIndex::Builder& DeleteFileIndex::Builder::DataFilter(
    std::shared_ptr<Expression> filter) {
  if (data_filter_) {
    ICEBERG_BUILDER_ASSIGN_OR_RETURN(data_filter_,
                                     And::Make(data_filter_, std::move(filter)));
  } else {
    data_filter_ = std::move(filter);
  }
  return *this;
}

// Add a partition-level filter; multiple calls are AND-ed together.
DeleteFileIndex::Builder& DeleteFileIndex::Builder::PartitionFilter(
    std::shared_ptr<Expression> filter) {
  if (partition_filter_) {
    ICEBERG_BUILDER_ASSIGN_OR_RETURN(partition_filter_,
                                     And::Make(partition_filter_, std::move(filter)));
  } else {
    partition_filter_ = std::move(filter);
  }
  return *this;
}

// Restrict indexing to an explicit set of partitions.
DeleteFileIndex::Builder& DeleteFileIndex::Builder::FilterPartitions(
    std::shared_ptr<PartitionSet> partition_set) {
  partition_set_ = std::move(partition_set);
  return *this;
}

// Control case sensitivity of name resolution during filter binding.
DeleteFileIndex::Builder& DeleteFileIndex::Builder::CaseSensitive(bool case_sensitive) {
  case_sensitive_ = case_sensitive;
  return *this;
}

// Skip row-level filtering when loading delete files (manifest-level pruning
// may still apply).
DeleteFileIndex::Builder& DeleteFileIndex::Builder::IgnoreResiduals() {
  ignore_residuals_ = true;
  return *this;
}
+
// Read every qualifying delete manifest and return its live entries, applying
// manifest-level pruning (partition projection of the data filter plus any
// explicit partition filter/set), the minimum-sequence-number cutoff, and
// stats trimming to keep memory usage low.
Result<std::vector<ManifestEntry>> DeleteFileIndex::Builder::LoadDeleteFiles() {
  // Build expression caches per spec ID
  std::unordered_map<int32_t, std::shared_ptr<Expression>> part_expr_cache;
  std::unordered_map<int32_t, std::unique_ptr<ManifestEvaluator>> eval_cache;

  // When residuals are ignored, rows are not filtered by the data filter.
  auto data_filter = ignore_residuals_ ? True::Instance() : data_filter_;

  // Filter and read manifests into manifest entries
  std::vector<ManifestEntry> files;
  for (const auto& manifest : delete_manifests_) {
    if (manifest.content != ManifestContent::kDeletes) {
      continue;  // Only delete manifests are relevant here.
    }
    if (!manifest.has_added_files() && !manifest.has_existing_files()) {
      continue;  // Manifest holds only deleted entries; nothing live to read.
    }

    const int32_t spec_id = manifest.partition_spec_id;
    auto spec_iter = specs_by_id_.find(spec_id);
    ICEBERG_CHECK(spec_iter != specs_by_id_.cend(),
                  "Partition spec ID {} not found when loading delete files", spec_id);

    const auto& spec = spec_iter->second;

    // Get or compute projected partition expression.
    // NOTE(review): the projection uses data_filter_ (the original filter)
    // rather than the residual-adjusted data_filter above — confirm this is
    // intended when IgnoreResiduals() has been called.
    if (!part_expr_cache.contains(spec_id) && data_filter_) {
      auto projector = Projections::Inclusive(*spec, *schema_, case_sensitive_);
      ICEBERG_ASSIGN_OR_RAISE(auto projected, projector->Project(data_filter_));
      part_expr_cache[spec_id] = std::move(projected);
    }

    // Get or create manifest evaluator (combines the partition filter with the
    // projected data filter when both exist).
    if (!eval_cache.contains(spec_id)) {
      auto filter = partition_filter_;
      if (auto it = part_expr_cache.find(spec_id); it != part_expr_cache.cend()) {
        if (filter) {
          ICEBERG_ASSIGN_OR_RAISE(filter, And::Make(filter, it->second));
        } else {
          filter = it->second;
        }
      }
      if (filter) {
        ICEBERG_ASSIGN_OR_RAISE(auto evaluator,
                                ManifestEvaluator::MakePartitionFilter(
                                    std::move(filter), spec, *schema_, case_sensitive_));
        eval_cache[spec_id] = std::move(evaluator);
      }
    }

    // Evaluate manifest against filter
    if (auto it = eval_cache.find(spec_id); it != eval_cache.end()) {
      ICEBERG_ASSIGN_OR_RAISE(auto should_match, it->second->Evaluate(manifest));
      if (!should_match) {
        continue;  // Manifest doesn't match filter
      }
    }

    // Read manifest entries
    ICEBERG_ASSIGN_OR_RAISE(auto reader,
                            ManifestReader::Make(manifest, io_, schema_, spec));

    // Rebuild the combined partition filter for entry-level pruning.
    auto partition_filter = partition_filter_;
    if (auto it = part_expr_cache.find(spec_id); it != part_expr_cache.cend()) {
      if (partition_filter) {
        ICEBERG_ASSIGN_OR_RAISE(partition_filter,
                                And::Make(partition_filter, it->second));
      } else {
        partition_filter = it->second;
      }
    }
    if (partition_filter) {
      reader->FilterPartitions(std::move(partition_filter));
    }
    if (partition_set_) {
      reader->FilterPartitions(partition_set_);
    }

    reader->FilterRows(data_filter).CaseSensitive(case_sensitive_).TryDropStats();

    ICEBERG_ASSIGN_OR_RAISE(auto entries, reader->LiveEntries());
    files.reserve(files.size() + entries.size());

    for (auto& entry : entries) {
      ICEBERG_CHECK(entry.data_file != nullptr, "ManifestEntry must have a data file");
      ICEBERG_CHECK(entry.sequence_number.has_value(),
                    "Missing sequence number for delete file: {}",
                    entry.data_file->file_path);
      // Only keep deletes newer than the configured cutoff.
      if (entry.sequence_number.value() > min_sequence_number_) {
        auto& file = *entry.data_file;
        // keep minimum stats to avoid memory pressure: position deletes only
        // need the referenced-path column, equality deletes only their
        // equality columns.
        std::unordered_set<int32_t> columns =
            file.content == DataFile::Content::kPositionDeletes
                ? std::unordered_set<int32_t>{MetadataColumns::kDeleteFilePathColumnId}
                : std::unordered_set<int32_t>(file.equality_ids.begin(),
                                              file.equality_ids.end());
        ContentFileUtil::DropUnselectedStats(*entry.data_file, columns);
        files.emplace_back(std::move(entry));
      }
    }
  }

  return files;
}
+
+Status DeleteFileIndex::Builder::AddDV(
+ std::unordered_map<std::string, ManifestEntry>& dv_by_path,
ManifestEntry&& entry) {
+ ICEBERG_PRECHECK(entry.data_file != nullptr, "ManifestEntry must have a data
file");
+ ICEBERG_PRECHECK(entry.sequence_number.has_value(), "Missing sequence number
for DV {}",
+ entry.data_file->file_path);
+
+ const auto& path = entry.data_file->referenced_data_file;
+ ICEBERG_PRECHECK(path.has_value(), "DV must have a referenced data file");
+
+ std::string referenced_path = path.value();
+ auto [it, inserted] = dv_by_path.emplace(referenced_path, std::move(entry));
+ if (!inserted) {
+ return ValidationFailed("Can't index multiple DVs for {}",
referenced_path);
+ }
+ return {};
+}
+
// Index one position delete, routing it to the path-scoped map when it
// references a single data file, otherwise to the partition-scoped map.
Status DeleteFileIndex::Builder::AddPositionDelete(
    std::unordered_map<std::string, std::unique_ptr<internal::PositionDeletes>>&
        deletes_by_path,
    PartitionMap<std::unique_ptr<internal::PositionDeletes>>& deletes_by_partition,
    ManifestEntry&& entry) {
  ICEBERG_PRECHECK(entry.data_file != nullptr, "ManifestEntry must have a data file");
  ICEBERG_PRECHECK(entry.sequence_number.has_value(),
                   "Missing sequence number for position delete {}",
                   entry.data_file->file_path);

  ICEBERG_ASSIGN_OR_RAISE(auto referenced_path,
                          ContentFileUtil::ReferencedDataFile(*entry.data_file));

  if (referenced_path.has_value()) {
    // File-scoped position delete: bucket by the referenced data file path.
    auto& deletes = deletes_by_path[referenced_path.value()];
    if (!deletes) {
      deletes = std::make_unique<internal::PositionDeletes>();
    }
    ICEBERG_RETURN_UNEXPECTED(deletes->Add(std::move(entry)));
  } else {
    // Partition-scoped position delete: bucket by (spec ID, partition tuple).
    int32_t spec_id = entry.data_file->partition_spec_id;
    const auto& partition = entry.data_file->partition;

    auto existing = deletes_by_partition.get(spec_id, partition);
    if (existing.has_value()) {
      ICEBERG_RETURN_UNEXPECTED(existing->get()->Add(std::move(entry)));
    } else {
      // First delete for this partition: create the bucket, then register it.
      auto deletes = std::make_unique<internal::PositionDeletes>();
      ICEBERG_RETURN_UNEXPECTED(deletes->Add(std::move(entry)));
      deletes_by_partition.put(spec_id, partition, std::move(deletes));
    }
  }

  return {};
}
+
// Index one equality delete, routing it to the global container when its spec
// is unpartitioned, otherwise to the partition-scoped map.
Status DeleteFileIndex::Builder::AddEqualityDelete(
    internal::EqualityDeletes& global_deletes,
    PartitionMap<std::unique_ptr<internal::EqualityDeletes>>& deletes_by_partition,
    ManifestEntry&& entry) {
  ICEBERG_PRECHECK(entry.data_file != nullptr, "ManifestEntry must have a data file");
  ICEBERG_PRECHECK(entry.sequence_number.has_value(),
                   "Missing sequence number for equality delete {}",
                   entry.data_file->file_path);

  int32_t spec_id = entry.data_file->partition_spec_id;

  auto spec_it = specs_by_id_.find(spec_id);
  if (spec_it == specs_by_id_.end()) {
    return InvalidArgument("Unknown partition spec ID: {}", spec_id);
  }
  const auto& spec = spec_it->second;

  if (spec->fields().empty()) {
    // Global equality delete for unpartitioned tables
    ICEBERG_RETURN_UNEXPECTED(global_deletes.Add(std::move(entry)));
  } else {
    // Partition-scoped equality delete
    const auto& partition = entry.data_file->partition;

    auto existing = deletes_by_partition.get(spec_id, partition);
    if (existing.has_value()) {
      ICEBERG_RETURN_UNEXPECTED(existing->get()->Add(std::move(entry)));
    } else {
      // First delete for this partition: create the bucket, then register it.
      auto deletes = std::make_unique<internal::EqualityDeletes>(*schema_);
      ICEBERG_RETURN_UNEXPECTED(deletes->Add(std::move(entry)));
      deletes_by_partition.put(spec_id, partition, std::move(deletes));
    }
  }

  return {};
}
+
// Load all delete entries from the configured manifests and distribute them
// into the per-category index structures, producing the final DeleteFileIndex.
// Empty containers are passed to the index as null so lookups can short-circuit.
Result<std::unique_ptr<DeleteFileIndex>> DeleteFileIndex::Builder::Build() {
  ICEBERG_RETURN_UNEXPECTED(CheckErrors());
  ICEBERG_PRECHECK(io_ != nullptr, "FileIO is required to load delete files");
  ICEBERG_PRECHECK(schema_ != nullptr, "Schema is required to load delete files");
  ICEBERG_PRECHECK(!specs_by_id_.empty(),
                   "Partition specs are required to load delete files");

  std::vector<ManifestEntry> entries;
  if (!delete_manifests_.empty()) {
    ICEBERG_ASSIGN_OR_RAISE(entries, LoadDeleteFiles());
  }

  // Build index structures
  auto global_deletes = std::make_unique<internal::EqualityDeletes>(*schema_);
  auto eq_deletes_by_partition =
      std::make_unique<PartitionMap<std::unique_ptr<internal::EqualityDeletes>>>();
  auto pos_deletes_by_partition =
      std::make_unique<PartitionMap<std::unique_ptr<internal::PositionDeletes>>>();
  auto pos_deletes_by_path = std::make_unique<
      std::unordered_map<std::string, std::unique_ptr<internal::PositionDeletes>>>();
  auto dv_by_path = std::make_unique<std::unordered_map<std::string, ManifestEntry>>();

  // Route each entry by its content type (DV vs plain position vs equality).
  for (auto& entry : entries) {
    ICEBERG_CHECK(entry.data_file != nullptr, "ManifestEntry must have a data file");

    switch (entry.data_file->content) {
      case DataFile::Content::kPositionDeletes:
        if (ContentFileUtil::IsDV(*entry.data_file)) {
          ICEBERG_RETURN_UNEXPECTED(AddDV(*dv_by_path, std::move(entry)));
        } else {
          ICEBERG_RETURN_UNEXPECTED(AddPositionDelete(
              *pos_deletes_by_path, *pos_deletes_by_partition, std::move(entry)));
        }
        break;

      case DataFile::Content::kEqualityDeletes:
        ICEBERG_RETURN_UNEXPECTED(AddEqualityDelete(
            *global_deletes, *eq_deletes_by_partition, std::move(entry)));
        break;

      default:
        return NotSupported("Unsupported content type: {}",
                            static_cast<int>(entry.data_file->content));
    }
  }

  // Pass empty containers as null so the index can skip whole categories.
  return std::unique_ptr<DeleteFileIndex>(new DeleteFileIndex(
      global_deletes->empty() ? nullptr : std::move(global_deletes),
      eq_deletes_by_partition->empty() ? nullptr : std::move(eq_deletes_by_partition),
      pos_deletes_by_partition->empty() ? nullptr : std::move(pos_deletes_by_partition),
      pos_deletes_by_path->empty() ? nullptr : std::move(pos_deletes_by_path),
      dv_by_path->empty() ? nullptr : std::move(dv_by_path)));
}
+
+} // namespace iceberg
diff --git a/src/iceberg/delete_file_index.h b/src/iceberg/delete_file_index.h
new file mode 100644
index 00000000..03cc2661
--- /dev/null
+++ b/src/iceberg/delete_file_index.h
@@ -0,0 +1,398 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+/// \file iceberg/delete_file_index.h
+/// An index of delete files by sequence number.
+
+#include <functional>
+#include <map>
+#include <memory>
+#include <optional>
+#include <unordered_map>
+#include <vector>
+
+#include "iceberg/expression/literal.h"
+#include "iceberg/iceberg_export.h"
+#include "iceberg/manifest/manifest_entry.h"
+#include "iceberg/result.h"
+#include "iceberg/type_fwd.h"
+#include "iceberg/util/error_collector.h"
+#include "iceberg/util/partition_value_util.h"
+
+namespace iceberg {
+
+namespace internal {
+
/// \brief Wrapper for equality delete files that caches converted bounds.
///
/// Wraps the ManifestEntry of an equality delete file and precomputes the
/// highest data sequence number the delete applies to. Column bounds are
/// converted from their serialized form to Literal on first access and cached
/// for subsequent pruning checks.
struct ICEBERG_EXPORT EqualityDeleteFile {
  const Schema* schema;   // Table schema used to interpret bounds (not owned).
  ManifestEntry wrapped;  // The underlying delete-file manifest entry.
  // Equality deletes apply only to rows written strictly before the delete,
  // hence the apply sequence number is data_sequence_number - 1.
  int64_t apply_sequence_number;  // = data_sequence_number - 1

  // Lazily converted bounds for pruning
  mutable std::unordered_map<int32_t, Literal> lower_bounds;
  mutable std::unordered_map<int32_t, Literal> upper_bounds;
  mutable bool bounds_converted = false;

  // NOTE: members are initialized in declaration order, so `wrapped` is fully
  // constructed before `apply_sequence_number` reads it. Assumes
  // sequence_number is populated — value() would throw otherwise; presumably
  // inherited sequence numbers are resolved by the manifest reader (TODO confirm).
  EqualityDeleteFile(const Schema* schema, ManifestEntry&& entry)
      : schema(schema),
        wrapped(std::move(entry)),
        apply_sequence_number(wrapped.sequence_number.value() - 1) {}

  /// \brief Check if this delete file has both lower and upper bounds.
  bool HasLowerAndUpperBounds() const {
    return !wrapped.data_file->lower_bounds.empty() &&
           !wrapped.data_file->upper_bounds.empty();
  }

  /// \brief Get the lower bound for a field ID.
  /// \return The cached Literal bound, or std::nullopt if the field has none.
  Result<std::optional<std::reference_wrapper<const Literal>>> LowerBound(
      int32_t id) const {
    ICEBERG_RETURN_UNEXPECTED(ConvertBoundsIfNeeded());
    auto it = lower_bounds.find(id);
    return it != lower_bounds.cend() ? std::make_optional(std::cref(it->second))
                                     : std::nullopt;
  }

  /// \brief Get the upper bound for a field ID.
  /// \return The cached Literal bound, or std::nullopt if the field has none.
  Result<std::optional<std::reference_wrapper<const Literal>>> UpperBound(
      int32_t id) const {
    ICEBERG_RETURN_UNEXPECTED(ConvertBoundsIfNeeded());
    auto it = upper_bounds.find(id);
    return it != upper_bounds.cend() ? std::make_optional(std::cref(it->second))
                                     : std::nullopt;
  }

 private:
  /// \brief Convert bounds from binary to Literal. Implemented in .cc file.
  Status ConvertBoundsIfNeeded() const;
};
+
+/// \brief Check if two ranges overlap.
+inline bool RangesOverlap(const Literal& data_lower, const Literal& data_upper,
+ const Literal& delete_lower, const Literal&
delete_upper) {
+ if (data_lower > delete_upper) {
+ return false;
+ }
+ if (delete_lower > data_upper) {
+ return false;
+ }
+ return true;
+}
+
/// \brief Check if column stats indicate every value of a field is null.
///
/// Returns true only when both the null count and the value count are known
/// for the field and they are equal. Required fields can never be all-null.
inline bool AllNull(const std::map<int32_t, int64_t>& null_counts,
                    const std::map<int32_t, int64_t>& value_counts, int32_t field_id,
                    bool is_required) {
  // A required column cannot hold nulls at all.
  if (is_required) {
    return false;
  }

  const auto nulls = null_counts.find(field_id);
  const auto values = value_counts.find(field_id);
  // Without both counts nothing can be concluded; assume not all-null.
  const bool counts_known =
      nulls != null_counts.cend() && values != value_counts.cend();
  return counts_known && nulls->second == values->second;
}
+
/// \brief Check if column stats prove a field contains no null values.
///
/// Required fields are trivially all-non-null; otherwise the null count must
/// be known and zero (an unknown count yields false, i.e. "cannot prove").
inline bool AllNonNull(const std::map<int32_t, int64_t>& null_counts, int32_t field_id,
                       bool is_required) {
  if (is_required) {
    return true;  // required columns never hold nulls
  }

  const auto entry = null_counts.find(field_id);
  return entry != null_counts.cend() && entry->second <= 0;
}
+
/// \brief Check if the column may contain any null values.
///
/// Required fields never contain nulls. For optional fields, an unknown null
/// count is treated conservatively as "may contain null".
inline bool ContainsNull(const std::map<int32_t, int64_t>& null_counts, int32_t field_id,
                         bool is_required) {
  if (is_required) {
    return false;
  }

  const auto entry = null_counts.find(field_id);
  // Missing stats -> unknown -> assume nulls are possible.
  return entry == null_counts.cend() || entry->second > 0;
}
+
+/// \brief Check if an equality delete file can contain deletes for a data
file.
+ICEBERG_EXPORT Result<bool> CanContainEqDeletesForFile(
+ const DataFile& data_file, const EqualityDeleteFile& delete_file);
+
/// \brief A group of position delete files sorted by the sequence number they apply to.
///
/// Position delete files apply to data files with a sequence number <= the delete
/// file's data sequence number.
class ICEBERG_EXPORT PositionDeletes {
 public:
  PositionDeletes() = default;

  /// \brief Add a position delete file to this group.
  /// \param entry The manifest entry of the delete file; ownership is taken.
  [[nodiscard]] Status Add(ManifestEntry&& entry);

  /// \brief Returns all delete files with data_sequence_number >= the given sequence
  /// number.
  std::vector<std::shared_ptr<DataFile>> Filter(int64_t seq);

  /// \brief Get all delete files in this group.
  std::vector<std::shared_ptr<DataFile>> ReferencedDeleteFiles();

  /// \brief Check if this group is empty.
  bool empty() const { return files_.empty(); }

 private:
  // Build the sequence-number index lazily on first query; presumably sorts
  // files_ and fills seqs_ (implemented in the .cc file — TODO confirm).
  void IndexIfNeeded();

  std::vector<ManifestEntry> files_;  // Delete entries added so far.
  std::vector<int64_t> seqs_;         // Sequence numbers parallel to files_.
  bool indexed_ = false;              // Whether IndexIfNeeded() has run.
};
+
/// \brief A group of equality delete files sorted by apply sequence number.
///
/// Equality deletes apply to data files with sequence number < the delete's
/// data sequence number (i.e., apply_sequence_number = data_sequence_number - 1).
class ICEBERG_EXPORT EqualityDeletes {
 public:
  /// \brief Create a group bound to the given table schema.
  /// \param schema Schema used for bound conversion; must outlive this object.
  explicit EqualityDeletes(const Schema& schema) : schema_(schema) {}

  /// \brief Add an equality delete file to this group.
  [[nodiscard]] Status Add(ManifestEntry&& entry);

  /// \brief Filter equality deletes that apply to the given data file.
  ///
  /// Returns delete files where:
  /// 1. apply_sequence_number >= the data file's sequence number
  /// 2. The delete file's bounds may overlap with the data file
  Result<std::vector<std::shared_ptr<DataFile>>> Filter(int64_t seq,
                                                        const DataFile& data_file);

  /// \brief Get all delete files in this group.
  std::vector<std::shared_ptr<DataFile>> ReferencedDeleteFiles();

  /// \brief Check if this group is empty.
  bool empty() const { return files_.empty(); }

 private:
  // Build the apply-sequence-number index lazily on first query (implemented
  // in the .cc file).
  void IndexIfNeeded();

  const Schema& schema_;                   // Not owned; used to interpret bounds.
  std::vector<EqualityDeleteFile> files_;  // Wrapped delete entries.
  std::vector<int64_t> seqs_;              // Apply sequence numbers parallel to files_.
  bool indexed_ = false;                   // Whether IndexIfNeeded() has run.
};
+
+} // namespace internal
+
/// \brief An index of delete files by sequence number.
///
/// Use `DeleteFileIndex::Builder` to construct an index, and `ForDataFile()`
/// or `ForEntry()` to get the delete files to apply to a given data file.
///
/// The index organizes delete files by:
/// - Global equality deletes (apply to all partitions)
/// - Partitioned equality deletes (apply to specific partitions)
/// - Partitioned position deletes (apply to specific partitions)
/// - File-scoped position deletes (apply to specific data files)
/// - Deletion vectors (DVs) that reference specific data files
class ICEBERG_EXPORT DeleteFileIndex {
 public:
  class Builder;

  ~DeleteFileIndex();

  // Move-only: the index owns potentially large internal maps.
  DeleteFileIndex(DeleteFileIndex&&) noexcept;
  DeleteFileIndex& operator=(DeleteFileIndex&&) noexcept;
  DeleteFileIndex(const DeleteFileIndex&) = delete;
  DeleteFileIndex& operator=(const DeleteFileIndex&) = delete;

  /// \brief Check if this index is empty (has no delete files).
  bool empty() const;

  /// \brief Check if this index has any equality delete files.
  bool has_equality_deletes() const;

  /// \brief Check if this index has any position delete files.
  bool has_position_deletes() const;

  /// \brief Get all delete files referenced by this index.
  /// TODO(gangwu): use lazy iterator to avoid large memory allocation.
  std::vector<std::shared_ptr<DataFile>> ReferencedDeleteFiles() const;

  /// \brief Get the delete files that apply to a manifest entry.
  ///
  /// \param entry The manifest entry to find delete files for
  /// \return Delete files that should be applied when reading the data file
  Result<std::vector<std::shared_ptr<DataFile>>> ForEntry(
      const ManifestEntry& entry) const;

  /// \brief Get the delete files that apply to a data file with a specific sequence
  /// number.
  ///
  /// \param sequence_number The data sequence number of the data file
  /// \param file The data file to find delete files for
  /// \return Delete files that should be applied when reading the data file
  Result<std::vector<std::shared_ptr<DataFile>>> ForDataFile(int64_t sequence_number,
                                                             const DataFile& file) const;

  /// \brief Create a builder for constructing a DeleteFileIndex from manifest files.
  ///
  /// \param io The FileIO to use for reading manifests
  /// \param delete_manifests The delete manifests to index
  /// \return A Builder instance
  static Result<Builder> BuilderFor(std::shared_ptr<FileIO> io,
                                    std::vector<ManifestFile> delete_manifests);

 private:
  friend class Builder;

  // Private constructor used by Builder. The Builder passes nullptr for any
  // category that holds no delete files.
  DeleteFileIndex(
      std::unique_ptr<internal::EqualityDeletes> global_deletes,
      std::unique_ptr<PartitionMap<std::unique_ptr<internal::EqualityDeletes>>>
          eq_deletes_by_partition,
      std::unique_ptr<PartitionMap<std::unique_ptr<internal::PositionDeletes>>>
          pos_deletes_by_partition,
      std::unique_ptr<
          std::unordered_map<std::string, std::unique_ptr<internal::PositionDeletes>>>
          pos_deletes_by_path,
      std::unique_ptr<std::unordered_map<std::string, ManifestEntry>> dv_by_path);

  // Helper methods for finding delete files, one per index category.
  Result<std::vector<std::shared_ptr<DataFile>>> FindGlobalDeletes(
      int64_t seq, const DataFile& data_file) const;
  Result<std::vector<std::shared_ptr<DataFile>>> FindEqPartitionDeletes(
      int64_t seq, const DataFile& data_file) const;
  Result<std::vector<std::shared_ptr<DataFile>>> FindPosPartitionDeletes(
      int64_t seq, const DataFile& data_file) const;
  Result<std::vector<std::shared_ptr<DataFile>>> FindPathDeletes(
      int64_t seq, const DataFile& data_file) const;
  Result<std::shared_ptr<DataFile>> FindDV(int64_t seq, const DataFile& data_file) const;

  // Index data structures; each pointer may be null when that category is
  // empty (see the Builder's construction of this object).
  std::unique_ptr<internal::EqualityDeletes> global_deletes_;
  std::unique_ptr<PartitionMap<std::unique_ptr<internal::EqualityDeletes>>>
      eq_deletes_by_partition_;
  std::unique_ptr<PartitionMap<std::unique_ptr<internal::PositionDeletes>>>
      pos_deletes_by_partition_;
  std::unique_ptr<
      std::unordered_map<std::string, std::unique_ptr<internal::PositionDeletes>>>
      pos_deletes_by_path_;
  std::unique_ptr<std::unordered_map<std::string, ManifestEntry>> dv_by_path_;

  // Summary flags answering the has_*/empty() queries without walking the maps.
  bool has_eq_deletes_ = false;
  bool has_pos_deletes_ = false;
  bool is_empty_ = true;
};
+
/// \brief Builder that reads delete manifests and assembles a DeleteFileIndex.
class ICEBERG_EXPORT DeleteFileIndex::Builder : public ErrorCollector {
 public:
  /// \brief Construct a builder from manifest files.
  Builder(std::shared_ptr<FileIO> io, std::vector<ManifestFile> delete_manifests);

  ~Builder() override;

  // Move-only, mirroring DeleteFileIndex.
  Builder(Builder&&) noexcept;
  Builder& operator=(Builder&&) noexcept;
  Builder(const Builder&) = delete;
  Builder& operator=(const Builder&) = delete;

  /// \brief Set the partition specs by ID.
  Builder& SpecsById(
      std::unordered_map<int32_t, std::shared_ptr<PartitionSpec>> specs_by_id);

  /// \brief Set the table schema.
  ///
  /// Required for filtering and expression evaluation.
  Builder& WithSchema(std::shared_ptr<Schema> schema);

  /// \brief Set the minimum sequence number for delete files.
  ///
  /// Only delete files with sequence number > min_sequence_number will be included.
  Builder& AfterSequenceNumber(int64_t seq);

  /// \brief Set a row-level data filter.
  ///
  /// This filter is projected to partition expressions for manifest pruning and
  /// then residuals are applied to data files.
  Builder& DataFilter(std::shared_ptr<Expression> filter);

  /// \brief Set a partition filter expression.
  Builder& PartitionFilter(std::shared_ptr<Expression> filter);

  /// \brief Set a partition set to filter manifests.
  Builder& FilterPartitions(std::shared_ptr<PartitionSet> partition_set);

  /// \brief Set case sensitivity for column name matching.
  Builder& CaseSensitive(bool case_sensitive);

  /// \brief Ignore residual expressions after partition filtering.
  Builder& IgnoreResiduals();

  /// \brief Build the DeleteFileIndex.
  /// \return The constructed index, or an error if loading or indexing fails.
  Result<std::unique_ptr<DeleteFileIndex>> Build();

 private:
  // Load delete files from manifests
  Result<std::vector<ManifestEntry>> LoadDeleteFiles();

  // Add a DV to the index
  Status AddDV(std::unordered_map<std::string, ManifestEntry>& dv_by_path,
               ManifestEntry&& entry);

  // Add a position delete file to the index
  Status AddPositionDelete(
      std::unordered_map<std::string, std::unique_ptr<internal::PositionDeletes>>&
          deletes_by_path,
      PartitionMap<std::unique_ptr<internal::PositionDeletes>>& deletes_by_partition,
      ManifestEntry&& entry);

  // Add an equality delete file to the index
  Status AddEqualityDelete(
      internal::EqualityDeletes& global_deletes,
      PartitionMap<std::unique_ptr<internal::EqualityDeletes>>& deletes_by_partition,
      ManifestEntry&& entry);

  std::shared_ptr<FileIO> io_;                  // Used to read manifest files.
  std::vector<ManifestFile> delete_manifests_;  // Manifests to index.
  int64_t min_sequence_number_ = 0;             // Exclusive lower bound (AfterSequenceNumber).
  std::unordered_map<int32_t, std::shared_ptr<PartitionSpec>> specs_by_id_;
  std::shared_ptr<Schema> schema_;
  std::shared_ptr<Expression> data_filter_;       // Row-level filter (optional).
  std::shared_ptr<Expression> partition_filter_;  // Partition filter (optional).
  std::shared_ptr<PartitionSet> partition_set_;   // Partition set filter (optional).
  bool case_sensitive_ = true;
  bool ignore_residuals_ = false;
};
+
+} // namespace iceberg
diff --git a/src/iceberg/manifest/manifest_reader.cc
b/src/iceberg/manifest/manifest_reader.cc
index ba1ff860..5f50d045 100644
--- a/src/iceberg/manifest/manifest_reader.cc
+++ b/src/iceberg/manifest/manifest_reader.cc
@@ -39,6 +39,7 @@
#include "iceberg/schema_field.h"
#include "iceberg/type.h"
#include "iceberg/util/checked_cast.h"
+#include "iceberg/util/content_file_util.h"
#include "iceberg/util/macros.h"
namespace iceberg {
@@ -557,9 +558,9 @@ Result<std::vector<ManifestEntry>> ParseManifestEntry(
return manifest_entries;
}
-const std::vector<std::string> kStatsColumns = {"value_counts",
"null_value_counts",
- "nan_value_counts",
"lower_bounds",
- "upper_bounds",
"record_count"};
+const std::vector<std::string> kStatsColumns = {
+ "value_counts", "null_value_counts", "nan_value_counts", "lower_bounds",
+ "upper_bounds", "column_sizes", "record_count"};
bool RequireStatsProjection(const std::shared_ptr<Expression>& row_filter,
const std::vector<std::string>& columns) {
@@ -592,6 +593,29 @@ Result<std::shared_ptr<Schema>>
ProjectSchema(std::shared_ptr<Schema> schema,
} // namespace
+bool ManifestReader::ShouldDropStats(const std::vector<std::string>& columns) {
+ // Make sure we only drop all stats if we had projected all stats.
+ // We do not drop stats even if we had partially added some stats columns,
except for
+ // record_count column.
+ // Since we don't want to keep stats map which could be huge in size just
because we
+ // select record_count, which is a primitive type.
+ if (!columns.empty()) {
+ const std::unordered_set<std::string_view> selected(columns.cbegin(),
columns.cend());
+ if (selected.contains(Schema::kAllColumns)) {
+ return false;
+ }
+ std::unordered_set<std::string_view> intersection;
+ for (const auto& col : kStatsColumns) {
+ if (selected.contains(col)) {
+ intersection.insert(col);
+ }
+ }
+ return intersection.empty() ||
+ (intersection.size() == 1 && intersection.contains("record_count"));
+ }
+ return false;
+}
+
std::vector<std::string> ManifestReader::WithStatsColumns(
const std::vector<std::string>& columns) {
if (std::ranges::contains(columns, Schema::kAllColumns)) {
@@ -645,6 +669,11 @@ ManifestReader& ManifestReaderImpl::CaseSensitive(bool
case_sensitive) {
return *this;
}
/// \brief Request best-effort dropping of per-column stats from read entries.
///
/// Setting the flag alone does not force dropping: ReadEntries() only drops
/// stats when ShouldDropStats() agrees with the projected columns.
ManifestReader& ManifestReaderImpl::TryDropStats() {
  drop_stats_ = true;
  return *this;
}
+
bool ManifestReaderImpl::HasPartitionFilter() const {
ICEBERG_DCHECK(part_filter_, "Partition filter is not set");
return part_filter_->op() != Expression::Operation::kTrue;
@@ -773,6 +802,8 @@ Result<std::vector<ManifestEntry>>
ManifestReaderImpl::ReadEntries(bool only_liv
ICEBERG_ASSIGN_OR_RAISE(metrics_evaluator, GetMetricsEvaluator());
}
+ bool drop_stats = drop_stats_ && ShouldDropStats(columns_);
+
while (true) {
ICEBERG_ASSIGN_OR_RAISE(auto result, file_reader_->Next());
if (!result.has_value()) {
@@ -812,6 +843,10 @@ Result<std::vector<ManifestEntry>>
ManifestReaderImpl::ReadEntries(bool only_liv
}
}
+ if (drop_stats) {
+ ContentFileUtil::DropAllStats(*entry.data_file);
+ }
+
manifest_entries.push_back(std::move(entry));
}
}
diff --git a/src/iceberg/manifest/manifest_reader.h
b/src/iceberg/manifest/manifest_reader.h
index 748d0e33..ddfefc57 100644
--- a/src/iceberg/manifest/manifest_reader.h
+++ b/src/iceberg/manifest/manifest_reader.h
@@ -71,6 +71,15 @@ class ICEBERG_EXPORT ManifestReader {
/// \brief Set case sensitivity for column name matching.
virtual ManifestReader& CaseSensitive(bool case_sensitive) = 0;
+ /// \brief Try to drop stats from returned DataFile objects.
+ virtual ManifestReader& TryDropStats() = 0;
+
+ /// \brief Determine whether stats should be dropped based on selected
columns.
+ ///
+ /// Returns true if the selected columns do not include any stats columns,
or only
+ /// include record_count (which is a primitive, not a large map).
+ static bool ShouldDropStats(const std::vector<std::string>& columns);
+
/// \brief Creates a reader for a manifest file.
/// \param manifest A ManifestFile object containing metadata about the
manifest.
/// \param file_io File IO implementation to use.
diff --git a/src/iceberg/manifest/manifest_reader_internal.h
b/src/iceberg/manifest/manifest_reader_internal.h
index ba804ffb..42263808 100644
--- a/src/iceberg/manifest/manifest_reader_internal.h
+++ b/src/iceberg/manifest/manifest_reader_internal.h
@@ -74,6 +74,8 @@ class ManifestReaderImpl : public ManifestReader {
ManifestReader& CaseSensitive(bool case_sensitive) override;
+ ManifestReader& TryDropStats() override;
+
private:
/// \brief Read entries with optional live-only filtering.
Result<std::vector<ManifestEntry>> ReadEntries(bool only_live);
@@ -111,6 +113,7 @@ class ManifestReaderImpl : public ManifestReader {
std::shared_ptr<Expression> row_filter_{True::Instance()};
std::shared_ptr<PartitionSet> partition_set_;
bool case_sensitive_{true};
+ bool drop_stats_{false};
// Lazy fields
std::unique_ptr<Reader> file_reader_;
diff --git a/src/iceberg/meson.build b/src/iceberg/meson.build
index 8327ca2e..f8022b09 100644
--- a/src/iceberg/meson.build
+++ b/src/iceberg/meson.build
@@ -42,6 +42,7 @@ iceberg_include_dir = include_directories('..')
iceberg_sources = files(
'arrow_c_data_guard_internal.cc',
'catalog/memory/in_memory_catalog.cc',
+ 'delete_file_index.cc',
'expression/aggregate.cc',
'expression/binder.cc',
'expression/evaluator.cc',
@@ -102,6 +103,7 @@ iceberg_sources = files(
'update/update_properties.cc',
'update/update_sort_order.cc',
'util/bucket_util.cc',
+ 'util/content_file_util.cc',
'util/conversions.cc',
'util/decimal.cc',
'util/gzip_internal.cc',
@@ -165,6 +167,7 @@ install_headers(
'arrow_c_data.h',
'catalog.h',
'constants.h',
+ 'delete_file_index.h',
'exception.h',
'file_format.h',
'file_io.h',
diff --git a/src/iceberg/metadata_columns.h b/src/iceberg/metadata_columns.h
index 18aeb2b8..61f07c48 100644
--- a/src/iceberg/metadata_columns.h
+++ b/src/iceberg/metadata_columns.h
@@ -39,19 +39,22 @@ struct ICEBERG_EXPORT MetadataColumns {
constexpr static int32_t kInt32Max = std::numeric_limits<int32_t>::max();
// IDs kInt32Max - (1-100) are used for metadata columns
- inline static const SchemaField kFilePath =
- SchemaField::MakeRequired(kInt32Max - 1, "_file", iceberg::string(),
- "Path of the file in which a row is stored");
+ constexpr static int32_t kFilePathColumnId = kInt32Max - 1;
+ inline static const SchemaField kFilePath = SchemaField::MakeRequired(
+ kFilePathColumnId, "_file", string(), "Path of the file in which a row
is stored");
+ constexpr static int32_t kFilePositionColumnId = kInt32Max - 2;
inline static const SchemaField kRowPosition =
- SchemaField::MakeRequired(kInt32Max - 2, "_pos", iceberg::int64(),
+ SchemaField::MakeRequired(kFilePositionColumnId, "_pos", int64(),
"Ordinal position of a row in the source data
file");
+ constexpr static int32_t kIsDeletedColumnId = kInt32Max - 3;
inline static const SchemaField kIsDeleted = SchemaField::MakeRequired(
- kInt32Max - 3, "_deleted", iceberg::binary(), "Whether the row has been
deleted");
+ kIsDeletedColumnId, "_deleted", binary(), "Whether the row has been
deleted");
+ constexpr static int32_t kSpecIdColumnId = kInt32Max - 4;
inline static const SchemaField kSpecId =
- SchemaField::MakeRequired(kInt32Max - 4, "_spec_id", iceberg::int32(),
+ SchemaField::MakeRequired(kSpecIdColumnId, "_spec_id", int32(),
"Spec ID used to track the file containing a
row");
// The partition column type depends on all specs in the table
@@ -64,35 +67,41 @@ struct ICEBERG_EXPORT MetadataColumns {
constexpr static int32_t kContentSizeInBytesColumnId = kInt32Max - 7;
// IDs kInt32Max - (101-200) are used for reserved columns
+ constexpr static int32_t kDeleteFilePathColumnId = kInt32Max - 101;
inline static const SchemaField kDeleteFilePath =
- SchemaField::MakeRequired(kInt32Max - 101, "file_path",
iceberg::string(),
+ SchemaField::MakeRequired(kDeleteFilePathColumnId, "file_path", string(),
"Path of a file in which a deleted row is
stored");
+ constexpr static int32_t kDeleteFilePosColumnId = kInt32Max - 102;
inline static const SchemaField kDeleteFilePos =
- SchemaField::MakeRequired(kInt32Max - 102, "pos", iceberg::int64(),
+ SchemaField::MakeRequired(kDeleteFilePosColumnId, "pos", int64(),
"Ordinal position of a deleted row in the data
file");
// The row column type depends on the table schema
- constexpr static int32_t kDeleteFileRowFieldId = kInt32Max - 103;
+ constexpr static int32_t kDeleteFileRowColumnId = kInt32Max - 103;
constexpr static std::string_view kDeleteFileRowFieldName = "row";
constexpr static std::string_view kDeleteFileRowDoc = "Deleted row values";
+ constexpr static int32_t kChangeTypeColumnId = kInt32Max - 104;
inline static const SchemaField kChangeType = SchemaField::MakeRequired(
- kInt32Max - 104, "_change_type", iceberg::string(), "Record type in
changelog");
+ kChangeTypeColumnId, "_change_type", string(), "Record type in
changelog");
- inline static const SchemaField kChangeOrdinal =
- SchemaField::MakeOptional(kInt32Max - 105, "_change_ordinal",
iceberg::int32(),
- "Change ordinal in changelog");
+ constexpr static int32_t kChangeOrdinalColumnId = kInt32Max - 105;
+ inline static const SchemaField kChangeOrdinal = SchemaField::MakeOptional(
+ kChangeOrdinalColumnId, "_change_ordinal", int32(), "Change ordinal in
changelog");
+ constexpr static int32_t kCommitSnapshotIdColumnId = kInt32Max - 106;
inline static const SchemaField kCommitSnapshotId =
SchemaField::MakeOptional(
- kInt32Max - 106, "_commit_snapshot_id", iceberg::int64(), "Commit
snapshot ID");
+ kCommitSnapshotIdColumnId, "_commit_snapshot_id", int64(), "Commit
snapshot ID");
+ constexpr static int32_t kRowIdColumnId = kInt32Max - 107;
inline static const SchemaField kRowId =
- SchemaField::MakeOptional(kInt32Max - 107, "_row_id", iceberg::int64(),
+ SchemaField::MakeOptional(kRowIdColumnId, "_row_id", int64(),
"Implicit row ID that is automatically
assigned");
+ constexpr static int32_t kLastUpdatedSequenceNumberColumnId = kInt32Max -
108;
inline static const SchemaField kLastUpdatedSequenceNumber =
SchemaField::MakeOptional(
- kInt32Max - 108, "_last_updated_sequence_number", iceberg::int64(),
+ kLastUpdatedSequenceNumberColumnId, "_last_updated_sequence_number",
int64(),
"Sequence number when the row was last updated");
/// \brief Get the set of metadata field IDs.
diff --git a/src/iceberg/test/CMakeLists.txt b/src/iceberg/test/CMakeLists.txt
index 4b2c0f47..b7f23c53 100644
--- a/src/iceberg/test/CMakeLists.txt
+++ b/src/iceberg/test/CMakeLists.txt
@@ -120,11 +120,7 @@ if(ICEBERG_BUILD_BUNDLE)
avro_data_test.cc
avro_test.cc
avro_schema_test.cc
- avro_stream_test.cc
- manifest_list_versions_test.cc
- manifest_reader_stats_test.cc
- manifest_reader_test.cc
- manifest_writer_versions_test.cc)
+ avro_stream_test.cc)
add_iceberg_test(arrow_test
USE_BUNDLE
@@ -143,6 +139,15 @@ if(ICEBERG_BUILD_BUNDLE)
eval_expr_test.cc
evaluator_test.cc)
+ add_iceberg_test(manifest_test
+ USE_BUNDLE
+ SOURCES
+ delete_file_index_test.cc
+ manifest_list_versions_test.cc
+ manifest_reader_stats_test.cc
+ manifest_reader_test.cc
+ manifest_writer_versions_test.cc)
+
add_iceberg_test(parquet_test
USE_BUNDLE
SOURCES
diff --git a/src/iceberg/test/delete_file_index_test.cc
b/src/iceberg/test/delete_file_index_test.cc
new file mode 100644
index 00000000..e091f5ea
--- /dev/null
+++ b/src/iceberg/test/delete_file_index_test.cc
@@ -0,0 +1,1190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "iceberg/delete_file_index.h"
+
+#include <chrono>
+#include <format>
+#include <memory>
+#include <optional>
+#include <string>
+#include <vector>
+
+#include <gmock/gmock.h>
+#include <gtest/gtest.h>
+
+#include "iceberg/arrow/arrow_file_io.h"
+#include "iceberg/avro/avro_register.h"
+#include "iceberg/manifest/manifest_entry.h"
+#include "iceberg/manifest/manifest_list.h"
+#include "iceberg/manifest/manifest_reader.h"
+#include "iceberg/manifest/manifest_writer.h"
+#include "iceberg/metadata_columns.h"
+#include "iceberg/partition_spec.h"
+#include "iceberg/schema.h"
+#include "iceberg/test/matchers.h"
+#include "iceberg/transform.h"
+#include "iceberg/type.h"
+
+namespace iceberg {
+
/// \brief Fixture for DeleteFileIndex tests, parameterized by manifest format
/// version (2 or 3). Provides a mock FileIO, a two-column schema, one
/// partitioned and one unpartitioned spec, sample data files, and helpers to
/// create delete files, write delete manifests, and build an index.
class DeleteFileIndexTest : public testing::TestWithParam<int> {
 protected:
  void SetUp() override {
    avro::RegisterAll();

    file_io_ = arrow::MakeMockFileIO();

    // Schema with id and data fields
    schema_ = std::make_shared<Schema>(std::vector<SchemaField>{
        SchemaField::MakeRequired(/*field_id=*/1, "id", int32()),
        SchemaField::MakeRequired(/*field_id=*/2, "data", string())});

    // Partitioned spec: bucket by data
    ICEBERG_UNWRAP_OR_FAIL(
        partitioned_spec_,
        PartitionSpec::Make(
            /*spec_id=*/1, {PartitionField(/*source_id=*/2, /*field_id=*/1000,
                                           "data_bucket", Transform::Bucket(16))}));

    // Unpartitioned spec
    unpartitioned_spec_ = PartitionSpec::Unpartitioned();

    // Create sample data files in three different bucket partitions, plus one
    // in the unpartitioned spec.
    file_a_ = MakeDataFile("/path/to/data-a.parquet", PartitionValues({Literal::Int(0)}),
                           partitioned_spec_->spec_id());
    file_b_ = MakeDataFile("/path/to/data-b.parquet", PartitionValues({Literal::Int(1)}),
                           partitioned_spec_->spec_id());
    file_c_ = MakeDataFile("/path/to/data-c.parquet", PartitionValues({Literal::Int(2)}),
                           partitioned_spec_->spec_id());
    unpartitioned_file_ = MakeDataFile("/path/to/data-unpartitioned.parquet",
                                       PartitionValues(std::vector<Literal>{}),
                                       unpartitioned_spec_->spec_id());
  }

  // Generate a unique manifest path; counter + wall-clock timestamp keeps
  // paths distinct across tests within one process.
  std::string MakeManifestPath() {
    static int counter = 0;
    return std::format("manifest-{}-{}.avro", counter++,
                       std::chrono::system_clock::now().time_since_epoch().count());
  }

  // Create a plain data file (content = kData by default).
  std::shared_ptr<DataFile> MakeDataFile(const std::string& path,
                                         const PartitionValues& partition,
                                         int32_t spec_id, int64_t record_count = 1) {
    return std::make_shared<DataFile>(DataFile{
        .file_path = path,
        .file_format = FileFormatType::kParquet,
        .partition = partition,
        .record_count = record_count,
        .file_size_in_bytes = 10,
        .sort_order_id = 0,
        .partition_spec_id = spec_id,
    });
  }

  // Create a position delete file; set referenced_file to make it file-scoped.
  std::shared_ptr<DataFile> MakePositionDeleteFile(
      const std::string& path, const PartitionValues& partition, int32_t spec_id,
      std::optional<std::string> referenced_file = std::nullopt) {
    return std::make_shared<DataFile>(DataFile{
        .content = DataFile::Content::kPositionDeletes,
        .file_path = path,
        .file_format = FileFormatType::kParquet,
        .partition = partition,
        .record_count = 1,
        .file_size_in_bytes = 10,
        .partition_spec_id = spec_id,
        .referenced_data_file = referenced_file,
    });
  }

  // Create an equality delete file on the given equality field IDs.
  std::shared_ptr<DataFile> MakeEqualityDeleteFile(const std::string& path,
                                                   const PartitionValues& partition,
                                                   int32_t spec_id,
                                                   std::vector<int> equality_ids = {1}) {
    return std::make_shared<DataFile>(DataFile{
        .content = DataFile::Content::kEqualityDeletes,
        .file_path = path,
        .file_format = FileFormatType::kParquet,
        .partition = partition,
        .record_count = 1,
        .file_size_in_bytes = 10,
        .equality_ids = std::move(equality_ids),
        .partition_spec_id = spec_id,
    });
  }

  // Create a deletion vector (Puffin-format position delete referencing a
  // specific data file, with a content offset/size).
  std::shared_ptr<DataFile> MakeDV(const std::string& path,
                                   const PartitionValues& partition, int32_t spec_id,
                                   const std::string& referenced_file,
                                   int64_t content_offset = 4L,
                                   int64_t content_size = 6L) {
    return std::make_shared<DataFile>(DataFile{
        .content = DataFile::Content::kPositionDeletes,
        .file_path = path,
        .file_format = FileFormatType::kPuffin,
        .partition = partition,
        .record_count = 1,
        .file_size_in_bytes = 10,
        .partition_spec_id = spec_id,
        .referenced_data_file = referenced_file,
        .content_offset = content_offset,
        .content_size_in_bytes = content_size,
    });
  }

  // Wrap a delete file into a manifest entry; data and file sequence numbers
  // are set to the same value.
  ManifestEntry MakeDeleteEntry(int64_t snapshot_id, int64_t sequence_number,
                                std::shared_ptr<DataFile> file,
                                ManifestStatus status = ManifestStatus::kAdded) {
    return ManifestEntry{
        .status = status,
        .snapshot_id = snapshot_id,
        .sequence_number = sequence_number,
        .file_sequence_number = sequence_number,
        .data_file = std::move(file),
    };
  }

  // Write the given entries into a delete manifest using a V2 or V3 writer,
  // depending on format_version, and return the resulting ManifestFile.
  ManifestFile WriteDeleteManifest(int format_version, int64_t snapshot_id,
                                   std::vector<ManifestEntry> entries,
                                   std::shared_ptr<PartitionSpec> spec) {
    const std::string manifest_path = MakeManifestPath();

    // Seed with an error so an unexpected format_version fails the EXPECT below.
    Result<std::unique_ptr<ManifestWriter>> writer_result =
        NotSupported("Format version: {}", format_version);

    if (format_version == 2) {
      writer_result = ManifestWriter::MakeV2Writer(
          snapshot_id, manifest_path, file_io_, spec, schema_, ManifestContent::kDeletes);
    } else if (format_version == 3) {
      writer_result = ManifestWriter::MakeV3Writer(
          snapshot_id, /*first_row_id=*/std::nullopt, manifest_path, file_io_, spec,
          schema_, ManifestContent::kDeletes);
    }

    EXPECT_THAT(writer_result, IsOk());
    auto writer = std::move(writer_result.value());

    for (const auto& entry : entries) {
      EXPECT_THAT(writer->WriteEntry(entry), IsOk());
    }

    EXPECT_THAT(writer->Close(), IsOk());
    auto manifest_result = writer->ToManifestFile();
    EXPECT_THAT(manifest_result, IsOk());
    return std::move(manifest_result.value());
  }

  // Spec lookup map covering both specs used by the fixture.
  std::unordered_map<int32_t, std::shared_ptr<PartitionSpec>> GetSpecsById() {
    return {{partitioned_spec_->spec_id(), partitioned_spec_},
            {unpartitioned_spec_->spec_id(), unpartitioned_spec_}};
  }

  // Build an index over the given delete manifests with the fixture's specs
  // and schema, optionally restricting to sequence numbers after a threshold.
  Result<std::unique_ptr<DeleteFileIndex>> BuildIndex(
      std::vector<ManifestFile> delete_manifests,
      std::optional<int64_t> after_sequence_number = std::nullopt) {
    ICEBERG_ASSIGN_OR_RAISE(
        auto builder, DeleteFileIndex::BuilderFor(file_io_, std::move(delete_manifests)));
    builder.SpecsById(GetSpecsById()).WithSchema(schema_);
    if (after_sequence_number.has_value()) {
      builder.AfterSequenceNumber(after_sequence_number.value());
    }
    return builder.Build();
  }

  std::shared_ptr<FileIO> file_io_;
  std::shared_ptr<Schema> schema_;
  std::shared_ptr<PartitionSpec> partitioned_spec_;
  std::shared_ptr<PartitionSpec> unpartitioned_spec_;

  std::shared_ptr<DataFile> file_a_;
  std::shared_ptr<DataFile> file_b_;
  std::shared_ptr<DataFile> file_c_;
  std::shared_ptr<DataFile> unpartitioned_file_;

  // Helper to extract paths from delete files for comparison
  static std::vector<std::string> GetPaths(
      const std::vector<std::shared_ptr<DataFile>>& files) {
    return std::ranges::transform_view(files,
                                       [](const auto& f) { return f->file_path; }) |
           std::ranges::to<std::vector<std::string>>();
  }
};
+
+TEST_P(DeleteFileIndexTest, TestEmptyIndex) {
+ ICEBERG_UNWRAP_OR_FAIL(auto index, BuildIndex({}));
+
+ EXPECT_TRUE(index->empty());
+ EXPECT_FALSE(index->has_equality_deletes());
+ EXPECT_FALSE(index->has_position_deletes());
+
+ ICEBERG_UNWRAP_OR_FAIL(auto deletes, index->ForDataFile(0, *file_a_));
+ EXPECT_TRUE(deletes.empty());
+}
+
+TEST_P(DeleteFileIndexTest, TestMinSequenceNumberFilteringForFiles) {
+ int version = GetParam();
+
+ auto eq_delete_1 = MakeEqualityDeleteFile("/path/to/eq-delete-1.parquet",
+
PartitionValues(std::vector<Literal>{}),
+ unpartitioned_spec_->spec_id());
+ auto eq_delete_2 = MakeEqualityDeleteFile("/path/to/eq-delete-2.parquet",
+
PartitionValues(std::vector<Literal>{}),
+ unpartitioned_spec_->spec_id());
+
+ std::vector<ManifestEntry> entries;
+ entries.push_back(
+ MakeDeleteEntry(/*snapshot_id=*/1000L, /*sequence_number=*/4,
eq_delete_1));
+ entries.push_back(
+ MakeDeleteEntry(/*snapshot_id=*/1000L, /*sequence_number=*/6,
eq_delete_2));
+
+ auto manifest = WriteDeleteManifest(version, /*snapshot_id=*/1000L,
std::move(entries),
+ unpartitioned_spec_);
+
+ // Build index with afterSequenceNumber = 4
+ ICEBERG_UNWRAP_OR_FAIL(auto index, BuildIndex({manifest},
/*after_sequence_number=*/4));
+
+ EXPECT_TRUE(index->has_equality_deletes());
+ EXPECT_FALSE(index->has_position_deletes());
+
+ // Only delete file with seq > 4 should be included (seq=6)
+ ICEBERG_UNWRAP_OR_FAIL(auto deletes, index->ForDataFile(0,
*unpartitioned_file_));
+ EXPECT_EQ(deletes.size(), 1);
+ EXPECT_EQ(deletes[0]->file_path, "/path/to/eq-delete-2.parquet");
+}
+
+TEST_P(DeleteFileIndexTest, TestUnpartitionedDeletes) {
+ int version = GetParam();
+
+ auto eq_delete_1 = MakeEqualityDeleteFile("/path/to/eq-delete-1.parquet",
+
PartitionValues(std::vector<Literal>{}),
+ unpartitioned_spec_->spec_id());
+ auto eq_delete_2 = MakeEqualityDeleteFile("/path/to/eq-delete-2.parquet",
+
PartitionValues(std::vector<Literal>{}),
+ unpartitioned_spec_->spec_id());
+ auto pos_delete_1 = MakePositionDeleteFile("/path/to/pos-delete-1.parquet",
+
PartitionValues(std::vector<Literal>{}),
+ unpartitioned_spec_->spec_id());
+ auto pos_delete_2 = MakePositionDeleteFile("/path/to/pos-delete-2.parquet",
+
PartitionValues(std::vector<Literal>{}),
+ unpartitioned_spec_->spec_id());
+
+ std::vector<ManifestEntry> entries;
+ entries.push_back(
+ MakeDeleteEntry(/*snapshot_id=*/1000L, /*sequence_number=*/4,
eq_delete_1));
+ entries.push_back(
+ MakeDeleteEntry(/*snapshot_id=*/1000L, /*sequence_number=*/6,
eq_delete_2));
+ entries.push_back(
+ MakeDeleteEntry(/*snapshot_id=*/1000L, /*sequence_number=*/5,
pos_delete_1));
+ entries.push_back(
+ MakeDeleteEntry(/*snapshot_id=*/1000L, /*sequence_number=*/6,
pos_delete_2));
+
+ auto manifest = WriteDeleteManifest(version, /*snapshot_id=*/1000L,
std::move(entries),
+ unpartitioned_spec_);
+
+ ICEBERG_UNWRAP_OR_FAIL(auto index, BuildIndex({manifest}));
+
+ EXPECT_TRUE(index->has_equality_deletes());
+ EXPECT_TRUE(index->has_position_deletes());
+
+ // All deletes should apply to seq 0
+ {
+ ICEBERG_UNWRAP_OR_FAIL(auto deletes, index->ForDataFile(0,
*unpartitioned_file_));
+ EXPECT_EQ(deletes.size(), 4);
+ EXPECT_THAT(GetPaths(deletes),
+ testing::UnorderedElementsAre(
+ "/path/to/eq-delete-1.parquet",
"/path/to/eq-delete-2.parquet",
+ "/path/to/pos-delete-1.parquet",
"/path/to/pos-delete-2.parquet"));
+ }
+
+ // All deletes should apply to seq 3
+ {
+ ICEBERG_UNWRAP_OR_FAIL(auto deletes, index->ForDataFile(3,
*unpartitioned_file_));
+ EXPECT_EQ(deletes.size(), 4);
+ EXPECT_THAT(GetPaths(deletes),
+ testing::UnorderedElementsAre(
+ "/path/to/eq-delete-1.parquet",
"/path/to/eq-delete-2.parquet",
+ "/path/to/pos-delete-1.parquet",
"/path/to/pos-delete-2.parquet"));
+ }
+
+ // Last 3 deletes should apply to seq 4 (eq_delete_2, pos_delete_1,
pos_delete_2)
+ {
+ ICEBERG_UNWRAP_OR_FAIL(auto deletes, index->ForDataFile(4,
*unpartitioned_file_));
+ EXPECT_EQ(deletes.size(), 3);
+ EXPECT_THAT(GetPaths(deletes),
+ testing::UnorderedElementsAre("/path/to/eq-delete-2.parquet",
+ "/path/to/pos-delete-1.parquet",
+
"/path/to/pos-delete-2.parquet"));
+ }
+
+ // Last 3 deletes should apply to seq 5
+ {
+ ICEBERG_UNWRAP_OR_FAIL(auto deletes, index->ForDataFile(5,
*unpartitioned_file_));
+ EXPECT_EQ(deletes.size(), 3);
+ EXPECT_THAT(GetPaths(deletes),
+ testing::UnorderedElementsAre("/path/to/eq-delete-2.parquet",
+ "/path/to/pos-delete-1.parquet",
+
"/path/to/pos-delete-2.parquet"));
+ }
+
+ // Last delete should apply to seq 6 (only pos_delete_2)
+ {
+ ICEBERG_UNWRAP_OR_FAIL(auto deletes, index->ForDataFile(6,
*unpartitioned_file_));
+ EXPECT_EQ(deletes.size(), 1);
+ EXPECT_EQ(deletes[0]->file_path, "/path/to/pos-delete-2.parquet");
+ }
+
+ // No deletes should apply to seq 7
+ {
+ ICEBERG_UNWRAP_OR_FAIL(auto deletes, index->ForDataFile(7,
*unpartitioned_file_));
+ EXPECT_TRUE(deletes.empty());
+ }
+
+ // Global equality deletes should apply to a partitioned file
+ {
+ ICEBERG_UNWRAP_OR_FAIL(auto deletes, index->ForDataFile(0, *file_a_));
+ // Only equality deletes are global, position deletes are not
+ EXPECT_EQ(deletes.size(), 2);
+ EXPECT_THAT(GetPaths(deletes),
+ testing::UnorderedElementsAre("/path/to/eq-delete-1.parquet",
+ "/path/to/eq-delete-2.parquet"));
+ }
+}
+
+TEST_P(DeleteFileIndexTest, TestPartitionedDeleteIndex) {
+ int version = GetParam();
+
+ auto partition_a = PartitionValues({Literal::Int(0)});
+ auto eq_delete_1 = MakeEqualityDeleteFile("/path/to/eq-delete-1.parquet",
partition_a,
+ partitioned_spec_->spec_id());
+ auto eq_delete_2 = MakeEqualityDeleteFile("/path/to/eq-delete-2.parquet",
partition_a,
+ partitioned_spec_->spec_id());
+ auto pos_delete_1 = MakePositionDeleteFile("/path/to/pos-delete-1.parquet",
partition_a,
+ partitioned_spec_->spec_id());
+ auto pos_delete_2 = MakePositionDeleteFile("/path/to/pos-delete-2.parquet",
partition_a,
+ partitioned_spec_->spec_id());
+
+ std::vector<ManifestEntry> entries;
+ entries.push_back(
+ MakeDeleteEntry(/*snapshot_id=*/1000L, /*sequence_number=*/4,
eq_delete_1));
+ entries.push_back(
+ MakeDeleteEntry(/*snapshot_id=*/1000L, /*sequence_number=*/6,
eq_delete_2));
+ entries.push_back(
+ MakeDeleteEntry(/*snapshot_id=*/1000L, /*sequence_number=*/5,
pos_delete_1));
+ entries.push_back(
+ MakeDeleteEntry(/*snapshot_id=*/1000L, /*sequence_number=*/6,
pos_delete_2));
+
+ auto manifest = WriteDeleteManifest(version, /*snapshot_id=*/1000L,
std::move(entries),
+ partitioned_spec_);
+
+ ICEBERG_UNWRAP_OR_FAIL(auto index, BuildIndex({manifest}));
+
+ EXPECT_TRUE(index->has_equality_deletes());
+ EXPECT_TRUE(index->has_position_deletes());
+
+ // All deletes should apply to file_a_ at seq 0
+ {
+ ICEBERG_UNWRAP_OR_FAIL(auto deletes, index->ForDataFile(0, *file_a_));
+ EXPECT_EQ(deletes.size(), 4);
+ EXPECT_THAT(GetPaths(deletes),
+ testing::UnorderedElementsAre(
+ "/path/to/eq-delete-1.parquet",
"/path/to/eq-delete-2.parquet",
+ "/path/to/pos-delete-1.parquet",
"/path/to/pos-delete-2.parquet"));
+ }
+
+ // All deletes should apply to file_a_ at seq 3
+ {
+ ICEBERG_UNWRAP_OR_FAIL(auto deletes, index->ForDataFile(3, *file_a_));
+ EXPECT_EQ(deletes.size(), 4);
+ EXPECT_THAT(GetPaths(deletes),
+ testing::UnorderedElementsAre(
+ "/path/to/eq-delete-1.parquet",
"/path/to/eq-delete-2.parquet",
+ "/path/to/pos-delete-1.parquet",
"/path/to/pos-delete-2.parquet"));
+ }
+
+ // Last 3 deletes should apply to file_a_ at seq 4
+ {
+ ICEBERG_UNWRAP_OR_FAIL(auto deletes, index->ForDataFile(4, *file_a_));
+ EXPECT_EQ(deletes.size(), 3);
+ EXPECT_THAT(GetPaths(deletes),
+ testing::UnorderedElementsAre("/path/to/eq-delete-2.parquet",
+ "/path/to/pos-delete-1.parquet",
+
"/path/to/pos-delete-2.parquet"));
+ }
+
+ // Last 3 deletes should apply to file_a_ at seq 5
+ {
+ ICEBERG_UNWRAP_OR_FAIL(auto deletes, index->ForDataFile(5, *file_a_));
+ EXPECT_EQ(deletes.size(), 3);
+ EXPECT_THAT(GetPaths(deletes),
+ testing::UnorderedElementsAre("/path/to/eq-delete-2.parquet",
+ "/path/to/pos-delete-1.parquet",
+
"/path/to/pos-delete-2.parquet"));
+ }
+
+ // Last delete should apply to file_a_ at seq 6
+ {
+ ICEBERG_UNWRAP_OR_FAIL(auto deletes, index->ForDataFile(6, *file_a_));
+ EXPECT_EQ(deletes.size(), 1);
+ EXPECT_EQ(deletes[0]->file_path, "/path/to/pos-delete-2.parquet");
+ }
+
+ // No deletes should apply to file_a_ at seq 7
+ {
+ ICEBERG_UNWRAP_OR_FAIL(auto deletes, index->ForDataFile(7, *file_a_));
+ EXPECT_TRUE(deletes.empty());
+ }
+
+ // No deletes should apply to file_b_ (different partition)
+ {
+ ICEBERG_UNWRAP_OR_FAIL(auto deletes, index->ForDataFile(0, *file_b_));
+ EXPECT_TRUE(deletes.empty());
+ }
+
+ // No deletes should apply to file_c_ (different partition)
+ {
+ ICEBERG_UNWRAP_OR_FAIL(auto deletes, index->ForDataFile(0, *file_c_));
+ EXPECT_TRUE(deletes.empty());
+ }
+
+ // No deletes should apply to unpartitioned file (different spec)
+ {
+ ICEBERG_UNWRAP_OR_FAIL(auto deletes, index->ForDataFile(0,
*unpartitioned_file_));
+ EXPECT_TRUE(deletes.empty());
+ }
+}
+
+TEST_P(DeleteFileIndexTest, TestPartitionedTableWithPartitionPosDeletes) {
+ int version = GetParam();
+
+ auto partition_a = PartitionValues({Literal::Int(0)});
+ auto pos_delete = MakePositionDeleteFile("/path/to/pos-delete.parquet",
partition_a,
+ partitioned_spec_->spec_id());
+
+ std::vector<ManifestEntry> entries;
+ entries.push_back(
+ MakeDeleteEntry(/*snapshot_id=*/1000L, /*sequence_number=*/2,
pos_delete));
+
+ auto manifest = WriteDeleteManifest(version, /*snapshot_id=*/1000L,
std::move(entries),
+ partitioned_spec_);
+
+ ICEBERG_UNWRAP_OR_FAIL(auto index, BuildIndex({manifest}));
+
+ EXPECT_FALSE(index->has_equality_deletes());
+ EXPECT_TRUE(index->has_position_deletes());
+
+ // Position delete should apply to file_a_ at seq 1
+ ICEBERG_UNWRAP_OR_FAIL(auto deletes, index->ForDataFile(1, *file_a_));
+ EXPECT_EQ(deletes.size(), 1);
+ EXPECT_EQ(deletes[0]->file_path, "/path/to/pos-delete.parquet");
+}
+
+TEST_P(DeleteFileIndexTest, TestPartitionedTableWithPartitionEqDeletes) {
+ int version = GetParam();
+
+ auto partition_a = PartitionValues({Literal::Int(0)});
+ auto eq_delete = MakeEqualityDeleteFile("/path/to/eq-delete.parquet",
partition_a,
+ partitioned_spec_->spec_id());
+
+ std::vector<ManifestEntry> entries;
+ entries.push_back(
+ MakeDeleteEntry(/*snapshot_id=*/1000L, /*sequence_number=*/2,
eq_delete));
+
+ auto manifest = WriteDeleteManifest(version, /*snapshot_id=*/1000L,
std::move(entries),
+ partitioned_spec_);
+
+ ICEBERG_UNWRAP_OR_FAIL(auto index, BuildIndex({manifest}));
+
+ EXPECT_TRUE(index->has_equality_deletes());
+ EXPECT_FALSE(index->has_position_deletes());
+
+ // Equality delete should apply to file_a_ at seq 1
+ ICEBERG_UNWRAP_OR_FAIL(auto deletes, index->ForDataFile(1, *file_a_));
+ EXPECT_EQ(deletes.size(), 1);
+ EXPECT_EQ(deletes[0]->file_path, "/path/to/eq-delete.parquet");
+}
+
+TEST_P(DeleteFileIndexTest, TestPartitionedTableWithUnrelatedPartitionDeletes)
{
+ int version = GetParam();
+
+ // Create deletes for partition A
+ auto partition_a = PartitionValues({Literal::Int(0)});
+ auto pos_delete = MakePositionDeleteFile("/path/to/pos-delete.parquet",
partition_a,
+ partitioned_spec_->spec_id());
+ auto eq_delete = MakeEqualityDeleteFile("/path/to/eq-delete.parquet",
partition_a,
+ partitioned_spec_->spec_id());
+
+ std::vector<ManifestEntry> entries;
+ entries.push_back(
+ MakeDeleteEntry(/*snapshot_id=*/1000L, /*sequence_number=*/2,
pos_delete));
+ entries.push_back(
+ MakeDeleteEntry(/*snapshot_id=*/1000L, /*sequence_number=*/2,
eq_delete));
+
+ auto manifest = WriteDeleteManifest(version, /*snapshot_id=*/1000L,
std::move(entries),
+ partitioned_spec_);
+
+ ICEBERG_UNWRAP_OR_FAIL(auto index, BuildIndex({manifest}));
+
+ // No deletes should apply to file_b_ (different partition)
+ ICEBERG_UNWRAP_OR_FAIL(auto deletes, index->ForDataFile(1, *file_b_));
+ EXPECT_TRUE(deletes.empty());
+}
+
+TEST_P(DeleteFileIndexTest, TestPartitionedTableWithOlderPartitionDeletes) {
+ int version = GetParam();
+ if (version >= 3) {
+ GTEST_SKIP() << "DVs are not filtered using sequence numbers in V3+";
+ }
+
+ auto partition_a = PartitionValues({Literal::Int(0)});
+ auto pos_delete = MakePositionDeleteFile("/path/to/pos-delete.parquet",
partition_a,
+ partitioned_spec_->spec_id());
+ auto eq_delete = MakeEqualityDeleteFile("/path/to/eq-delete.parquet",
partition_a,
+ partitioned_spec_->spec_id());
+
+ // Delete files have sequence number 1
+ std::vector<ManifestEntry> entries;
+ entries.push_back(
+ MakeDeleteEntry(/*snapshot_id=*/1000L, /*sequence_number=*/1,
pos_delete));
+ entries.push_back(
+ MakeDeleteEntry(/*snapshot_id=*/1000L, /*sequence_number=*/1,
eq_delete));
+
+ auto manifest = WriteDeleteManifest(version, /*snapshot_id=*/1000L,
std::move(entries),
+ partitioned_spec_);
+
+ ICEBERG_UNWRAP_OR_FAIL(auto index, BuildIndex({manifest}));
+
+ // Data file with sequence number 2 should not have any deletes applied
+ // (deletes were committed before the data file)
+ ICEBERG_UNWRAP_OR_FAIL(auto deletes, index->ForDataFile(2, *file_a_));
+ EXPECT_TRUE(deletes.empty());
+}
+
+TEST_P(DeleteFileIndexTest, TestPartitionedTableScanWithGlobalDeletes) {
+ int version = GetParam();
+ if (version >= 3) {
+ GTEST_SKIP() << "Different behavior for position deletes in V3";
+ }
+
+ // Create unpartitioned equality and position deletes
+ auto eq_delete = MakeEqualityDeleteFile("/path/to/eq-delete.parquet",
+
PartitionValues(std::vector<Literal>{}),
+ unpartitioned_spec_->spec_id());
+ auto pos_delete = MakePositionDeleteFile("/path/to/pos-delete.parquet",
+
PartitionValues(std::vector<Literal>{}),
+ unpartitioned_spec_->spec_id());
+
+ std::vector<ManifestEntry> entries;
+ entries.push_back(
+ MakeDeleteEntry(/*snapshot_id=*/1000L, /*sequence_number=*/2,
eq_delete));
+ entries.push_back(
+ MakeDeleteEntry(/*snapshot_id=*/1000L, /*sequence_number=*/2,
pos_delete));
+
+ auto manifest = WriteDeleteManifest(version, /*snapshot_id=*/1000L,
std::move(entries),
+ unpartitioned_spec_);
+
+ ICEBERG_UNWRAP_OR_FAIL(auto index, BuildIndex({manifest}));
+
+ // Only global equality deletes should apply to partitioned file_a_
+ ICEBERG_UNWRAP_OR_FAIL(auto deletes, index->ForDataFile(1, *file_a_));
+ EXPECT_EQ(deletes.size(), 1);
+ EXPECT_EQ(deletes[0]->file_path, "/path/to/eq-delete.parquet");
+}
+
+TEST_P(DeleteFileIndexTest,
TestPartitionedTableScanWithGlobalAndPartitionDeletes) {
+ int version = GetParam();
+ if (version >= 3) {
+ GTEST_SKIP() << "Different behavior for position deletes in V3";
+ }
+
+ // Create partition-scoped equality delete
+ auto partition_a = PartitionValues({Literal::Int(0)});
+ auto partition_eq_delete = MakeEqualityDeleteFile(
+ "/path/to/partition-eq-delete.parquet", partition_a,
partitioned_spec_->spec_id());
+
+ std::vector<ManifestEntry> partition_entries;
+ partition_entries.push_back(
+ MakeDeleteEntry(/*snapshot_id=*/1000L, /*sequence_number=*/2,
partition_eq_delete));
+
+ auto partition_manifest = WriteDeleteManifest(
+ version, /*snapshot_id=*/1000L, std::move(partition_entries),
partitioned_spec_);
+
+ // Create unpartitioned equality and position deletes
+ auto global_eq_delete =
MakeEqualityDeleteFile("/path/to/global-eq-delete.parquet",
+
PartitionValues(std::vector<Literal>{}),
+
unpartitioned_spec_->spec_id());
+ auto global_pos_delete =
MakePositionDeleteFile("/path/to/global-pos-delete.parquet",
+
PartitionValues(std::vector<Literal>{}),
+
unpartitioned_spec_->spec_id());
+
+ std::vector<ManifestEntry> global_entries;
+ global_entries.push_back(
+ MakeDeleteEntry(/*snapshot_id=*/1001L, /*sequence_number=*/3,
global_eq_delete));
+ global_entries.push_back(
+ MakeDeleteEntry(/*snapshot_id=*/1001L, /*sequence_number=*/3,
global_pos_delete));
+
+ auto global_manifest = WriteDeleteManifest(
+ version, /*snapshot_id=*/1001L, std::move(global_entries),
unpartitioned_spec_);
+
+ ICEBERG_UNWRAP_OR_FAIL(auto index, BuildIndex({partition_manifest,
global_manifest}));
+
+ // Both partition-scoped and global equality deletes should apply to file_a_
+ ICEBERG_UNWRAP_OR_FAIL(auto deletes, index->ForDataFile(1, *file_a_));
+ EXPECT_EQ(deletes.size(), 2);
+ EXPECT_THAT(GetPaths(deletes),
+
testing::UnorderedElementsAre("/path/to/partition-eq-delete.parquet",
+
"/path/to/global-eq-delete.parquet"));
+}
+
+TEST_P(DeleteFileIndexTest, TestPartitionedTableSequenceNumbers) {
+ int version = GetParam();
+
+ auto partition_a = PartitionValues({Literal::Int(0)});
+ auto eq_delete = MakeEqualityDeleteFile("/path/to/eq-delete.parquet",
partition_a,
+ partitioned_spec_->spec_id());
+ auto pos_delete = MakePositionDeleteFile("/path/to/pos-delete.parquet",
partition_a,
+ partitioned_spec_->spec_id());
+
+ // Both data and deletes have same sequence number (same commit)
+ std::vector<ManifestEntry> entries;
+ entries.push_back(
+ MakeDeleteEntry(/*snapshot_id=*/1000L, /*sequence_number=*/1,
eq_delete));
+ entries.push_back(
+ MakeDeleteEntry(/*snapshot_id=*/1000L, /*sequence_number=*/1,
pos_delete));
+
+ auto manifest = WriteDeleteManifest(version, /*snapshot_id=*/1000L,
std::move(entries),
+ partitioned_spec_);
+
+ ICEBERG_UNWRAP_OR_FAIL(auto index, BuildIndex({manifest}));
+
+ // Data file with sequence number 1 should only have position deletes applied
+ // (equality deletes apply to data with seq < delete seq)
+ ICEBERG_UNWRAP_OR_FAIL(auto deletes, index->ForDataFile(1, *file_a_));
+ EXPECT_EQ(deletes.size(), 1);
+ EXPECT_EQ(deletes[0]->file_path, "/path/to/pos-delete.parquet");
+}
+
+TEST_P(DeleteFileIndexTest, TestUnpartitionedTableSequenceNumbers) {
+ int version = GetParam();
+ if (version >= 3) {
+ GTEST_SKIP() << "Different behavior in V3";
+ }
+
+ auto eq_delete = MakeEqualityDeleteFile("/path/to/eq-delete.parquet",
+
PartitionValues(std::vector<Literal>{}),
+ unpartitioned_spec_->spec_id());
+ auto pos_delete = MakePositionDeleteFile("/path/to/pos-delete.parquet",
+
PartitionValues(std::vector<Literal>{}),
+ unpartitioned_spec_->spec_id());
+
+ // Both have same sequence number
+ std::vector<ManifestEntry> entries;
+ entries.push_back(
+ MakeDeleteEntry(/*snapshot_id=*/1000L, /*sequence_number=*/1,
eq_delete));
+ entries.push_back(
+ MakeDeleteEntry(/*snapshot_id=*/1000L, /*sequence_number=*/1,
pos_delete));
+
+ auto manifest = WriteDeleteManifest(version, /*snapshot_id=*/1000L,
std::move(entries),
+ unpartitioned_spec_);
+
+ ICEBERG_UNWRAP_OR_FAIL(auto index, BuildIndex({manifest}));
+
+ // Data file with sequence number 1 should only have position deletes applied
+ ICEBERG_UNWRAP_OR_FAIL(auto deletes, index->ForDataFile(1,
*unpartitioned_file_));
+ EXPECT_EQ(deletes.size(), 1);
+ EXPECT_EQ(deletes[0]->file_path, "/path/to/pos-delete.parquet");
+}
+
+TEST_P(DeleteFileIndexTest, TestPositionDeletesGroup) {
+ internal::PositionDeletes group;
+
+ auto partition_a = PartitionValues({Literal::Int(0)});
+ auto file1 = MakePositionDeleteFile("/path/to/pos-delete-1.parquet",
partition_a,
+ partitioned_spec_->spec_id());
+ auto file2 = MakePositionDeleteFile("/path/to/pos-delete-2.parquet",
partition_a,
+ partitioned_spec_->spec_id());
+ auto file3 = MakePositionDeleteFile("/path/to/pos-delete-3.parquet",
partition_a,
+ partitioned_spec_->spec_id());
+ auto file4 = MakePositionDeleteFile("/path/to/pos-delete-4.parquet",
partition_a,
+ partitioned_spec_->spec_id());
+
+ // Add files out of order
+ EXPECT_THAT(group.Add(MakeDeleteEntry(1000L, 4, file4)), IsOk());
+ EXPECT_THAT(group.Add(MakeDeleteEntry(1000L, 2, file2)), IsOk());
+ EXPECT_THAT(group.Add(MakeDeleteEntry(1000L, 1, file1)), IsOk());
+ EXPECT_THAT(group.Add(MakeDeleteEntry(1000L, 3, file3)), IsOk());
+
+ // Group must not be empty
+ EXPECT_FALSE(group.empty());
+
+ // All files must be reported as referenced
+ auto referenced = group.ReferencedDeleteFiles();
+ EXPECT_EQ(referenced.size(), 4);
+ EXPECT_THAT(GetPaths(referenced),
+ testing::UnorderedElementsAre(
+ "/path/to/pos-delete-1.parquet",
"/path/to/pos-delete-2.parquet",
+ "/path/to/pos-delete-3.parquet",
"/path/to/pos-delete-4.parquet"));
+
+ // Position deletes are indexed by their data sequence numbers
+ {
+ auto filtered = group.Filter(0);
+ EXPECT_EQ(filtered.size(), 4);
+ EXPECT_THAT(GetPaths(filtered),
+ testing::UnorderedElementsAre(
+ "/path/to/pos-delete-1.parquet",
"/path/to/pos-delete-2.parquet",
+ "/path/to/pos-delete-3.parquet",
"/path/to/pos-delete-4.parquet"));
+ }
+ {
+ auto filtered = group.Filter(1);
+ EXPECT_EQ(filtered.size(), 4);
+ EXPECT_THAT(GetPaths(filtered),
+ testing::UnorderedElementsAre(
+ "/path/to/pos-delete-1.parquet",
"/path/to/pos-delete-2.parquet",
+ "/path/to/pos-delete-3.parquet",
"/path/to/pos-delete-4.parquet"));
+ }
+ {
+ auto filtered = group.Filter(2);
+ EXPECT_EQ(filtered.size(), 3);
+ EXPECT_THAT(GetPaths(filtered),
+ testing::UnorderedElementsAre("/path/to/pos-delete-2.parquet",
+ "/path/to/pos-delete-3.parquet",
+
"/path/to/pos-delete-4.parquet"));
+ }
+ {
+ auto filtered = group.Filter(3);
+ EXPECT_EQ(filtered.size(), 2);
+ EXPECT_THAT(GetPaths(filtered),
+ testing::UnorderedElementsAre("/path/to/pos-delete-3.parquet",
+
"/path/to/pos-delete-4.parquet"));
+ }
+ {
+ auto filtered = group.Filter(4);
+ EXPECT_EQ(filtered.size(), 1);
+ EXPECT_EQ(filtered[0]->file_path, "/path/to/pos-delete-4.parquet");
+ }
+ {
+ auto filtered = group.Filter(5);
+ EXPECT_EQ(filtered.size(), 0);
+ }
+}
+
+TEST_P(DeleteFileIndexTest, TestEqualityDeletesGroup) {
+ internal::EqualityDeletes group(*schema_);
+
+ auto partition_a = PartitionValues({Literal::Int(0)});
+ auto file1 = MakeEqualityDeleteFile("/path/to/eq-delete-1.parquet",
partition_a,
+ partitioned_spec_->spec_id());
+ auto file2 = MakeEqualityDeleteFile("/path/to/eq-delete-2.parquet",
partition_a,
+ partitioned_spec_->spec_id());
+ auto file3 = MakeEqualityDeleteFile("/path/to/eq-delete-3.parquet",
partition_a,
+ partitioned_spec_->spec_id());
+ auto file4 = MakeEqualityDeleteFile("/path/to/eq-delete-4.parquet",
partition_a,
+ partitioned_spec_->spec_id());
+
+ // Add files out of order
+ EXPECT_THAT(group.Add(MakeDeleteEntry(1000L, 4, file4)), IsOk());
+ EXPECT_THAT(group.Add(MakeDeleteEntry(1000L, 2, file2)), IsOk());
+ EXPECT_THAT(group.Add(MakeDeleteEntry(1000L, 1, file1)), IsOk());
+ EXPECT_THAT(group.Add(MakeDeleteEntry(1000L, 3, file3)), IsOk());
+
+ // Group must not be empty
+ EXPECT_FALSE(group.empty());
+
+ // All files must be reported as referenced
+ auto referenced = group.ReferencedDeleteFiles();
+ EXPECT_EQ(referenced.size(), 4);
+ EXPECT_THAT(GetPaths(referenced),
+ testing::UnorderedElementsAre(
+ "/path/to/eq-delete-1.parquet",
"/path/to/eq-delete-2.parquet",
+ "/path/to/eq-delete-3.parquet",
"/path/to/eq-delete-4.parquet"));
+
+ // Equality deletes are indexed by data sequence number - 1 to apply to next
snapshots
+ {
+ ICEBERG_UNWRAP_OR_FAIL(auto filtered, group.Filter(0, *file_a_));
+ EXPECT_EQ(filtered.size(), 4);
+ EXPECT_THAT(GetPaths(filtered),
+ testing::UnorderedElementsAre(
+ "/path/to/eq-delete-1.parquet",
"/path/to/eq-delete-2.parquet",
+ "/path/to/eq-delete-3.parquet",
"/path/to/eq-delete-4.parquet"));
+ }
+ {
+ ICEBERG_UNWRAP_OR_FAIL(auto filtered, group.Filter(1, *file_a_));
+ EXPECT_EQ(filtered.size(), 3);
+ EXPECT_THAT(GetPaths(filtered),
+ testing::UnorderedElementsAre("/path/to/eq-delete-2.parquet",
+ "/path/to/eq-delete-3.parquet",
+ "/path/to/eq-delete-4.parquet"));
+ }
+ {
+ ICEBERG_UNWRAP_OR_FAIL(auto filtered, group.Filter(2, *file_a_));
+ EXPECT_EQ(filtered.size(), 2);
+ EXPECT_THAT(GetPaths(filtered),
+ testing::UnorderedElementsAre("/path/to/eq-delete-3.parquet",
+ "/path/to/eq-delete-4.parquet"));
+ }
+ {
+ ICEBERG_UNWRAP_OR_FAIL(auto filtered, group.Filter(3, *file_a_));
+ EXPECT_EQ(filtered.size(), 1);
+ EXPECT_EQ(filtered[0]->file_path, "/path/to/eq-delete-4.parquet");
+ }
+ {
+ ICEBERG_UNWRAP_OR_FAIL(auto filtered, group.Filter(4, *file_a_));
+ EXPECT_EQ(filtered.size(), 0);
+ }
+}
+
+TEST_P(DeleteFileIndexTest, TestMixDeleteFilesAndDVs) {
+ int version = GetParam();
+ if (version < 3) {
+ GTEST_SKIP() << "DVs only supported in V3+";
+ }
+
+ auto partition_a = PartitionValues({Literal::Int(0)});
+ auto partition_b = PartitionValues({Literal::Int(1)});
+
+ // Position delete for file_a_
+ auto pos_delete_a = MakePositionDeleteFile("/path/to/pos-delete-a.parquet",
partition_a,
+ partitioned_spec_->spec_id());
+ // DV for file_a_ (should take precedence)
+ auto dv_a = MakeDV("/path/to/dv-a.puffin", partition_a,
partitioned_spec_->spec_id(),
+ file_a_->file_path);
+ // Position deletes for file_b_ (no DV)
+ auto pos_delete_b1 = MakePositionDeleteFile("/path/to/pos-delete-b1.parquet",
+ partition_b,
partitioned_spec_->spec_id());
+ auto pos_delete_b2 = MakePositionDeleteFile("/path/to/pos-delete-b2.parquet",
+ partition_b,
partitioned_spec_->spec_id());
+
+ std::vector<ManifestEntry> entries;
+ entries.push_back(
+ MakeDeleteEntry(/*snapshot_id=*/1000L, /*sequence_number=*/1,
pos_delete_a));
+ entries.push_back(MakeDeleteEntry(/*snapshot_id=*/1000L,
/*sequence_number=*/2, dv_a));
+ entries.push_back(
+ MakeDeleteEntry(/*snapshot_id=*/1000L, /*sequence_number=*/1,
pos_delete_b1));
+ entries.push_back(
+ MakeDeleteEntry(/*snapshot_id=*/1000L, /*sequence_number=*/2,
pos_delete_b2));
+
+ auto manifest = WriteDeleteManifest(version, /*snapshot_id=*/1000L,
std::move(entries),
+ partitioned_spec_);
+
+ ICEBERG_UNWRAP_OR_FAIL(auto index, BuildIndex({manifest}));
+
+ // Only DV should apply to file_a_
+ {
+ ICEBERG_UNWRAP_OR_FAIL(auto deletes, index->ForDataFile(0, *file_a_));
+ EXPECT_EQ(deletes.size(), 1);
+ EXPECT_TRUE(deletes[0]->content_offset.has_value()); // DV has
content_offset
+ EXPECT_EQ(deletes[0]->referenced_data_file, file_a_->file_path);
+ EXPECT_EQ(deletes[0]->file_path, "/path/to/dv-a.puffin");
+ }
+
+ // Two delete files should apply to file_b_
+ {
+ ICEBERG_UNWRAP_OR_FAIL(auto deletes, index->ForDataFile(0, *file_b_));
+ EXPECT_EQ(deletes.size(), 2);
+ EXPECT_FALSE(deletes[0]->content_offset.has_value()); // Not DVs
+ EXPECT_FALSE(deletes[1]->content_offset.has_value());
+ EXPECT_THAT(GetPaths(deletes),
+ testing::UnorderedElementsAre("/path/to/pos-delete-b1.parquet",
+
"/path/to/pos-delete-b2.parquet"));
+ }
+}
+
+TEST_P(DeleteFileIndexTest, TestMultipleDVs) {
+ int version = GetParam();
+ if (version < 3) {
+ GTEST_SKIP() << "DVs only supported in V3+";
+ }
+
+ auto partition_a = PartitionValues({Literal::Int(0)});
+
+ auto dv1 = MakeDV("/path/to/dv1.puffin", partition_a,
partitioned_spec_->spec_id(),
+ file_a_->file_path);
+ auto dv2 = MakeDV("/path/to/dv2.puffin", partition_a,
partitioned_spec_->spec_id(),
+ file_a_->file_path);
+
+ std::vector<ManifestEntry> entries;
+ entries.push_back(MakeDeleteEntry(/*snapshot_id=*/1000L,
/*sequence_number=*/1, dv1));
+ entries.push_back(MakeDeleteEntry(/*snapshot_id=*/1000L,
/*sequence_number=*/2, dv2));
+
+ auto manifest = WriteDeleteManifest(version, /*snapshot_id=*/1000L,
std::move(entries),
+ partitioned_spec_);
+
+ auto index_result = BuildIndex({manifest});
+ EXPECT_THAT(index_result, IsError(ErrorKind::kValidationFailed));
+ EXPECT_THAT(index_result, HasErrorMessage("Can't index multiple DVs"));
+ EXPECT_THAT(index_result, HasErrorMessage(file_a_->file_path));
+}
+
+TEST_P(DeleteFileIndexTest, TestInvalidDVSequenceNumber) {
+ int version = GetParam();
+ if (version < 3) {
+ GTEST_SKIP() << "DVs only supported in V3+";
+ }
+
+ auto partition_a = PartitionValues({Literal::Int(0)});
+
+ auto dv = MakeDV("/path/to/dv.puffin", partition_a,
partitioned_spec_->spec_id(),
+ file_a_->file_path);
+
+ std::vector<ManifestEntry> entries;
+ entries.push_back(MakeDeleteEntry(/*snapshot_id=*/1000L,
/*sequence_number=*/1, dv));
+
+ auto manifest = WriteDeleteManifest(version, /*snapshot_id=*/1000L,
std::move(entries),
+ partitioned_spec_);
+
+ ICEBERG_UNWRAP_OR_FAIL(auto index, BuildIndex({manifest}));
+
+ // Querying with sequence number > DV sequence number should fail
+ auto result = index->ForDataFile(2, *file_a_);
+ EXPECT_THAT(result, IsError(ErrorKind::kValidationFailed));
+ EXPECT_THAT(result, HasErrorMessage(
+ "must be greater than or equal to data file sequence
number"));
+}
+
+TEST_P(DeleteFileIndexTest, TestReferencedDeleteFiles) {
+ int version = GetParam();
+
+ auto partition_a = PartitionValues({Literal::Int(0)});
+ auto eq_delete = MakeEqualityDeleteFile("/path/to/eq-delete.parquet",
partition_a,
+ partitioned_spec_->spec_id());
+ auto pos_delete = MakePositionDeleteFile("/path/to/pos-delete.parquet",
partition_a,
+ partitioned_spec_->spec_id());
+ auto global_eq_delete =
MakeEqualityDeleteFile("/path/to/global-eq-delete.parquet",
+
PartitionValues(std::vector<Literal>{}),
+
unpartitioned_spec_->spec_id());
+
+ std::vector<ManifestEntry> partition_entries;
+ partition_entries.push_back(
+ MakeDeleteEntry(/*snapshot_id=*/1000L, /*sequence_number=*/1,
eq_delete));
+ partition_entries.push_back(
+ MakeDeleteEntry(/*snapshot_id=*/1000L, /*sequence_number=*/1,
pos_delete));
+
+ auto partition_manifest = WriteDeleteManifest(
+ version, /*snapshot_id=*/1000L, std::move(partition_entries),
partitioned_spec_);
+
+ std::vector<ManifestEntry> global_entries;
+ global_entries.push_back(
+ MakeDeleteEntry(/*snapshot_id=*/1001L, /*sequence_number=*/2,
global_eq_delete));
+
+ auto global_manifest = WriteDeleteManifest(
+ version, /*snapshot_id=*/1001L, std::move(global_entries),
unpartitioned_spec_);
+
+ ICEBERG_UNWRAP_OR_FAIL(auto index, BuildIndex({partition_manifest,
global_manifest}));
+
+ auto referenced = index->ReferencedDeleteFiles();
+ EXPECT_EQ(referenced.size(), 3);
+ EXPECT_THAT(GetPaths(referenced),
+ testing::UnorderedElementsAre("/path/to/eq-delete.parquet",
+ "/path/to/pos-delete.parquet",
+
"/path/to/global-eq-delete.parquet"));
+}
+
+TEST_P(DeleteFileIndexTest, TestExistingDeleteFiles) {
+ int version = GetParam();
+
+ auto partition_a = PartitionValues({Literal::Int(0)});
+ auto eq_delete = MakeEqualityDeleteFile("/path/to/eq-delete.parquet",
partition_a,
+ partitioned_spec_->spec_id());
+ auto pos_delete = MakePositionDeleteFile("/path/to/pos-delete.parquet",
partition_a,
+ partitioned_spec_->spec_id());
+
+ std::vector<ManifestEntry> entries;
+ // Use ManifestStatus::kExisting to simulate files that were merged from a
previous
+ // manifest
+ entries.push_back(MakeDeleteEntry(/*snapshot_id=*/1000L,
/*sequence_number=*/1,
+ eq_delete, ManifestStatus::kExisting));
+ entries.push_back(MakeDeleteEntry(/*snapshot_id=*/1000L,
/*sequence_number=*/1,
+ pos_delete, ManifestStatus::kExisting));
+
+ auto manifest = WriteDeleteManifest(version, /*snapshot_id=*/1000L,
std::move(entries),
+ partitioned_spec_);
+
+ ICEBERG_UNWRAP_OR_FAIL(auto index, BuildIndex({manifest}));
+
+ EXPECT_TRUE(index->has_equality_deletes());
+ EXPECT_TRUE(index->has_position_deletes());
+
+ // Both delete files should be correctly loaded and applied to file_a_
+ ICEBERG_UNWRAP_OR_FAIL(auto deletes, index->ForDataFile(0, *file_a_));
+ EXPECT_EQ(deletes.size(), 2);
+ EXPECT_THAT(GetPaths(deletes),
+ testing::UnorderedElementsAre("/path/to/eq-delete.parquet",
+ "/path/to/pos-delete.parquet"));
+}
+
+TEST_P(DeleteFileIndexTest, TestDeletedStatusExcluded) {
+ int version = GetParam();
+
+ auto partition_a = PartitionValues({Literal::Int(0)});
+ auto eq_delete_added = MakeEqualityDeleteFile(
+ "/path/to/eq-delete-added.parquet", partition_a,
partitioned_spec_->spec_id());
+ auto eq_delete_deleted = MakeEqualityDeleteFile(
+ "/path/to/eq-delete-deleted.parquet", partition_a,
partitioned_spec_->spec_id());
+ auto pos_delete_added = MakePositionDeleteFile(
+ "/path/to/pos-delete-added.parquet", partition_a,
partitioned_spec_->spec_id());
+ auto pos_delete_deleted = MakePositionDeleteFile(
+ "/path/to/pos-delete-deleted.parquet", partition_a,
partitioned_spec_->spec_id());
+
+ std::vector<ManifestEntry> entries;
+ entries.push_back(MakeDeleteEntry(/*snapshot_id=*/1000L,
/*sequence_number=*/1,
+ eq_delete_added, ManifestStatus::kAdded));
+ entries.push_back(MakeDeleteEntry(/*snapshot_id=*/1000L,
/*sequence_number=*/1,
+ eq_delete_deleted,
ManifestStatus::kDeleted));
+ entries.push_back(MakeDeleteEntry(/*snapshot_id=*/1000L,
/*sequence_number=*/1,
+ pos_delete_added, ManifestStatus::kAdded));
+ entries.push_back(MakeDeleteEntry(/*snapshot_id=*/1000L,
/*sequence_number=*/1,
+ pos_delete_deleted,
ManifestStatus::kDeleted));
+
+ auto manifest = WriteDeleteManifest(version, /*snapshot_id=*/1000L,
std::move(entries),
+ partitioned_spec_);
+
+ ICEBERG_UNWRAP_OR_FAIL(auto index, BuildIndex({manifest}));
+
+ EXPECT_TRUE(index->has_equality_deletes());
+ EXPECT_TRUE(index->has_position_deletes());
+
+ // Only the non-deleted (ADDED) delete files should be loaded
+ ICEBERG_UNWRAP_OR_FAIL(auto deletes, index->ForDataFile(0, *file_a_));
+ EXPECT_EQ(deletes.size(), 2);
+ EXPECT_THAT(GetPaths(deletes),
+ testing::UnorderedElementsAre("/path/to/eq-delete-added.parquet",
+
"/path/to/pos-delete-added.parquet"));
+}
+
// Verifies that per-column stats on a position delete file are trimmed down to
// the delete-file-path column when the file is loaded into the index, while
// scalar fields such as record_count are preserved.
TEST_P(DeleteFileIndexTest, TestPositionDeleteDiscardMetrics) {
  int version = GetParam();

  auto partition_a = PartitionValues({Literal::Int(0)});

  // Field ids of the metadata columns a position delete file stores: the
  // referenced data file path and the row position within that file.
  constexpr int32_t kDeleteFilePathFieldId = MetadataColumns::kDeleteFilePathColumnId;
  constexpr int32_t kPositionFieldId = MetadataColumns::kFilePositionColumnId;

  // Create a position delete file with full metrics
  auto pos_delete = std::make_shared<DataFile>(DataFile{
      .content = DataFile::Content::kPositionDeletes,
      .file_path = "/path/to/pos-delete-with-metrics.parquet",
      .file_format = FileFormatType::kParquet,
      .partition = partition_a,
      .record_count = 100,
      .file_size_in_bytes = 1024,
      // Add stats for multiple columns
      .column_sizes = {{kDeleteFilePathFieldId, 100}, {kPositionFieldId, 200}},
      .value_counts = {{kDeleteFilePathFieldId, 10}, {kPositionFieldId, 20}},
      .null_value_counts = {{kDeleteFilePathFieldId, 1}, {kPositionFieldId, 2}},
      .nan_value_counts = {{kDeleteFilePathFieldId, 0}, {kPositionFieldId, 0}},
      .lower_bounds = {{kDeleteFilePathFieldId, {0x01}}, {kPositionFieldId, {0x02}}},
      .upper_bounds = {{kDeleteFilePathFieldId, {0xFF}}, {kPositionFieldId, {0xFE}}},
      .partition_spec_id = partitioned_spec_->spec_id(),
  });

  std::vector<ManifestEntry> entries;
  entries.push_back(
      MakeDeleteEntry(/*snapshot_id=*/1000L, /*sequence_number=*/1, pos_delete));

  auto manifest = WriteDeleteManifest(version, /*snapshot_id=*/1000L, std::move(entries),
                                      partitioned_spec_);

  ICEBERG_UNWRAP_OR_FAIL(auto index, BuildIndex({manifest}));

  EXPECT_TRUE(index->has_position_deletes());

  // Get the delete files from the index
  ICEBERG_UNWRAP_OR_FAIL(auto deletes, index->ForDataFile(0, *file_a_));
  ASSERT_EQ(deletes.size(), 1);

  const auto& returned_file = deletes[0];
  EXPECT_EQ(returned_file->file_path, "/path/to/pos-delete-with-metrics.parquet");
  // record_count should be preserved
  EXPECT_EQ(returned_file->record_count, 100);
  // Stats maps should only contain entries for delete file path: the position
  // column's stats are dropped, the file-path stats are kept because they are
  // needed to attribute the delete file to specific data files.
  EXPECT_EQ(returned_file->column_sizes.size(), 1);
  EXPECT_EQ(returned_file->value_counts.size(), 1);
  EXPECT_EQ(returned_file->null_value_counts.size(), 1);
  EXPECT_EQ(returned_file->nan_value_counts.size(), 1);
  EXPECT_EQ(returned_file->lower_bounds.size(), 1);
  EXPECT_EQ(returned_file->upper_bounds.size(), 1);
  EXPECT_TRUE(returned_file->column_sizes.contains(kDeleteFilePathFieldId));
  EXPECT_TRUE(returned_file->value_counts.contains(kDeleteFilePathFieldId));
  EXPECT_TRUE(returned_file->null_value_counts.contains(kDeleteFilePathFieldId));
  EXPECT_TRUE(returned_file->nan_value_counts.contains(kDeleteFilePathFieldId));
  EXPECT_TRUE(returned_file->lower_bounds.contains(kDeleteFilePathFieldId));
  EXPECT_TRUE(returned_file->upper_bounds.contains(kDeleteFilePathFieldId));
}
+
// Verifies that per-column stats on an equality delete file are trimmed down
// to the file's equality field ids when it is loaded into the index, while
// scalar fields such as record_count are preserved.
TEST_P(DeleteFileIndexTest, TestEqualityDeleteDiscardMetrics) {
  int version = GetParam();

  auto partition_a = PartitionValues({Literal::Int(0)});

  // Create an equality delete file with full metrics
  auto eq_delete = std::make_shared<DataFile>(DataFile{
      .content = DataFile::Content::kEqualityDeletes,
      .file_path = "/path/to/eq-delete-with-metrics.parquet",
      .file_format = FileFormatType::kParquet,
      .partition = partition_a,
      .record_count = 50,
      .file_size_in_bytes = 512,
      // Add stats for multiple columns; only field id 1 is an equality id,
      // so stats for fields 2 and 3 are expected to be discarded.
      .column_sizes = {{1, 100}, {2, 200}, {3, 300}},
      .value_counts = {{1, 10}, {2, 20}, {3, 30}},
      .null_value_counts = {{1, 1}, {2, 2}, {3, 3}},
      .nan_value_counts = {{1, 0}, {2, 0}, {3, 0}},
      .lower_bounds = {{1, {0x01}}, {2, {0x02}}, {3, {0x03}}},
      .upper_bounds = {{1, {0xFF}}, {2, {0xFE}}, {3, {0xFD}}},
      .equality_ids = {1},  // equality field IDs
      .partition_spec_id = partitioned_spec_->spec_id(),
  });

  std::vector<ManifestEntry> entries;
  entries.push_back(
      MakeDeleteEntry(/*snapshot_id=*/1000L, /*sequence_number=*/1, eq_delete));

  auto manifest = WriteDeleteManifest(version, /*snapshot_id=*/1000L, std::move(entries),
                                      partitioned_spec_);

  ICEBERG_UNWRAP_OR_FAIL(auto index, BuildIndex({manifest}));

  EXPECT_TRUE(index->has_equality_deletes());

  // Get the delete files from the index
  ICEBERG_UNWRAP_OR_FAIL(auto deletes, index->ForDataFile(0, *file_a_));
  ASSERT_EQ(deletes.size(), 1);

  const auto& returned_file = deletes[0];
  EXPECT_EQ(returned_file->file_path, "/path/to/eq-delete-with-metrics.parquet");
  // record_count should be preserved
  EXPECT_EQ(returned_file->record_count, 50);
  // Stats maps should only contain entries for equality field IDs.
  EXPECT_EQ(returned_file->column_sizes.size(), 1);
  EXPECT_EQ(returned_file->value_counts.size(), 1);
  EXPECT_EQ(returned_file->null_value_counts.size(), 1);
  EXPECT_EQ(returned_file->nan_value_counts.size(), 1);
  EXPECT_EQ(returned_file->lower_bounds.size(), 1);
  EXPECT_EQ(returned_file->upper_bounds.size(), 1);
  EXPECT_TRUE(returned_file->column_sizes.contains(1));
  EXPECT_TRUE(returned_file->value_counts.contains(1));
  EXPECT_TRUE(returned_file->null_value_counts.contains(1));
  EXPECT_TRUE(returned_file->nan_value_counts.contains(1));
  EXPECT_TRUE(returned_file->lower_bounds.contains(1));
  EXPECT_TRUE(returned_file->upper_bounds.contains(1));
}
+
+INSTANTIATE_TEST_SUITE_P(DeleteFileIndexVersions, DeleteFileIndexTest,
+ testing::Values(2, 3));
+
+} // namespace iceberg
diff --git a/src/iceberg/test/manifest_reader_test.cc
b/src/iceberg/test/manifest_reader_test.cc
index ab4798db..e8a1a511 100644
--- a/src/iceberg/test/manifest_reader_test.cc
+++ b/src/iceberg/test/manifest_reader_test.cc
@@ -375,6 +375,71 @@ TEST_P(TestManifestReader, TestInvalidUsage) {
EXPECT_THAT(reader_result, HasErrorMessage("has no snapshot ID"));
}
// Verifies that selecting only non-stats columns and then calling
// TryDropStats() clears every per-column stats map on the entries read back,
// while scalar fields such as record_count are preserved.
TEST_P(TestManifestReader, TestDropStats) {
  int version = GetParam();

  // Create a data file with full metrics
  auto file_with_stats = std::make_unique<DataFile>(DataFile{
      .file_path = "/path/to/data-with-stats.parquet",
      .file_format = FileFormatType::kParquet,
      .partition = PartitionValues({Literal::Int(0)}),
      .record_count = 100,
      .file_size_in_bytes = 1024,
      // Add stats for multiple columns
      .column_sizes = {{1, 100}, {2, 200}, {3, 300}},
      .value_counts = {{1, 10}, {2, 20}, {3, 30}},
      .null_value_counts = {{1, 1}, {2, 2}, {3, 3}},
      .nan_value_counts = {{1, 0}, {2, 0}, {3, 0}},
      .lower_bounds = {{1, {0x01}}, {2, {0x02}}, {3, {0x03}}},
      .upper_bounds = {{1, {0xFF}}, {2, {0xFE}}, {3, {0xFD}}},
      .sort_order_id = 0,
  });

  auto entry = MakeEntry(ManifestStatus::kAdded, /*snapshot_id=*/1000L,
                         std::move(file_with_stats));

  std::vector<ManifestEntry> entries;
  entries.push_back(std::move(entry));

  auto manifest = WriteManifest(version, /*snapshot_id=*/1000L, std::move(entries));

  auto reader_result = ManifestReader::Make(manifest, file_io_, schema_, spec_);
  ASSERT_THAT(reader_result, IsOk());
  auto reader = std::move(reader_result.value());
  // Project a stats-free column set, then request that stats be dropped.
  reader->Select({"record_count"}).TryDropStats();

  ICEBERG_UNWRAP_OR_FAIL(auto read_entries, reader->Entries());
  ASSERT_EQ(read_entries.size(), 1);
  const auto& read_entry = read_entries[0];

  // record_count should be preserved
  EXPECT_EQ(read_entry.data_file->record_count, 100);

  // Stats maps should be cleared
  EXPECT_TRUE(read_entry.data_file->column_sizes.empty());
  EXPECT_TRUE(read_entry.data_file->value_counts.empty());
  EXPECT_TRUE(read_entry.data_file->null_value_counts.empty());
  EXPECT_TRUE(read_entry.data_file->nan_value_counts.empty());
  EXPECT_TRUE(read_entry.data_file->lower_bounds.empty());
  EXPECT_TRUE(read_entry.data_file->upper_bounds.empty());
}
+
+TEST(ManifestReaderStaticTest, TestShouldDropStats) {
+ EXPECT_FALSE(ManifestReader::ShouldDropStats({}));
+
EXPECT_FALSE(ManifestReader::ShouldDropStats({std::string(Schema::kAllColumns)}));
+ EXPECT_TRUE(ManifestReader::ShouldDropStats({"file_path", "file_format",
"partition"}));
+ EXPECT_TRUE(
+ ManifestReader::ShouldDropStats({"file_path", "file_format",
"record_count"}));
+ EXPECT_FALSE(
+ ManifestReader::ShouldDropStats({"file_path", "file_format",
"value_counts"}));
+ EXPECT_FALSE(
+ ManifestReader::ShouldDropStats({"file_path", "file_format",
"lower_bounds"}));
+ EXPECT_FALSE(ManifestReader::ShouldDropStats(
+ {"file_path", "value_counts", "null_value_counts", "lower_bounds"}));
+ EXPECT_FALSE(
+ ManifestReader::ShouldDropStats({"file_path", "record_count",
"value_counts"}));
+}
+
INSTANTIATE_TEST_SUITE_P(ManifestReaderVersions, TestManifestReader,
testing::Values(1, 2, 3));
diff --git a/src/iceberg/util/content_file_util.cc
b/src/iceberg/util/content_file_util.cc
new file mode 100644
index 00000000..89c87508
--- /dev/null
+++ b/src/iceberg/util/content_file_util.cc
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "iceberg/util/content_file_util.h"
+
+#include <format>
+
+#include "iceberg/file_format.h"
+#include "iceberg/manifest/manifest_entry.h"
+#include "iceberg/metadata_columns.h"
+#include "iceberg/util/conversions.h"
+#include "iceberg/util/macros.h"
+
+namespace iceberg {
+
+bool ContentFileUtil::IsDV(const DataFile& file) {
+ return file.file_format == FileFormatType::kPuffin;
+}
+
// Returns the path of the single data file this delete file applies to, or
// std::nullopt when the delete file is not scoped to exactly one data file.
// Equality deletes never reference a specific file; position deletes may carry
// an explicit referenced_data_file, or the path can be derived when the
// lower and upper bounds of the file_path column are identical (meaning every
// row in the delete file names the same data file).
Result<std::optional<std::string>> ContentFileUtil::ReferencedDataFile(
    const DataFile& file) {
  // Equality deletes don't reference a specific data file
  if (file.content == DataFile::Content::kEqualityDeletes) {
    return std::nullopt;
  }

  // If referenced_data_file is set, return it
  if (file.referenced_data_file.has_value()) {
    return file.referenced_data_file;
  }

  // Try to derive from lower/upper bounds on file_path column. Missing or
  // empty bounds mean the path cannot be determined.
  auto lower_it = file.lower_bounds.find(MetadataColumns::kDeleteFilePathColumnId);
  if (lower_it == file.lower_bounds.end() || lower_it->second.empty()) {
    return std::nullopt;
  }

  auto upper_it = file.upper_bounds.find(MetadataColumns::kDeleteFilePathColumnId);
  if (upper_it == file.upper_bounds.end() || upper_it->second.empty()) {
    return std::nullopt;
  }

  // Check if lower and upper bounds are equal
  if (lower_it->second == upper_it->second) {
    // Convert the binary bound to a string.
    // NOTE(review): assumes the bound is the full, untruncated path when the
    // two bounds are byte-identical — confirm writers never truncate equal
    // file_path bounds.
    ICEBERG_ASSIGN_OR_RAISE(auto string_literal,
                            Conversions::FromBytes(*string(), lower_it->second));
    return std::get<std::string>(string_literal);
  }

  return std::nullopt;
}
+
+Result<bool> ContentFileUtil::IsFileScoped(const DataFile& file) {
+ ICEBERG_ASSIGN_OR_RAISE(auto referenced_data_file, ReferencedDataFile(file));
+ return referenced_data_file.has_value();
+}
+
+bool ContentFileUtil::ContainsSingleDV(std::span<const
std::shared_ptr<DataFile>> files) {
+ return files.size() == 1 && IsDV(*files[0]);
+}
+
// Builds a human-readable description of a deletion vector, e.g. for error
// messages. Unset optional fields are rendered as -1 (offset/length) or an
// empty string (referenced data file).
std::string ContentFileUtil::DVDesc(const DataFile& file) {
  return std::format("DV{{location={}, offset={}, length={}, referencedDataFile={}}}",
                     file.file_path, file.content_offset.value_or(-1),
                     file.content_size_in_bytes.value_or(-1),
                     file.referenced_data_file.value_or(""));
}
+
+void ContentFileUtil::DropAllStats(DataFile& data_file) {
+ data_file.column_sizes.clear();
+ data_file.value_counts.clear();
+ data_file.null_value_counts.clear();
+ data_file.nan_value_counts.clear();
+ data_file.lower_bounds.clear();
+ data_file.upper_bounds.clear();
+}
+
namespace {

// Keeps only the entries of a field-id keyed stats map whose key appears in
// `columns`; every other entry is erased in place.
template <typename V>
void DropUnselectedColumnStats(std::map<int32_t, V>& map,
                               const std::unordered_set<int32_t>& columns) {
  auto it = map.begin();
  while (it != map.end()) {
    const bool selected = columns.count(it->first) > 0;
    it = selected ? std::next(it) : map.erase(it);
  }
}

}  // namespace
+
+void ContentFileUtil::DropUnselectedStats(
+ DataFile& data_file, const std::unordered_set<int32_t>& selected_columns) {
+ DropUnselectedColumnStats<int64_t>(data_file.column_sizes, selected_columns);
+ DropUnselectedColumnStats<int64_t>(data_file.value_counts, selected_columns);
+ DropUnselectedColumnStats<int64_t>(data_file.null_value_counts,
selected_columns);
+ DropUnselectedColumnStats<int64_t>(data_file.nan_value_counts,
selected_columns);
+ DropUnselectedColumnStats<std::vector<uint8_t>>(data_file.lower_bounds,
+ selected_columns);
+ DropUnselectedColumnStats<std::vector<uint8_t>>(data_file.upper_bounds,
+ selected_columns);
+}
+
+} // namespace iceberg
diff --git a/src/iceberg/util/content_file_util.h
b/src/iceberg/util/content_file_util.h
new file mode 100644
index 00000000..e173a41a
--- /dev/null
+++ b/src/iceberg/util/content_file_util.h
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+/// \file iceberg/util/content_file_util.h
+/// Utility functions for content files (data files and delete files).
+
+#include <memory>
+#include <optional>
+#include <span>
+#include <string>
+#include <unordered_set>
+
+#include "iceberg/iceberg_export.h"
+#include "iceberg/manifest/manifest_entry.h"
+#include "iceberg/result.h"
+#include "iceberg/type_fwd.h"
+
+namespace iceberg {
+
/// \brief Utility functions for content files (data files and delete files).
struct ICEBERG_EXPORT ContentFileUtil {
  /// \brief Check if a delete file is a deletion vector (DV).
  /// \param file The delete file to check.
  /// \return True if the file is stored in Puffin format, which is how DVs
  /// are persisted.
  static bool IsDV(const DataFile& file);

  /// \brief Get the referenced data file path from a position delete file.
  /// \param file The delete file to inspect.
  /// \return The referenced data file path when the delete file applies to a
  /// single data file (from the explicit referenced_data_file field, or
  /// derived from equal lower/upper bounds of the file path column);
  /// std::nullopt for equality deletes or deletes spanning multiple data
  /// files; an error if the bound cannot be decoded.
  static Result<std::optional<std::string>> ReferencedDataFile(const DataFile& file);

  /// \brief Check if a delete file is file-scoped.
  /// \param file The delete file to check.
  /// \return True if the delete file references exactly one data file.
  static Result<bool> IsFileScoped(const DataFile& file);

  /// \brief Check if a collection of delete files contains exactly one DV.
  /// \param files The delete files to check.
  static bool ContainsSingleDV(std::span<const std::shared_ptr<DataFile>> files);

  /// \brief Generate a description string for a deletion vector.
  /// \param file The deletion vector to describe.
  static std::string DVDesc(const DataFile& file);

  /// \brief In-place drop stats.
  /// Clears all per-column stats maps (column sizes, value/null/NaN counts,
  /// and bounds) on the file; scalar fields are untouched.
  static void DropAllStats(DataFile& data_file);

  /// \brief Preserve stats based on selected columns.
  /// Removes per-column stats for every field id not in \p selected_columns.
  static void DropUnselectedStats(DataFile& data_file,
                                  const std::unordered_set<int32_t>& selected_columns);
};
+
+} // namespace iceberg
diff --git a/src/iceberg/util/meson.build b/src/iceberg/util/meson.build
index 9f327753..48477925 100644
--- a/src/iceberg/util/meson.build
+++ b/src/iceberg/util/meson.build
@@ -20,6 +20,7 @@ install_headers(
'bucket_util.h',
'checked_cast.h',
'config.h',
+ 'content_file_util.h',
'conversions.h',
'decimal.h',
'endian.h',