This is an automated email from the ASF dual-hosted git repository.
gangwu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-cpp.git
The following commit(s) were added to refs/heads/main by this push:
new b9ce88f9 feat: implement manifest group (#455)
b9ce88f9 is described below
commit b9ce88f91c76f051340e035f0595eaeb3726e694
Author: Gang Wu <[email protected]>
AuthorDate: Wed Dec 31 14:22:53 2025 +0800
feat: implement manifest group (#455)
---
src/iceberg/CMakeLists.txt | 1 +
src/iceberg/constants.h | 12 +
src/iceberg/delete_file_index.cc | 36 +-
src/iceberg/delete_file_index.h | 25 +-
src/iceberg/manifest/manifest_adapter.cc | 91 +++--
src/iceberg/manifest/manifest_entry.h | 25 +-
src/iceberg/manifest/manifest_group.cc | 367 ++++++++++++++++++++
src/iceberg/manifest/manifest_group.h | 167 +++++++++
src/iceberg/manifest/manifest_writer.cc | 12 +-
src/iceberg/manifest/meson.build | 1 +
src/iceberg/manifest/v2_metadata.cc | 5 +-
src/iceberg/manifest/v3_metadata.cc | 5 +-
src/iceberg/meson.build | 1 +
src/iceberg/table_scan.cc | 28 +-
src/iceberg/table_scan.h | 25 +-
src/iceberg/test/CMakeLists.txt | 1 +
src/iceberg/test/delete_file_index_test.cc | 6 +-
src/iceberg/test/manifest_group_test.cc | 521 +++++++++++++++++++++++++++++
src/iceberg/type_fwd.h | 8 +
19 files changed, 1234 insertions(+), 103 deletions(-)
diff --git a/src/iceberg/CMakeLists.txt b/src/iceberg/CMakeLists.txt
index bf24ae32..bc7182ae 100644
--- a/src/iceberg/CMakeLists.txt
+++ b/src/iceberg/CMakeLists.txt
@@ -42,6 +42,7 @@ set(ICEBERG_SOURCES
json_internal.cc
manifest/manifest_adapter.cc
manifest/manifest_entry.cc
+ manifest/manifest_group.cc
manifest/manifest_list.cc
manifest/manifest_reader.cc
manifest/manifest_writer.cc
diff --git a/src/iceberg/constants.h b/src/iceberg/constants.h
index 759ae3cc..89001f09 100644
--- a/src/iceberg/constants.h
+++ b/src/iceberg/constants.h
@@ -19,6 +19,12 @@
#pragma once
+/// \file iceberg/constants.h
+/// This file defines constants used commonly and shared across multiple
+/// source files. It is mostly useful to add constants that are used as
+/// default values in the class definitions in the header files without
+/// including other headers just for the constant definitions.
+
#include <cstdint>
#include <string_view>
@@ -26,5 +32,11 @@ namespace iceberg {
constexpr std::string_view kParquetFieldIdKey = "PARQUET:field_id";
constexpr int64_t kInvalidSnapshotId = -1;
+/// \brief Stand-in for the current sequence number that will be assigned when
the commit
+/// is successful. This is replaced when writing a manifest list by the
ManifestFile
+/// adapter.
+constexpr int64_t kUnassignedSequenceNumber = -1;
+
+// TODO(gangwu): move other commonly used constants here.
} // namespace iceberg
diff --git a/src/iceberg/delete_file_index.cc b/src/iceberg/delete_file_index.cc
index 2a96e9d3..7c1a12f0 100644
--- a/src/iceberg/delete_file_index.cc
+++ b/src/iceberg/delete_file_index.cc
@@ -453,34 +453,32 @@ Result<std::shared_ptr<DataFile>> DeleteFileIndex::FindDV(
}
Result<DeleteFileIndex::Builder> DeleteFileIndex::BuilderFor(
- std::shared_ptr<FileIO> io, std::vector<ManifestFile> delete_manifests) {
+ std::shared_ptr<FileIO> io, std::shared_ptr<Schema> schema,
+ std::unordered_map<int32_t, std::shared_ptr<PartitionSpec>> specs_by_id,
+ std::vector<ManifestFile> delete_manifests) {
ICEBERG_PRECHECK(io != nullptr, "FileIO cannot be null");
- return Builder(std::move(io), std::move(delete_manifests));
+ ICEBERG_PRECHECK(schema != nullptr, "Schema cannot be null");
+ ICEBERG_PRECHECK(!specs_by_id.empty(), "Partition specs cannot be empty");
+ return Builder(std::move(io), std::move(schema), std::move(specs_by_id),
+ std::move(delete_manifests));
}
// Builder implementation
-DeleteFileIndex::Builder::Builder(std::shared_ptr<FileIO> io,
- std::vector<ManifestFile> delete_manifests)
- : io_(std::move(io)), delete_manifests_(std::move(delete_manifests)) {}
+DeleteFileIndex::Builder::Builder(
+ std::shared_ptr<FileIO> io, std::shared_ptr<Schema> schema,
+ std::unordered_map<int32_t, std::shared_ptr<PartitionSpec>> specs_by_id,
+ std::vector<ManifestFile> delete_manifests)
+ : io_(std::move(io)),
+ schema_(std::move(schema)),
+ specs_by_id_(std::move(specs_by_id)),
+ delete_manifests_(std::move(delete_manifests)) {}
DeleteFileIndex::Builder::~Builder() = default;
DeleteFileIndex::Builder::Builder(Builder&&) noexcept = default;
DeleteFileIndex::Builder& DeleteFileIndex::Builder::operator=(Builder&&)
noexcept =
default;
-DeleteFileIndex::Builder& DeleteFileIndex::Builder::SpecsById(
- std::unordered_map<int32_t, std::shared_ptr<PartitionSpec>> specs_by_id) {
- specs_by_id_ = std::move(specs_by_id);
- return *this;
-}
-
-DeleteFileIndex::Builder& DeleteFileIndex::Builder::WithSchema(
- std::shared_ptr<Schema> schema) {
- schema_ = std::move(schema);
- return *this;
-}
-
DeleteFileIndex::Builder&
DeleteFileIndex::Builder::AfterSequenceNumber(int64_t seq) {
min_sequence_number_ = seq;
return *this;
@@ -721,10 +719,6 @@ Status DeleteFileIndex::Builder::AddEqualityDelete(
Result<std::unique_ptr<DeleteFileIndex>> DeleteFileIndex::Builder::Build() {
ICEBERG_RETURN_UNEXPECTED(CheckErrors());
- ICEBERG_PRECHECK(io_ != nullptr, "FileIO is required to load delete files");
- ICEBERG_PRECHECK(schema_ != nullptr, "Schema is required to load delete
files");
- ICEBERG_PRECHECK(!specs_by_id_.empty(),
- "Partition specs are required to load delete files");
std::vector<ManifestEntry> entries;
if (!delete_manifests_.empty()) {
diff --git a/src/iceberg/delete_file_index.h b/src/iceberg/delete_file_index.h
index 03cc2661..5444281a 100644
--- a/src/iceberg/delete_file_index.h
+++ b/src/iceberg/delete_file_index.h
@@ -268,10 +268,14 @@ class ICEBERG_EXPORT DeleteFileIndex {
/// \brief Create a builder for constructing a DeleteFileIndex from manifest
files.
///
/// \param io The FileIO to use for reading manifests
+ /// \param schema Current table schema
+ /// \param specs_by_id Partition specs by their IDs
/// \param delete_manifests The delete manifests to index
/// \return A Builder instance
- static Result<Builder> BuilderFor(std::shared_ptr<FileIO> io,
- std::vector<ManifestFile>
delete_manifests);
+ static Result<Builder> BuilderFor(
+ std::shared_ptr<FileIO> io, std::shared_ptr<Schema> schema,
+ std::unordered_map<int32_t, std::shared_ptr<PartitionSpec>> specs_by_id,
+ std::vector<ManifestFile> delete_manifests);
private:
friend class Builder;
@@ -318,7 +322,9 @@ class ICEBERG_EXPORT DeleteFileIndex {
class ICEBERG_EXPORT DeleteFileIndex::Builder : public ErrorCollector {
public:
/// \brief Construct a builder from manifest files.
- Builder(std::shared_ptr<FileIO> io, std::vector<ManifestFile>
delete_manifests);
+ Builder(std::shared_ptr<FileIO> io, std::shared_ptr<Schema> schema,
+ std::unordered_map<int32_t, std::shared_ptr<PartitionSpec>>
specs_by_id,
+ std::vector<ManifestFile> delete_manifests);
~Builder() override;
@@ -327,15 +333,6 @@ class ICEBERG_EXPORT DeleteFileIndex::Builder : public
ErrorCollector {
Builder(const Builder&) = delete;
Builder& operator=(const Builder&) = delete;
- /// \brief Set the partition specs by ID.
- Builder& SpecsById(
- std::unordered_map<int32_t, std::shared_ptr<PartitionSpec>> specs_by_id);
-
- /// \brief Set the table schema.
- ///
- /// Required for filtering and expression evaluation.
- Builder& WithSchema(std::shared_ptr<Schema> schema);
-
/// \brief Set the minimum sequence number for delete files.
///
/// Only delete files with sequence number > min_sequence_number will be
included.
@@ -384,10 +381,10 @@ class ICEBERG_EXPORT DeleteFileIndex::Builder : public
ErrorCollector {
ManifestEntry&& entry);
std::shared_ptr<FileIO> io_;
+ std::shared_ptr<Schema> schema_;
+ std::unordered_map<int32_t, std::shared_ptr<PartitionSpec>> specs_by_id_;
std::vector<ManifestFile> delete_manifests_;
int64_t min_sequence_number_ = 0;
- std::unordered_map<int32_t, std::shared_ptr<PartitionSpec>> specs_by_id_;
- std::shared_ptr<Schema> schema_;
std::shared_ptr<Expression> data_filter_;
std::shared_ptr<Expression> partition_filter_;
std::shared_ptr<PartitionSet> partition_set_;
diff --git a/src/iceberg/manifest/manifest_adapter.cc
b/src/iceberg/manifest/manifest_adapter.cc
index da75a468..cf0a0515 100644
--- a/src/iceberg/manifest/manifest_adapter.cc
+++ b/src/iceberg/manifest/manifest_adapter.cc
@@ -36,6 +36,7 @@ namespace iceberg {
namespace {
constexpr int64_t kBlockSizeInBytesV1 = 64 * 1024 * 1024L;
+constexpr int32_t kBlockSizeInBytesFieldId = 105;
Status AppendField(ArrowArray* array, int64_t value) {
ICEBERG_NANOARROW_RETURN_UNEXPECTED(ArrowArrayAppendInt(array, value));
@@ -254,64 +255,63 @@ Status ManifestEntryAdapter::AppendDataFile(
auto child_array = array->children[i];
switch (field.field_id()) {
- case 134: // content (optional int32)
+ case DataFile::kContentFieldId: // optional int32
ICEBERG_RETURN_UNEXPECTED(
AppendField(child_array, static_cast<int64_t>(file.content)));
break;
- case 100: // file_path (required string)
+ case DataFile::kFilePathFieldId: // required string
ICEBERG_RETURN_UNEXPECTED(AppendField(child_array, file.file_path));
break;
- case 101: // file_format (required string)
+ case DataFile::kFileFormatFieldId: // required string
ICEBERG_RETURN_UNEXPECTED(AppendField(child_array,
ToString(file.file_format)));
break;
- case 102: {
- // partition (required struct)
+ case DataFile::kPartitionFieldId: { // required struct
auto partition_type =
internal::checked_pointer_cast<StructType>(field.type());
ICEBERG_RETURN_UNEXPECTED(
AppendPartitionValues(child_array, partition_type,
file.partition));
} break;
- case 103: // record_count (required int64)
+ case DataFile::kRecordCountFieldId: // required int64
ICEBERG_RETURN_UNEXPECTED(AppendField(child_array, file.record_count));
break;
- case 104: // file_size_in_bytes (required int64)
+ case DataFile::kFileSizeFieldId: // required int64
ICEBERG_RETURN_UNEXPECTED(AppendField(child_array,
file.file_size_in_bytes));
break;
- case 105: // block_size_in_bytes (compatible in v1)
+ case kBlockSizeInBytesFieldId: // compatible with v1
// always 64MB for v1
ICEBERG_RETURN_UNEXPECTED(AppendField(child_array,
kBlockSizeInBytesV1));
break;
- case 108: // column_sizes (optional map)
+ case DataFile::kColumnSizesFieldId: // optional map
ICEBERG_RETURN_UNEXPECTED(AppendMap(child_array, file.column_sizes));
break;
- case 109: // value_counts (optional map)
+ case DataFile::kValueCountsFieldId: // optional map
ICEBERG_RETURN_UNEXPECTED(AppendMap(child_array, file.value_counts));
break;
- case 110: // null_value_counts (optional map)
+ case DataFile::kNullValueCountsFieldId: // optional map
ICEBERG_RETURN_UNEXPECTED(AppendMap(child_array,
file.null_value_counts));
break;
- case 137: // nan_value_counts (optional map)
+ case DataFile::kNanValueCountsFieldId: // optional map
ICEBERG_RETURN_UNEXPECTED(AppendMap(child_array,
file.nan_value_counts));
break;
- case 125: // lower_bounds (optional map)
+ case DataFile::kLowerBoundsFieldId: // optional map
ICEBERG_RETURN_UNEXPECTED(AppendMap(child_array, file.lower_bounds));
break;
- case 128: // upper_bounds (optional map)
+ case DataFile::kUpperBoundsFieldId: // optional map
ICEBERG_RETURN_UNEXPECTED(AppendMap(child_array, file.upper_bounds));
break;
- case 131: // key_metadata (optional binary)
+ case DataFile::kKeyMetadataFieldId: // optional binary
if (!file.key_metadata.empty()) {
ICEBERG_RETURN_UNEXPECTED(AppendField(child_array,
file.key_metadata));
} else {
ICEBERG_NANOARROW_RETURN_UNEXPECTED(ArrowArrayAppendNull(child_array, 1));
}
break;
- case 132: // split_offsets (optional list)
+ case DataFile::kSplitOffsetsFieldId: // optional list
ICEBERG_RETURN_UNEXPECTED(AppendList(child_array, file.split_offsets));
break;
- case 135: // equality_ids (optional list)
+ case DataFile::kEqualityIdsFieldId: // optional list
ICEBERG_RETURN_UNEXPECTED(AppendList(child_array, file.equality_ids));
break;
- case 140: // sort_order_id (optional int32)
+ case DataFile::kSortOrderIdFieldId: // optional int32
if (file.sort_order_id.has_value()) {
ICEBERG_RETURN_UNEXPECTED(
AppendField(child_array,
static_cast<int64_t>(file.sort_order_id.value())));
@@ -319,15 +319,14 @@ Status ManifestEntryAdapter::AppendDataFile(
ICEBERG_NANOARROW_RETURN_UNEXPECTED(ArrowArrayAppendNull(child_array, 1));
}
break;
- case 142: // first_row_id (optional int64)
+ case DataFile::kFirstRowIdFieldId: // optional int64
if (file.first_row_id.has_value()) {
ICEBERG_RETURN_UNEXPECTED(AppendField(child_array,
file.first_row_id.value()));
} else {
ICEBERG_NANOARROW_RETURN_UNEXPECTED(ArrowArrayAppendNull(child_array, 1));
}
break;
- case 143: {
- // referenced_data_file (optional string)
+ case DataFile::kReferencedDataFileFieldId: { // optional string
ICEBERG_ASSIGN_OR_RAISE(auto referenced_data_file,
GetReferenceDataFile(file));
if (referenced_data_file.has_value()) {
ICEBERG_RETURN_UNEXPECTED(
@@ -337,7 +336,7 @@ Status ManifestEntryAdapter::AppendDataFile(
}
break;
}
- case 144: // content_offset (optional int64)
+ case DataFile::kContentOffsetFieldId: // optional int64
if (file.content_offset.has_value()) {
ICEBERG_RETURN_UNEXPECTED(
AppendField(child_array, file.content_offset.value()));
@@ -345,7 +344,7 @@ Status ManifestEntryAdapter::AppendDataFile(
ICEBERG_NANOARROW_RETURN_UNEXPECTED(ArrowArrayAppendNull(child_array, 1));
}
break;
- case 145: // content_size_in_bytes (optional int64)
+ case DataFile::kContentSizeFieldId: // optional int64
if (file.content_size_in_bytes.has_value()) {
ICEBERG_RETURN_UNEXPECTED(
AppendField(child_array, file.content_size_in_bytes.value()));
@@ -397,18 +396,18 @@ Status ManifestEntryAdapter::AppendInternal(const
ManifestEntry& entry) {
auto array = array_.children[i];
switch (field.field_id()) {
- case 0: // status (required int32)
+ case ManifestEntry::kStatusFieldId: // required int32
ICEBERG_RETURN_UNEXPECTED(
AppendField(array,
static_cast<int64_t>(static_cast<int32_t>(entry.status))));
break;
- case 1: // snapshot_id (optional int64)
+ case ManifestEntry::kSnapshotIdFieldId: // optional int64
if (entry.snapshot_id.has_value()) {
ICEBERG_RETURN_UNEXPECTED(AppendField(array,
entry.snapshot_id.value()));
} else {
ICEBERG_NANOARROW_RETURN_UNEXPECTED(ArrowArrayAppendNull(array, 1));
}
break;
- case 2: // data_file (required struct)
+ case ManifestEntry::kDataFileFieldId: // required struct
if (entry.data_file) {
// Get the data file type from the field
auto data_file_type =
internal::checked_pointer_cast<StructType>(field.type());
@@ -418,8 +417,7 @@ Status ManifestEntryAdapter::AppendInternal(const
ManifestEntry& entry) {
return InvalidManifest("Missing required data_file field from
manifest entry.");
}
break;
- case 3: {
- // sequence_number (optional int64)
+ case ManifestEntry::kSequenceNumberFieldId: { // optional int64
ICEBERG_ASSIGN_OR_RAISE(auto sequence_num, GetSequenceNumber(entry));
if (sequence_num.has_value()) {
ICEBERG_RETURN_UNEXPECTED(AppendField(array, sequence_num.value()));
@@ -428,7 +426,7 @@ Status ManifestEntryAdapter::AppendInternal(const
ManifestEntry& entry) {
}
break;
}
- case 4: // file_sequence_number (optional int64)
+ case ManifestEntry::kFileSequenceNumberFieldId: // optional int64
if (entry.file_sequence_number.has_value()) {
ICEBERG_RETURN_UNEXPECTED(
AppendField(array, entry.file_sequence_number.value()));
@@ -538,36 +536,34 @@ Status ManifestFileAdapter::AppendInternal(const
ManifestFile& file) {
const auto& field = fields[i];
auto array = array_.children[i];
switch (field.field_id()) {
- case 500: // manifest_path
+ case ManifestFile::kManifestPathFieldId:
ICEBERG_RETURN_UNEXPECTED(AppendField(array, file.manifest_path));
break;
- case 501: // manifest_length
+ case ManifestFile::kManifestLengthFieldId:
ICEBERG_RETURN_UNEXPECTED(AppendField(array, file.manifest_length));
break;
- case 502: // partition_spec_id
+ case ManifestFile::kPartitionSpecIdFieldId:
ICEBERG_RETURN_UNEXPECTED(
AppendField(array, static_cast<int64_t>(file.partition_spec_id)));
break;
- case 517: // content
+ case ManifestFile::kContentFieldId:
ICEBERG_RETURN_UNEXPECTED(
AppendField(array,
static_cast<int64_t>(static_cast<int32_t>(file.content))));
break;
- case 515: {
- // sequence_number
+ case ManifestFile::kSequenceNumberFieldId: {
ICEBERG_ASSIGN_OR_RAISE(auto sequence_num, GetSequenceNumber(file));
ICEBERG_RETURN_UNEXPECTED(AppendField(array, sequence_num));
break;
}
- case 516: {
- // min_sequence_number
+ case ManifestFile::kMinSequenceNumberFieldId: {
ICEBERG_ASSIGN_OR_RAISE(auto min_sequence_num,
GetMinSequenceNumber(file));
ICEBERG_RETURN_UNEXPECTED(AppendField(array, min_sequence_num));
break;
}
- case 503: // added_snapshot_id
+ case ManifestFile::kAddedSnapshotIdFieldId:
ICEBERG_RETURN_UNEXPECTED(AppendField(array, file.added_snapshot_id));
break;
- case 504: // added_files_count
+ case ManifestFile::kAddedFilesCountFieldId:
if (file.added_files_count.has_value()) {
ICEBERG_RETURN_UNEXPECTED(
AppendField(array,
static_cast<int64_t>(file.added_files_count.value())));
@@ -576,7 +572,7 @@ Status ManifestFileAdapter::AppendInternal(const
ManifestFile& file) {
ICEBERG_NANOARROW_RETURN_UNEXPECTED(ArrowArrayAppendNull(array, 1));
}
break;
- case 505: // existing_files_count
+ case ManifestFile::kExistingFilesCountFieldId:
if (file.existing_files_count.has_value()) {
ICEBERG_RETURN_UNEXPECTED(AppendField(
array, static_cast<int64_t>(file.existing_files_count.value())));
@@ -585,7 +581,7 @@ Status ManifestFileAdapter::AppendInternal(const
ManifestFile& file) {
ICEBERG_NANOARROW_RETURN_UNEXPECTED(ArrowArrayAppendNull(array, 1));
}
break;
- case 506: // deleted_files_count
+ case ManifestFile::kDeletedFilesCountFieldId:
if (file.deleted_files_count.has_value()) {
ICEBERG_RETURN_UNEXPECTED(
AppendField(array,
static_cast<int64_t>(file.deleted_files_count.value())));
@@ -594,7 +590,7 @@ Status ManifestFileAdapter::AppendInternal(const
ManifestFile& file) {
ICEBERG_NANOARROW_RETURN_UNEXPECTED(ArrowArrayAppendNull(array, 1));
}
break;
- case 512: // added_rows_count
+ case ManifestFile::kAddedRowsCountFieldId:
if (file.added_rows_count.has_value()) {
ICEBERG_RETURN_UNEXPECTED(AppendField(array,
file.added_rows_count.value()));
} else {
@@ -602,7 +598,7 @@ Status ManifestFileAdapter::AppendInternal(const
ManifestFile& file) {
ICEBERG_NANOARROW_RETURN_UNEXPECTED(ArrowArrayAppendNull(array, 1));
}
break;
- case 513: // existing_rows_count
+ case ManifestFile::kExistingRowsCountFieldId:
if (file.existing_rows_count.has_value()) {
ICEBERG_RETURN_UNEXPECTED(AppendField(array,
file.existing_rows_count.value()));
} else {
@@ -610,7 +606,7 @@ Status ManifestFileAdapter::AppendInternal(const
ManifestFile& file) {
ICEBERG_NANOARROW_RETURN_UNEXPECTED(ArrowArrayAppendNull(array, 1));
}
break;
- case 514: // deleted_rows_count
+ case ManifestFile::kDeletedRowsCountFieldId:
if (file.deleted_rows_count.has_value()) {
ICEBERG_RETURN_UNEXPECTED(AppendField(array,
file.deleted_rows_count.value()));
} else {
@@ -618,20 +614,19 @@ Status ManifestFileAdapter::AppendInternal(const
ManifestFile& file) {
ICEBERG_NANOARROW_RETURN_UNEXPECTED(ArrowArrayAppendNull(array, 1));
}
break;
- case 507: // partitions
+ case ManifestFile::kPartitionSummaryFieldId:
ICEBERG_RETURN_UNEXPECTED(AppendPartitionSummary(
array, internal::checked_pointer_cast<ListType>(field.type()),
file.partitions));
break;
- case 519: // key_metadata
+ case ManifestFile::kKeyMetadataFieldId:
if (!file.key_metadata.empty()) {
ICEBERG_RETURN_UNEXPECTED(AppendField(array, file.key_metadata));
} else {
ICEBERG_NANOARROW_RETURN_UNEXPECTED(ArrowArrayAppendNull(array, 1));
}
break;
- case 520: {
- // first_row_id
+ case ManifestFile::kFirstRowIdFieldId: {
ICEBERG_ASSIGN_OR_RAISE(auto first_row_id, GetFirstRowId(file));
if (first_row_id.has_value()) {
ICEBERG_RETURN_UNEXPECTED(AppendField(array, first_row_id.value()));
diff --git a/src/iceberg/manifest/manifest_entry.h
b/src/iceberg/manifest/manifest_entry.h
index 5d35530b..1e4af044 100644
--- a/src/iceberg/manifest/manifest_entry.h
+++ b/src/iceberg/manifest/manifest_entry.h
@@ -308,11 +308,30 @@ struct ICEBERG_EXPORT ManifestEntry {
/// null.
std::optional<int64_t> snapshot_id;
/// Field id: 3
- /// Data sequence number of the file. Inherited when null and status is 1
(added).
+ /// Data sequence number of the file.
+ ///
+ /// Independently of the entry status, it represents the sequence number to
which the
+ /// file should apply. Note the data sequence number may differ from the
sequence number
+ /// of the snapshot in which the underlying file was added. New snapshots
can add files
+ /// that belong to older sequence numbers (e.g. compaction). The data
sequence number
+ /// also does not change when the file is marked as deleted.
+ ///
+ /// \note It can return nullopt if the data sequence number is unknown. This
may happen
+ /// while reading a v2 manifest that did not persist the data sequence
number for
+ /// manifest entries with status DELETED (older Iceberg versions).
std::optional<int64_t> sequence_number;
/// Field id: 4
- /// File sequence number indicating when the file was added. Inherited when
null and
- /// status is 1 (added).
+ /// The file sequence number.
+ ///
+ /// The file sequence number represents the sequence number of the snapshot
in which the
+ /// underlying file was added. The file sequence number is always assigned
at commit and
+ /// cannot be provided explicitly, unlike the data sequence number. The file
sequence
+ /// number does not change upon assigning and must be preserved in existing
and deleted
+ /// entries.
+ ///
+ /// \note It can return nullopt if the file sequence number is unknown. This
may happen
+ /// while reading a v2 manifest that did not persist the file sequence
number for
+ /// manifest entries with status EXISTING or DELETED (older Iceberg
versions).
std::optional<int64_t> file_sequence_number;
/// Field id: 2
/// File path, partition tuple, metrics, ...
diff --git a/src/iceberg/manifest/manifest_group.cc
b/src/iceberg/manifest/manifest_group.cc
new file mode 100644
index 00000000..220b8585
--- /dev/null
+++ b/src/iceberg/manifest/manifest_group.cc
@@ -0,0 +1,367 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "iceberg/manifest/manifest_group.h"
+
+#include <utility>
+
+#include "iceberg/expression/evaluator.h"
+#include "iceberg/expression/expression.h"
+#include "iceberg/expression/manifest_evaluator.h"
+#include "iceberg/expression/projections.h"
+#include "iceberg/expression/residual_evaluator.h"
+#include "iceberg/file_io.h"
+#include "iceberg/manifest/manifest_reader.h"
+#include "iceberg/partition_spec.h"
+#include "iceberg/schema.h"
+#include "iceberg/table_scan.h"
+#include "iceberg/util/checked_cast.h"
+#include "iceberg/util/content_file_util.h"
+#include "iceberg/util/macros.h"
+
+namespace iceberg {
+
+Result<std::unique_ptr<ManifestGroup>> ManifestGroup::Make(
+ std::shared_ptr<FileIO> io, std::shared_ptr<Schema> schema,
+ std::unordered_map<int32_t, std::shared_ptr<PartitionSpec>> specs_by_id,
+ std::vector<ManifestFile> manifests) {
+ std::vector<ManifestFile> data_manifests;
+ std::vector<ManifestFile> delete_manifests;
+ for (auto& manifest : manifests) {
+ if (manifest.content == ManifestContent::kData) {
+ data_manifests.push_back(std::move(manifest));
+ } else if (manifest.content == ManifestContent::kDeletes) {
+ delete_manifests.push_back(std::move(manifest));
+ }
+ }
+
+ return ManifestGroup::Make(std::move(io), std::move(schema),
std::move(specs_by_id),
+ std::move(data_manifests),
std::move(delete_manifests));
+}
+
+Result<std::unique_ptr<ManifestGroup>> ManifestGroup::Make(
+ std::shared_ptr<FileIO> io, std::shared_ptr<Schema> schema,
+ std::unordered_map<int32_t, std::shared_ptr<PartitionSpec>> specs_by_id,
+ std::vector<ManifestFile> data_manifests,
+ std::vector<ManifestFile> delete_manifests) {
+ // DeleteFileIndex::Builder validates all input parameters so we skip
validation here
+ ICEBERG_ASSIGN_OR_RAISE(
+ auto delete_index_builder,
+ DeleteFileIndex::BuilderFor(io, schema, specs_by_id,
std::move(delete_manifests)));
+ return std::unique_ptr<ManifestGroup>(
+ new ManifestGroup(std::move(io), std::move(schema),
std::move(specs_by_id),
+ std::move(data_manifests),
std::move(delete_index_builder)));
+}
+
+ManifestGroup::ManifestGroup(
+ std::shared_ptr<FileIO> io, std::shared_ptr<Schema> schema,
+ std::unordered_map<int32_t, std::shared_ptr<PartitionSpec>> specs_by_id,
+ std::vector<ManifestFile> data_manifests,
+ DeleteFileIndex::Builder&& delete_index_builder)
+ : io_(std::move(io)),
+ schema_(std::move(schema)),
+ specs_by_id_(std::move(specs_by_id)),
+ data_manifests_(std::move(data_manifests)),
+ delete_index_builder_(std::move(delete_index_builder)),
+ data_filter_(True::Instance()),
+ file_filter_(True::Instance()),
+ partition_filter_(True::Instance()),
+ manifest_entry_predicate_([](const ManifestEntry&) { return true; }) {}
+
+ManifestGroup::~ManifestGroup() = default;
+
+ManifestGroup::ManifestGroup(ManifestGroup&&) noexcept = default;
+ManifestGroup& ManifestGroup::operator=(ManifestGroup&&) noexcept = default;
+
+ManifestGroup& ManifestGroup::FilterData(std::shared_ptr<Expression> filter) {
+ ICEBERG_BUILDER_ASSIGN_OR_RETURN(data_filter_, And::Make(data_filter_,
filter));
+ delete_index_builder_.DataFilter(std::move(filter));
+ return *this;
+}
+
+ManifestGroup& ManifestGroup::FilterFiles(std::shared_ptr<Expression> filter) {
+ ICEBERG_BUILDER_ASSIGN_OR_RETURN(file_filter_,
+ And::Make(file_filter_, std::move(filter)));
+ return *this;
+}
+
+ManifestGroup& ManifestGroup::FilterPartitions(std::shared_ptr<Expression>
filter) {
+ ICEBERG_BUILDER_ASSIGN_OR_RETURN(partition_filter_,
+ And::Make(partition_filter_, filter));
+ delete_index_builder_.PartitionFilter(std::move(filter));
+ return *this;
+}
+
+ManifestGroup& ManifestGroup::FilterManifestEntries(
+ std::function<bool(const ManifestEntry&)> predicate) {
+ manifest_entry_predicate_ = [old_predicate =
std::move(manifest_entry_predicate_),
+ predicate =
+ std::move(predicate)](const ManifestEntry&
entry) {
+ return old_predicate(entry) && predicate(entry);
+ };
+ return *this;
+}
+
+ManifestGroup& ManifestGroup::IgnoreDeleted() {
+ ignore_deleted_ = true;
+ return *this;
+}
+
+ManifestGroup& ManifestGroup::IgnoreExisting() {
+ ignore_existing_ = true;
+ return *this;
+}
+
+ManifestGroup& ManifestGroup::IgnoreResiduals() {
+ ignore_residuals_ = true;
+ delete_index_builder_.IgnoreResiduals();
+ return *this;
+}
+
+ManifestGroup& ManifestGroup::Select(std::vector<std::string> columns) {
+ columns_ = std::move(columns);
+ return *this;
+}
+
+ManifestGroup& ManifestGroup::CaseSensitive(bool case_sensitive) {
+ case_sensitive_ = case_sensitive;
+ delete_index_builder_.CaseSensitive(case_sensitive);
+ return *this;
+}
+
+ManifestGroup& ManifestGroup::ColumnsToKeepStats(std::unordered_set<int32_t>
column_ids) {
+ columns_to_keep_stats_ = std::move(column_ids);
+ return *this;
+}
+
+Result<std::vector<std::shared_ptr<FileScanTask>>> ManifestGroup::PlanFiles() {
+ auto create_file_scan_tasks =
+ [this](std::vector<ManifestEntry>&& entries,
+ const TaskContext& ctx) ->
Result<std::vector<std::shared_ptr<ScanTask>>> {
+ std::vector<std::shared_ptr<ScanTask>> tasks;
+ tasks.reserve(entries.size());
+
+ for (auto& entry : entries) {
+ if (ctx.drop_stats) {
+ ContentFileUtil::DropAllStats(*entry.data_file);
+ } else if (!ctx.columns_to_keep_stats.empty()) {
+ ContentFileUtil::DropUnselectedStats(*entry.data_file,
ctx.columns_to_keep_stats);
+ }
+ ICEBERG_ASSIGN_OR_RAISE(auto delete_files, ctx.deletes->ForEntry(entry));
+ ICEBERG_ASSIGN_OR_RAISE(auto residual,
+
ctx.residuals->ResidualFor(entry.data_file->partition));
+ tasks.push_back(std::make_shared<FileScanTask>(
+ std::move(entry.data_file), std::move(delete_files),
std::move(residual)));
+ }
+
+ return tasks;
+ };
+
+ ICEBERG_ASSIGN_OR_RAISE(auto tasks, Plan(create_file_scan_tasks));
+
+ // Convert ScanTask to FileScanTask
+ std::vector<std::shared_ptr<FileScanTask>> file_tasks;
+ file_tasks.reserve(tasks.size());
+ for (auto& task : tasks) {
+ file_tasks.push_back(internal::checked_pointer_cast<FileScanTask>(task));
+ }
+ return file_tasks;
+}
+
+Result<std::vector<std::shared_ptr<ScanTask>>> ManifestGroup::Plan(
+ const CreateTasksFunction& create_tasks) {
+ std::unordered_map<int32_t, std::shared_ptr<ResidualEvaluator>>
residual_cache;
+ auto get_residual_evaluator = [&](int32_t spec_id) ->
Result<ResidualEvaluator*> {
+ if (residual_cache.contains(spec_id)) {
+ return residual_cache[spec_id].get();
+ }
+
+ auto spec_iter = specs_by_id_.find(spec_id);
+ ICEBERG_CHECK(spec_iter != specs_by_id_.cend(),
+ "Cannot find partition spec for ID {}", spec_id);
+
+ const auto& spec = spec_iter->second;
+ ICEBERG_ASSIGN_OR_RAISE(
+ auto residual_evaluator,
+ ResidualEvaluator::Make((ignore_residuals_ ? True::Instance() :
data_filter_),
+ *spec, *schema_, case_sensitive_));
+ residual_cache[spec_id] = std::move(residual_evaluator);
+
+ return residual_cache[spec_id].get();
+ };
+
+ ICEBERG_ASSIGN_OR_RAISE(auto delete_index, delete_index_builder_.Build());
+
+ bool drop_stats = ManifestReader::ShouldDropStats(columns_);
+ if (delete_index->has_equality_deletes()) {
+ columns_ = ManifestReader::WithStatsColumns(columns_);
+ }
+
+ std::unordered_map<int32_t, std::unique_ptr<TaskContext>> task_context_cache;
+ auto get_task_context = [&](int32_t spec_id) -> Result<TaskContext*> {
+ if (task_context_cache.contains(spec_id)) {
+ return task_context_cache[spec_id].get();
+ }
+
+ auto spec_iter = specs_by_id_.find(spec_id);
+ ICEBERG_CHECK(spec_iter != specs_by_id_.cend(),
+ "Cannot find partition spec for ID {}", spec_id);
+
+ const auto& spec = spec_iter->second;
+ ICEBERG_ASSIGN_OR_RAISE(auto residuals, get_residual_evaluator(spec_id));
+ task_context_cache[spec_id] = std::make_unique<TaskContext>(
+ TaskContext{.spec = spec,
+ .deletes = delete_index.get(),
+ .residuals = residuals,
+ .drop_stats = drop_stats,
+ .columns_to_keep_stats = columns_to_keep_stats_});
+
+ return task_context_cache[spec_id].get();
+ };
+
+ ICEBERG_ASSIGN_OR_RAISE(auto entry_groups, ReadEntries());
+
+ std::vector<std::shared_ptr<ScanTask>> all_tasks;
+ for (auto& [spec_id, entries] : entry_groups) {
+ ICEBERG_ASSIGN_OR_RAISE(auto ctx, get_task_context(spec_id));
+ ICEBERG_ASSIGN_OR_RAISE(auto tasks, create_tasks(std::move(entries),
*ctx));
+ all_tasks.insert(all_tasks.end(), std::make_move_iterator(tasks.begin()),
+ std::make_move_iterator(tasks.end()));
+ }
+
+ return all_tasks;
+}
+
+Result<std::vector<ManifestEntry>> ManifestGroup::Entries() {
+ ICEBERG_ASSIGN_OR_RAISE(auto entry_groups, ReadEntries());
+
+ std::vector<ManifestEntry> all_entries;
+ for (auto& [_, entries] : entry_groups) {
+ all_entries.insert(all_entries.end(),
std::make_move_iterator(entries.begin()),
+ std::make_move_iterator(entries.end()));
+ }
+
+ return all_entries;
+}
+
+Result<std::unique_ptr<ManifestReader>> ManifestGroup::MakeReader(
+ const ManifestFile& manifest) {
+ auto spec_it = specs_by_id_.find(manifest.partition_spec_id);
+ if (spec_it == specs_by_id_.end()) {
+ return InvalidArgument("Partition spec {} not found for manifest {}",
+ manifest.partition_spec_id, manifest.manifest_path);
+ }
+
+ ICEBERG_ASSIGN_OR_RAISE(auto reader,
+ ManifestReader::Make(manifest, io_, schema_,
spec_it->second));
+
+ reader->FilterRows(data_filter_)
+ .FilterPartitions(partition_filter_)
+ .CaseSensitive(case_sensitive_)
+ .Select(columns_);
+
+ return reader;
+}
+
+Result<std::unordered_map<int32_t, std::vector<ManifestEntry>>>
+ManifestGroup::ReadEntries() {
+ std::unordered_map<int32_t, std::unique_ptr<ManifestEvaluator>> eval_cache;
+ auto get_manifest_evaluator = [&](int32_t spec_id) ->
Result<ManifestEvaluator*> {
+ if (eval_cache.contains(spec_id)) {
+ return eval_cache[spec_id].get();
+ }
+
+ auto spec_iter = specs_by_id_.find(spec_id);
+ ICEBERG_CHECK(spec_iter != specs_by_id_.cend(),
+ "Cannot find partition spec for ID {}", spec_id);
+
+ const auto& spec = spec_iter->second;
+ auto projector = Projections::Inclusive(*spec, *schema_, case_sensitive_);
+ ICEBERG_ASSIGN_OR_RAISE(auto partition_filter,
projector->Project(data_filter_));
+ ICEBERG_ASSIGN_OR_RAISE(partition_filter,
+ And::Make(partition_filter, partition_filter_));
+ ICEBERG_ASSIGN_OR_RAISE(
+ auto manifest_evaluator,
+ ManifestEvaluator::MakePartitionFilter(std::move(partition_filter),
spec,
+ *schema_, case_sensitive_));
+ eval_cache[spec_id] = std::move(manifest_evaluator);
+
+ return eval_cache[spec_id].get();
+ };
+
+ std::unique_ptr<Evaluator> data_file_evaluator;
+ if (file_filter_ && file_filter_->op() != Expression::Operation::kTrue) {
+ // TODO(gangwu): create an Evaluator on the DataFile schema with empty
+ // partition type
+ }
+
+ std::unordered_map<int32_t, std::vector<ManifestEntry>> result;
+
+ // TODO(gangwu): Parallelize reading manifests
+ for (const auto& manifest : data_manifests_) {
+ const int32_t spec_id = manifest.partition_spec_id;
+
+ ICEBERG_ASSIGN_OR_RAISE(auto manifest_evaluator,
get_manifest_evaluator(spec_id));
+ ICEBERG_ASSIGN_OR_RAISE(bool should_match,
manifest_evaluator->Evaluate(manifest));
+ if (!should_match) {
+ // Skip this manifest because it doesn't match partition filter
+ continue;
+ }
+
+ if (ignore_deleted_) {
+ // only scan manifests that have entries other than deletes
+ if (!manifest.has_added_files() && !manifest.has_existing_files()) {
+ continue;
+ }
+ }
+
+ if (ignore_existing_) {
+ // only scan manifests that have entries other than existing
+ if (!manifest.has_added_files() && !manifest.has_deleted_files()) {
+ continue;
+ }
+ }
+
+ // Read manifest entries
+ ICEBERG_ASSIGN_OR_RAISE(auto reader, MakeReader(manifest));
+ ICEBERG_ASSIGN_OR_RAISE(auto entries,
+ ignore_deleted_ ? reader->LiveEntries() :
reader->Entries());
+
+ for (auto& entry : entries) {
+ if (ignore_existing_ && entry.status == ManifestStatus::kExisting) {
+ continue;
+ }
+
+ if (data_file_evaluator != nullptr) {
+ // TODO(gangwu): implement data_file_evaluator to evaluate StructLike
on
+ // top of entry.data_file
+ }
+
+ if (!manifest_entry_predicate_(entry)) {
+ continue;
+ }
+
+ result[spec_id].push_back(std::move(entry));
+ }
+ }
+
+ return result;
+}
+
+} // namespace iceberg
diff --git a/src/iceberg/manifest/manifest_group.h
b/src/iceberg/manifest/manifest_group.h
new file mode 100644
index 00000000..10b55278
--- /dev/null
+++ b/src/iceberg/manifest/manifest_group.h
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+/// \file iceberg/manifest/manifest_group.h
+/// Coordinates reading manifest files and producing scan tasks.
+
+#include <functional>
+#include <memory>
+#include <string>
+#include <unordered_map>
+#include <unordered_set>
+#include <vector>
+
+#include "iceberg/delete_file_index.h"
+#include "iceberg/iceberg_export.h"
+#include "iceberg/manifest/manifest_entry.h"
+#include "iceberg/manifest/manifest_list.h"
+#include "iceberg/result.h"
+#include "iceberg/type_fwd.h"
+#include "iceberg/util/error_collector.h"
+
+namespace iceberg {
+
+/// \brief Context passed to task creation functions.
+struct ICEBERG_EXPORT TaskContext {
+ public:
+ std::shared_ptr<PartitionSpec> spec;
+ DeleteFileIndex* deletes;
+ ResidualEvaluator* residuals;
+ bool drop_stats;
+ std::unordered_set<int32_t> columns_to_keep_stats;
+};
+
+/// \brief Coordinates reading manifest files and producing scan tasks.
+class ICEBERG_EXPORT ManifestGroup : public ErrorCollector {
+ public:
+ /// \brief Construct a ManifestGroup with a list of manifests.
+ ///
+ /// \param io FileIO for reading manifest files.
+ /// \param schema Current table schema.
+ /// \param specs_by_id Mapping of partition spec ID to PartitionSpec.
+ /// \param manifests List of manifest files to process.
+ static Result<std::unique_ptr<ManifestGroup>> Make(
+ std::shared_ptr<FileIO> io, std::shared_ptr<Schema> schema,
+      std::unordered_map<int32_t, std::shared_ptr<PartitionSpec>> specs_by_id,
+ std::vector<ManifestFile> manifests);
+
+ /// \brief Construct a ManifestGroup with pre-separated manifests.
+ ///
+ /// \param io FileIO for reading manifest files.
+ /// \param schema Current table schema.
+ /// \param specs_by_id Mapping of partition spec ID to PartitionSpec.
+ /// \param data_manifests List of data manifest files.
+ /// \param delete_manifests List of delete manifest files.
+ static Result<std::unique_ptr<ManifestGroup>> Make(
+ std::shared_ptr<FileIO> io, std::shared_ptr<Schema> schema,
+ std::unordered_map<int32_t, std::shared_ptr<PartitionSpec>> specs_by_id,
+ std::vector<ManifestFile> data_manifests,
+ std::vector<ManifestFile> delete_manifests);
+
+ ~ManifestGroup() override;
+
+ ManifestGroup(ManifestGroup&&) noexcept;
+ ManifestGroup& operator=(ManifestGroup&&) noexcept;
+ ManifestGroup(const ManifestGroup&) = delete;
+ ManifestGroup& operator=(const ManifestGroup&) = delete;
+
+ /// \brief Set a row-level data filter.
+ ManifestGroup& FilterData(std::shared_ptr<Expression> filter);
+
+ /// \brief Set a filter that is evaluated against each DataFile's metadata.
+ ManifestGroup& FilterFiles(std::shared_ptr<Expression> filter);
+
+ /// \brief Set a partition filter expression.
+ ManifestGroup& FilterPartitions(std::shared_ptr<Expression> filter);
+
+ /// \brief Set a custom manifest entry filter predicate.
+ ///
+ /// \param predicate A function that returns true if the entry should be
included.
+ ManifestGroup& FilterManifestEntries(
+ std::function<bool(const ManifestEntry&)> predicate);
+
+ /// \brief Ignore deleted entries in manifests.
+ ManifestGroup& IgnoreDeleted();
+
+ /// \brief Ignore existing entries in manifests.
+ ManifestGroup& IgnoreExisting();
+
+ /// \brief Ignore residual filter computation.
+ ManifestGroup& IgnoreResiduals();
+
+ /// \brief Select specific columns from manifest entries.
+ ///
+ /// \param columns Column names to select from manifest entries.
+ ManifestGroup& Select(std::vector<std::string> columns);
+
+ /// \brief Set case sensitivity for column name matching.
+ ManifestGroup& CaseSensitive(bool case_sensitive);
+
+ /// \brief Specify columns that should retain their statistics.
+ ///
+ /// \param column_ids Field IDs of columns whose statistics should be
preserved.
+ ManifestGroup& ColumnsToKeepStats(std::unordered_set<int32_t> column_ids);
+
+ /// \brief Plan scan tasks for all matching data files.
+ Result<std::vector<std::shared_ptr<FileScanTask>>> PlanFiles();
+
+ /// \brief Get all matching manifest entries.
+ Result<std::vector<ManifestEntry>> Entries();
+
+ using CreateTasksFunction =
+ std::function<Result<std::vector<std::shared_ptr<ScanTask>>>(
+ std::vector<ManifestEntry>&&, const TaskContext&)>;
+
+ /// \brief Plan tasks using a custom task creation function.
+ ///
+ /// \param create_tasks A function that creates ScanTasks from entries and
context.
+ /// \return A list of ScanTask objects, or error on failure.
+ Result<std::vector<std::shared_ptr<ScanTask>>> Plan(
+ const CreateTasksFunction& create_tasks);
+
+ private:
+ ManifestGroup(std::shared_ptr<FileIO> io, std::shared_ptr<Schema> schema,
+ std::unordered_map<int32_t, std::shared_ptr<PartitionSpec>>
specs_by_id,
+ std::vector<ManifestFile> data_manifests,
+ DeleteFileIndex::Builder&& delete_index_builder);
+
+ Result<std::unordered_map<int32_t, std::vector<ManifestEntry>>>
ReadEntries();
+
+ Result<std::unique_ptr<ManifestReader>> MakeReader(const ManifestFile&
manifest);
+
+ std::shared_ptr<FileIO> io_;
+ std::shared_ptr<Schema> schema_;
+ std::unordered_map<int32_t, std::shared_ptr<PartitionSpec>> specs_by_id_;
+ std::vector<ManifestFile> data_manifests_;
+ DeleteFileIndex::Builder delete_index_builder_;
+ std::shared_ptr<Expression> data_filter_;
+ std::shared_ptr<Expression> file_filter_;
+ std::shared_ptr<Expression> partition_filter_;
+ std::function<bool(const ManifestEntry&)> manifest_entry_predicate_;
+ std::vector<std::string> columns_;
+ std::unordered_set<int32_t> columns_to_keep_stats_;
+ bool case_sensitive_ = true;
+ bool ignore_deleted_ = false;
+ bool ignore_existing_ = false;
+ bool ignore_residuals_ = false;
+};
+
+} // namespace iceberg
diff --git a/src/iceberg/manifest/manifest_writer.cc
b/src/iceberg/manifest/manifest_writer.cc
index c30f9f75..649797fe 100644
--- a/src/iceberg/manifest/manifest_writer.cc
+++ b/src/iceberg/manifest/manifest_writer.cc
@@ -21,6 +21,7 @@
#include <optional>
+#include "iceberg/constants.h"
#include "iceberg/manifest/manifest_entry.h"
#include "iceberg/manifest/manifest_list.h"
#include "iceberg/manifest/v1_metadata_internal.h"
@@ -233,11 +234,12 @@ Result<ManifestFile> ManifestWriter::ToManifestFile()
const {
.manifest_length = manifest_length,
.partition_spec_id = adapter_->partition_spec()->spec_id(),
.content = adapter_->content(),
- // sequence_number and min_sequence_number with kInvalidSequenceNumber
will be
- // replace with real sequence number in `ManifestListWriter`.
- .sequence_number = TableMetadata::kInvalidSequenceNumber,
- .min_sequence_number =
- min_sequence_number_.value_or(TableMetadata::kInvalidSequenceNumber),
+ // sequence_number with kUnassignedSequenceNumber will be assigned when
committed.
+ .sequence_number = kUnassignedSequenceNumber,
+ // if the min_sequence_number_ is missing, then no manifests with a
sequence number
+ // have been written, so the min data sequence number is the one that
will be
+ // assigned when this is committed. pass kUnassignedSequenceNumber to
inherit it.
+ .min_sequence_number =
min_sequence_number_.value_or(kUnassignedSequenceNumber),
.added_snapshot_id =
adapter_->snapshot_id().value_or(kInvalidSnapshotId),
.added_files_count = add_files_count_,
.existing_files_count = existing_files_count_,
diff --git a/src/iceberg/manifest/meson.build b/src/iceberg/manifest/meson.build
index 00f93c59..41e685ff 100644
--- a/src/iceberg/manifest/meson.build
+++ b/src/iceberg/manifest/meson.build
@@ -18,6 +18,7 @@
install_headers(
[
'manifest_entry.h',
+ 'manifest_group.h',
'manifest_list.h',
'manifest_reader.h',
'manifest_writer.h',
diff --git a/src/iceberg/manifest/v2_metadata.cc
b/src/iceberg/manifest/v2_metadata.cc
index 737fa62f..e5c54fca 100644
--- a/src/iceberg/manifest/v2_metadata.cc
+++ b/src/iceberg/manifest/v2_metadata.cc
@@ -17,6 +17,7 @@
* under the License.
*/
+#include "iceberg/constants.h"
#include "iceberg/json_internal.h"
#include "iceberg/manifest/manifest_entry.h"
#include "iceberg/manifest/manifest_list.h"
@@ -157,7 +158,7 @@ Status ManifestFileAdapterV2::Append(const ManifestFile&
file) {
}
Result<int64_t> ManifestFileAdapterV2::GetSequenceNumber(const ManifestFile&
file) const {
- if (file.sequence_number == TableMetadata::kInvalidSequenceNumber) {
+ if (file.sequence_number == kUnassignedSequenceNumber) {
// if the sequence number is being assigned here, then the manifest must
be created by
// the current operation. to validate this, check that the snapshot id
matches the
// current commit
@@ -173,7 +174,7 @@ Result<int64_t>
ManifestFileAdapterV2::GetSequenceNumber(const ManifestFile& fil
Result<int64_t> ManifestFileAdapterV2::GetMinSequenceNumber(
const ManifestFile& file) const {
- if (file.min_sequence_number == TableMetadata::kInvalidSequenceNumber) {
+ if (file.min_sequence_number == kUnassignedSequenceNumber) {
// same sanity check as above
if (snapshot_id_ != file.added_snapshot_id) {
return InvalidManifestList(
diff --git a/src/iceberg/manifest/v3_metadata.cc
b/src/iceberg/manifest/v3_metadata.cc
index 87d4d77d..35a1a89d 100644
--- a/src/iceberg/manifest/v3_metadata.cc
+++ b/src/iceberg/manifest/v3_metadata.cc
@@ -20,6 +20,7 @@
#include <memory>
#include <optional>
+#include "iceberg/constants.h"
#include "iceberg/json_internal.h"
#include "iceberg/manifest/manifest_entry.h"
#include "iceberg/manifest/manifest_list.h"
@@ -205,7 +206,7 @@ Status ManifestFileAdapterV3::Append(const ManifestFile&
file) {
}
Result<int64_t> ManifestFileAdapterV3::GetSequenceNumber(const ManifestFile&
file) const {
- if (file.sequence_number == TableMetadata::kInvalidSequenceNumber) {
+ if (file.sequence_number == kUnassignedSequenceNumber) {
// if the sequence number is being assigned here, then the manifest must
be created by
// the current operation. to validate this, check that the snapshot id
matches the
// current commit
@@ -221,7 +222,7 @@ Result<int64_t>
ManifestFileAdapterV3::GetSequenceNumber(const ManifestFile& fil
Result<int64_t> ManifestFileAdapterV3::GetMinSequenceNumber(
const ManifestFile& file) const {
- if (file.min_sequence_number == TableMetadata::kInvalidSequenceNumber) {
+ if (file.min_sequence_number == kUnassignedSequenceNumber) {
// same sanity check as above
if (snapshot_id_ != file.added_snapshot_id) {
return InvalidManifestList(
diff --git a/src/iceberg/meson.build b/src/iceberg/meson.build
index 2b344dc1..55349d8d 100644
--- a/src/iceberg/meson.build
+++ b/src/iceberg/meson.build
@@ -63,6 +63,7 @@ iceberg_sources = files(
'json_internal.cc',
'manifest/manifest_adapter.cc',
'manifest/manifest_entry.cc',
+ 'manifest/manifest_group.cc',
'manifest/manifest_list.cc',
'manifest/manifest_reader.cc',
'manifest/manifest_writer.cc',
diff --git a/src/iceberg/table_scan.cc b/src/iceberg/table_scan.cc
index 6918de82..700cab1f 100644
--- a/src/iceberg/table_scan.cc
+++ b/src/iceberg/table_scan.cc
@@ -31,7 +31,6 @@
#include "iceberg/schema_field.h"
#include "iceberg/snapshot.h"
#include "iceberg/table_metadata.h"
-#include "iceberg/type.h"
#include "iceberg/util/macros.h"
namespace iceberg {
@@ -136,12 +135,29 @@ Result<ArrowArrayStream>
MakeArrowArrayStream(std::unique_ptr<Reader> reader) {
} // namespace
-// implement FileScanTask
-FileScanTask::FileScanTask(std::shared_ptr<DataFile> data_file)
- : data_file_(std::move(data_file)) {}
+// FileScanTask implementation
+
+FileScanTask::FileScanTask(std::shared_ptr<DataFile> data_file,
+ std::vector<std::shared_ptr<DataFile>> delete_files,
+ std::shared_ptr<Expression> residual_filter)
+ : data_file_(std::move(data_file)),
+ delete_files_(std::move(delete_files)),
+ residual_filter_(std::move(residual_filter)) {}
const std::shared_ptr<DataFile>& FileScanTask::data_file() const { return
data_file_; }
+const std::vector<std::shared_ptr<DataFile>>& FileScanTask::delete_files()
const {
+ return delete_files_;
+}
+
+const std::shared_ptr<Expression>& FileScanTask::residual_filter() const {
+ return residual_filter_;
+}
+
+bool FileScanTask::has_deletes() const { return !delete_files_.empty(); }
+
+bool FileScanTask::has_residual_filter() const { return residual_filter_ !=
nullptr; }
+
int64_t FileScanTask::size_bytes() const { return
data_file_->file_size_in_bytes; }
int32_t FileScanTask::files_count() const { return 1; }
@@ -151,6 +167,10 @@ int64_t FileScanTask::estimated_row_count() const { return
data_file_->record_co
Result<ArrowArrayStream> FileScanTask::ToArrow(
const std::shared_ptr<FileIO>& io, const std::shared_ptr<Schema>&
projected_schema,
const std::shared_ptr<Expression>& filter) const {
+ if (has_deletes()) {
+ return NotSupported("Reading data files with delete files is not yet
supported.");
+ }
+
const ReaderOptions options{.path = data_file_->file_path,
.length = data_file_->file_size_in_bytes,
.io = io,
diff --git a/src/iceberg/table_scan.h b/src/iceberg/table_scan.h
index c3b1b28f..4f2ddfde 100644
--- a/src/iceberg/table_scan.h
+++ b/src/iceberg/table_scan.h
@@ -46,11 +46,30 @@ class ICEBERG_EXPORT ScanTask {
/// \brief Task representing a data file and its corresponding delete files.
class ICEBERG_EXPORT FileScanTask : public ScanTask {
public:
- explicit FileScanTask(std::shared_ptr<DataFile> data_file);
+ /// \brief Construct with data file, delete files, and residual filter.
+ ///
+ /// \param data_file The data file to read.
+ /// \param delete_files Delete files that apply to this data file.
+ /// \param residual_filter Optional residual filter to apply after reading.
+ explicit FileScanTask(std::shared_ptr<DataFile> data_file,
+ std::vector<std::shared_ptr<DataFile>> delete_files =
{},
+ std::shared_ptr<Expression> residual_filter = nullptr);
/// \brief The data file that should be read by this scan task.
const std::shared_ptr<DataFile>& data_file() const;
+ /// \brief Delete files that apply to this data file.
+ const std::vector<std::shared_ptr<DataFile>>& delete_files() const;
+
+ /// \brief Residual filter to apply after reading.
+ const std::shared_ptr<Expression>& residual_filter() const;
+
+ /// \brief Check if any deletes need to be applied.
+ bool has_deletes() const;
+
+ /// \brief Check if a residual filter needs to be applied.
+ bool has_residual_filter() const;
+
int64_t size_bytes() const override;
int32_t files_count() const override;
int64_t estimated_row_count() const override;
@@ -70,6 +89,10 @@ class ICEBERG_EXPORT FileScanTask : public ScanTask {
private:
/// \brief Data file metadata.
std::shared_ptr<DataFile> data_file_;
+ /// \brief Delete files that apply to this data file.
+ std::vector<std::shared_ptr<DataFile>> delete_files_;
+ /// \brief Residual filter to apply after reading.
+ std::shared_ptr<Expression> residual_filter_;
};
/// \brief Scan context holding snapshot and scan-specific metadata.
diff --git a/src/iceberg/test/CMakeLists.txt b/src/iceberg/test/CMakeLists.txt
index 731fe0af..a32bbe4d 100644
--- a/src/iceberg/test/CMakeLists.txt
+++ b/src/iceberg/test/CMakeLists.txt
@@ -143,6 +143,7 @@ if(ICEBERG_BUILD_BUNDLE)
USE_BUNDLE
SOURCES
delete_file_index_test.cc
+ manifest_group_test.cc
manifest_list_versions_test.cc
manifest_reader_stats_test.cc
manifest_reader_test.cc
diff --git a/src/iceberg/test/delete_file_index_test.cc
b/src/iceberg/test/delete_file_index_test.cc
index e091f5ea..8c87772e 100644
--- a/src/iceberg/test/delete_file_index_test.cc
+++ b/src/iceberg/test/delete_file_index_test.cc
@@ -198,9 +198,9 @@ class DeleteFileIndexTest : public
testing::TestWithParam<int> {
Result<std::unique_ptr<DeleteFileIndex>> BuildIndex(
std::vector<ManifestFile> delete_manifests,
std::optional<int64_t> after_sequence_number = std::nullopt) {
- ICEBERG_ASSIGN_OR_RAISE(
- auto builder, DeleteFileIndex::BuilderFor(file_io_,
std::move(delete_manifests)));
- builder.SpecsById(GetSpecsById()).WithSchema(schema_);
+ ICEBERG_ASSIGN_OR_RAISE(auto builder,
+ DeleteFileIndex::BuilderFor(file_io_, schema_,
GetSpecsById(),
+
std::move(delete_manifests)));
if (after_sequence_number.has_value()) {
builder.AfterSequenceNumber(after_sequence_number.value());
}
diff --git a/src/iceberg/test/manifest_group_test.cc
b/src/iceberg/test/manifest_group_test.cc
new file mode 100644
index 00000000..32466c22
--- /dev/null
+++ b/src/iceberg/test/manifest_group_test.cc
@@ -0,0 +1,521 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "iceberg/manifest/manifest_group.h"
+
+#include <chrono>
+#include <format>
+#include <memory>
+#include <optional>
+#include <string>
+#include <vector>
+
+#include <gmock/gmock.h>
+#include <gtest/gtest.h>
+
+#include "iceberg/arrow/arrow_file_io.h"
+#include "iceberg/avro/avro_register.h"
+#include "iceberg/expression/expressions.h"
+#include "iceberg/manifest/manifest_entry.h"
+#include "iceberg/manifest/manifest_list.h"
+#include "iceberg/manifest/manifest_reader.h"
+#include "iceberg/manifest/manifest_writer.h"
+#include "iceberg/partition_spec.h"
+#include "iceberg/schema.h"
+#include "iceberg/table_scan.h"
+#include "iceberg/test/matchers.h"
+#include "iceberg/transform.h"
+#include "iceberg/type.h"
+
+namespace iceberg {
+
+class ManifestGroupTest : public testing::TestWithParam<int> {
+ protected:
+ void SetUp() override {
+ avro::RegisterAll();
+
+ file_io_ = arrow::MakeMockFileIO();
+
+ // Schema with id and data fields
+ schema_ = std::make_shared<Schema>(std::vector<SchemaField>{
+ SchemaField::MakeRequired(/*field_id=*/1, "id", int32()),
+ SchemaField::MakeRequired(/*field_id=*/2, "data", string())});
+
+ // Partitioned spec: bucket by data
+ ICEBERG_UNWRAP_OR_FAIL(
+ partitioned_spec_,
+ PartitionSpec::Make(
+ /*spec_id=*/1, {PartitionField(/*source_id=*/2, /*field_id=*/1000,
+ "data_bucket_16_2",
Transform::Bucket(16))}));
+
+ // Unpartitioned spec
+ unpartitioned_spec_ = PartitionSpec::Unpartitioned();
+ }
+
+ std::string MakeManifestPath() {
+ static int counter = 0;
+ return std::format("manifest-{}-{}.avro", counter++,
+
std::chrono::system_clock::now().time_since_epoch().count());
+ }
+
+ std::shared_ptr<DataFile> MakeDataFile(const std::string& path,
+ const PartitionValues& partition,
+ int32_t spec_id, int64_t record_count
= 1) {
+ return std::make_shared<DataFile>(DataFile{
+ .file_path = path,
+ .file_format = FileFormatType::kParquet,
+ .partition = partition,
+ .record_count = record_count,
+ .file_size_in_bytes = 10,
+ .sort_order_id = 0,
+ .partition_spec_id = spec_id,
+ });
+ }
+
+ std::shared_ptr<DataFile> MakePositionDeleteFile(
+ const std::string& path, const PartitionValues& partition, int32_t
spec_id,
+ std::optional<std::string> referenced_file = std::nullopt) {
+ return std::make_shared<DataFile>(DataFile{
+ .content = DataFile::Content::kPositionDeletes,
+ .file_path = path,
+ .file_format = FileFormatType::kParquet,
+ .partition = partition,
+ .record_count = 1,
+ .file_size_in_bytes = 10,
+ .partition_spec_id = spec_id,
+ .referenced_data_file = referenced_file,
+ });
+ }
+
+ std::shared_ptr<DataFile> MakeEqualityDeleteFile(const std::string& path,
+ const PartitionValues&
partition,
+ int32_t spec_id,
+ std::vector<int>
equality_ids = {1}) {
+ return std::make_shared<DataFile>(DataFile{
+ .content = DataFile::Content::kEqualityDeletes,
+ .file_path = path,
+ .file_format = FileFormatType::kParquet,
+ .partition = partition,
+ .record_count = 1,
+ .file_size_in_bytes = 10,
+ .equality_ids = std::move(equality_ids),
+ .partition_spec_id = spec_id,
+ });
+ }
+
+ ManifestEntry MakeEntry(ManifestStatus status, int64_t snapshot_id,
+ int64_t sequence_number, std::shared_ptr<DataFile>
file) {
+ return ManifestEntry{
+ .status = status,
+ .snapshot_id = snapshot_id,
+ .sequence_number = sequence_number,
+ .file_sequence_number = sequence_number,
+ .data_file = std::move(file),
+ };
+ }
+
+ ManifestFile WriteDataManifest(int format_version, int64_t snapshot_id,
+ std::vector<ManifestEntry> entries,
+ std::shared_ptr<PartitionSpec> spec) {
+ const std::string manifest_path = MakeManifestPath();
+
+ Result<std::unique_ptr<ManifestWriter>> writer_result =
+ NotSupported("Format version: {}", format_version);
+
+ if (format_version == 1) {
+ writer_result = ManifestWriter::MakeV1Writer(snapshot_id, manifest_path,
file_io_,
+ spec, schema_);
+ } else if (format_version == 2) {
+ writer_result = ManifestWriter::MakeV2Writer(snapshot_id, manifest_path,
file_io_,
+ spec, schema_,
ManifestContent::kData);
+ } else if (format_version == 3) {
+ writer_result =
+ ManifestWriter::MakeV3Writer(snapshot_id, /*first_row_id=*/0L,
manifest_path,
+ file_io_, spec, schema_,
ManifestContent::kData);
+ }
+
+ EXPECT_THAT(writer_result, IsOk());
+ auto writer = std::move(writer_result.value());
+
+ for (const auto& entry : entries) {
+ EXPECT_THAT(writer->WriteEntry(entry), IsOk());
+ }
+
+ EXPECT_THAT(writer->Close(), IsOk());
+ auto manifest_result = writer->ToManifestFile();
+ EXPECT_THAT(manifest_result, IsOk());
+ return std::move(manifest_result.value());
+ }
+
+ ManifestFile WriteDeleteManifest(int format_version, int64_t snapshot_id,
+ std::vector<ManifestEntry> entries,
+ std::shared_ptr<PartitionSpec> spec) {
+ const std::string manifest_path = MakeManifestPath();
+
+ Result<std::unique_ptr<ManifestWriter>> writer_result =
+ NotSupported("Format version: {}", format_version);
+
+ if (format_version == 2) {
+ writer_result = ManifestWriter::MakeV2Writer(
+ snapshot_id, manifest_path, file_io_, spec, schema_,
ManifestContent::kDeletes);
+ } else if (format_version == 3) {
+ writer_result = ManifestWriter::MakeV3Writer(
+ snapshot_id, /*first_row_id=*/std::nullopt, manifest_path, file_io_,
spec,
+ schema_, ManifestContent::kDeletes);
+ }
+
+ EXPECT_THAT(writer_result, IsOk());
+ auto writer = std::move(writer_result.value());
+
+ for (const auto& entry : entries) {
+ EXPECT_THAT(writer->WriteEntry(entry), IsOk());
+ }
+
+ EXPECT_THAT(writer->Close(), IsOk());
+ auto manifest_result = writer->ToManifestFile();
+ EXPECT_THAT(manifest_result, IsOk());
+ return std::move(manifest_result.value());
+ }
+
+ std::unordered_map<int32_t, std::shared_ptr<PartitionSpec>> GetSpecsById() {
+ return {{partitioned_spec_->spec_id(), partitioned_spec_},
+ {unpartitioned_spec_->spec_id(), unpartitioned_spec_}};
+ }
+
+ std::string MakeManifestListPath() {
+ static int counter = 0;
+ return std::format("manifest-list-{}-{}.avro", counter++,
+
std::chrono::system_clock::now().time_since_epoch().count());
+ }
+
+ // Write a ManifestFile to a manifest list and read it back. This is useful
for v1
+ // to populate all missing fields like sequence_number.
+ ManifestFile WriteAndReadManifestListEntry(int format_version, int64_t
snapshot_id,
+ int64_t sequence_number,
+ const ManifestFile& manifest) {
+ const std::string manifest_list_path = MakeManifestListPath();
+ constexpr int64_t kParentSnapshotId = 0L;
+ constexpr int64_t kSnapshotFirstRowId = 0L;
+
+ Result<std::unique_ptr<ManifestListWriter>> writer_result =
+ NotSupported("Format version: {}", format_version);
+
+ if (format_version == 1) {
+ writer_result = ManifestListWriter::MakeV1Writer(snapshot_id,
kParentSnapshotId,
+ manifest_list_path,
file_io_);
+ } else if (format_version == 2) {
+ writer_result = ManifestListWriter::MakeV2Writer(
+ snapshot_id, kParentSnapshotId, sequence_number, manifest_list_path,
file_io_);
+ } else if (format_version == 3) {
+ writer_result = ManifestListWriter::MakeV3Writer(
+ snapshot_id, kParentSnapshotId, sequence_number, kSnapshotFirstRowId,
+ manifest_list_path, file_io_);
+ }
+
+ EXPECT_THAT(writer_result, IsOk());
+ auto writer = std::move(writer_result.value());
+ EXPECT_THAT(writer->Add(manifest), IsOk());
+ EXPECT_THAT(writer->Close(), IsOk());
+
+ auto reader_result = ManifestListReader::Make(manifest_list_path,
file_io_);
+ EXPECT_THAT(reader_result, IsOk());
+ auto reader = std::move(reader_result.value());
+ auto files_result = reader->Files();
+ EXPECT_THAT(files_result, IsOk());
+
+ auto manifests = files_result.value();
+ EXPECT_EQ(manifests.size(), 1);
+ return manifests[0];
+ }
+
+ static std::vector<std::string> GetPaths(
+ const std::vector<std::shared_ptr<FileScanTask>>& tasks) {
+ return tasks | std::views::transform([](const auto& task) {
+ return task->data_file()->file_path;
+ }) |
+ std::ranges::to<std::vector<std::string>>();
+ }
+
+ static std::vector<std::string> GetEntryPaths(
+ const std::vector<ManifestEntry>& entries) {
+ return entries | std::views::transform([](const auto& entry) {
+ return entry.data_file->file_path;
+ }) |
+ std::ranges::to<std::vector<std::string>>();
+ }
+
+ std::shared_ptr<FileIO> file_io_;
+ std::shared_ptr<Schema> schema_;
+ std::shared_ptr<PartitionSpec> partitioned_spec_;
+ std::shared_ptr<PartitionSpec> unpartitioned_spec_;
+};
+
+TEST_P(ManifestGroupTest, CreateAndGetEntries) {
+ int version = GetParam();
+ if (version < 2) {
+ GTEST_SKIP() << "Delete files only supported in V2+";
+ }
+
+ constexpr int64_t kSnapshotId = 1000L;
+ const auto part_value = PartitionValues({Literal::Int(0)});
+
+ // Create data manifests
+ std::vector<ManifestEntry> data_entries{
+ MakeEntry(ManifestStatus::kAdded, kSnapshotId, /*sequence_number=*/1,
+ MakeDataFile("/path/to/data1.parquet", part_value,
+ partitioned_spec_->spec_id())),
+ MakeEntry(ManifestStatus::kAdded, kSnapshotId, /*sequence_number=*/1,
+ MakeDataFile("/path/to/data2.parquet", part_value,
+ partitioned_spec_->spec_id()))};
+ auto data_manifest =
+ WriteDataManifest(version, kSnapshotId, std::move(data_entries),
partitioned_spec_);
+
+ // Create delete manifest
+ std::vector<ManifestEntry> delete_entries{
+ MakeEntry(ManifestStatus::kAdded, kSnapshotId,
+ /*sequence_number=*/2,
+ MakePositionDeleteFile("/path/to/delete.parquet", part_value,
+ partitioned_spec_->spec_id()))};
+ auto delete_manifest = WriteDeleteManifest(
+ version, kSnapshotId, std::move(delete_entries), partitioned_spec_);
+
+ // Create ManifestGroup with pre-separated manifests
+ std::vector<ManifestFile> data_manifests = {data_manifest};
+ std::vector<ManifestFile> delete_manifests = {delete_manifest};
+ ICEBERG_UNWRAP_OR_FAIL(
+ auto group,
+ ManifestGroup::Make(file_io_, schema_, GetSpecsById(),
std::move(data_manifests),
+ std::move(delete_manifests)));
+
+ // Verify Entries() returns only data file entries
+ ICEBERG_UNWRAP_OR_FAIL(auto entries, group->Entries());
+ ASSERT_EQ(entries.size(), 2);
+ EXPECT_THAT(
+ GetEntryPaths(entries),
+ testing::UnorderedElementsAre("/path/to/data1.parquet",
"/path/to/data2.parquet"));
+
+ // Verify PlanFiles returns data files with associated delete files
+ ICEBERG_UNWRAP_OR_FAIL(auto tasks, group->PlanFiles());
+ ASSERT_EQ(tasks.size(), 2);
+ EXPECT_THAT(GetPaths(tasks),
testing::UnorderedElementsAre("/path/to/data1.parquet",
+
"/path/to/data2.parquet"));
+  EXPECT_EQ(tasks[0]->delete_files().size(), 1);
+  EXPECT_EQ(tasks[0]->delete_files()[0]->file_path, "/path/to/delete.parquet");
+ EXPECT_EQ(tasks[1]->delete_files().size(), 1);
+ EXPECT_EQ(tasks[1]->delete_files()[0]->file_path, "/path/to/delete.parquet");
+}
+
+TEST_P(ManifestGroupTest, IgnoreDeleted) {
+ int version = GetParam();
+
+ constexpr int64_t kSnapshotId = 1000L;
+ constexpr int64_t kSequenceNumber = 1L;
+ const auto part_value = PartitionValues({Literal::Int(0)});
+
+ // Create data manifest with ADDED and DELETED entries
+ std::vector<ManifestEntry> data_entries{
+ MakeEntry(ManifestStatus::kAdded, kSnapshotId, kSequenceNumber,
+ MakeDataFile("/path/to/added.parquet", part_value,
+ partitioned_spec_->spec_id())),
+ MakeEntry(ManifestStatus::kDeleted, kSnapshotId, kSequenceNumber,
+ MakeDataFile("/path/to/deleted.parquet", part_value,
+ partitioned_spec_->spec_id())),
+ MakeEntry(ManifestStatus::kExisting, kSnapshotId, kSequenceNumber,
+ MakeDataFile("/path/to/existing.parquet", part_value,
+ partitioned_spec_->spec_id()))};
+ auto data_manifest =
+ WriteDataManifest(version, kSnapshotId, std::move(data_entries),
partitioned_spec_);
+ auto read_back_manifest =
+ WriteAndReadManifestListEntry(version, kSnapshotId, kSequenceNumber,
data_manifest);
+
+ // Create ManifestGroup with IgnoreDeleted
+ std::vector<ManifestFile> manifests = {read_back_manifest};
+ ICEBERG_UNWRAP_OR_FAIL(
+ auto group,
+ ManifestGroup::Make(file_io_, schema_, GetSpecsById(),
std::move(manifests)));
+ group->IgnoreDeleted();
+
+ // Plan files - should only return ADDED and EXISTING
+ ICEBERG_UNWRAP_OR_FAIL(auto tasks, group->PlanFiles());
+ ASSERT_EQ(tasks.size(), 2);
+ EXPECT_THAT(GetPaths(tasks),
+ testing::UnorderedElementsAre("/path/to/added.parquet",
+ "/path/to/existing.parquet"));
+}
+
+TEST_P(ManifestGroupTest, IgnoreExisting) {
+ int version = GetParam();
+
+ constexpr int64_t kSnapshotId = 1000L;
+ constexpr int64_t kSequenceNumber = 1L;
+ const auto part_value = PartitionValues({Literal::Int(0)});
+
+ // Create data manifest with ADDED and EXISTING entries
+ std::vector<ManifestEntry> data_entries{
+ MakeEntry(ManifestStatus::kAdded, kSnapshotId, kSequenceNumber,
+ MakeDataFile("/path/to/added.parquet", part_value,
+ partitioned_spec_->spec_id())),
+ MakeEntry(ManifestStatus::kExisting, kSnapshotId, kSequenceNumber,
+ MakeDataFile("/path/to/existing.parquet", part_value,
+ partitioned_spec_->spec_id())),
+ MakeEntry(ManifestStatus::kDeleted, kSnapshotId, kSequenceNumber,
+ MakeDataFile("/path/to/deleted.parquet", part_value,
+ partitioned_spec_->spec_id()))};
+
+ auto data_manifest =
+ WriteDataManifest(version, kSnapshotId, std::move(data_entries),
partitioned_spec_);
+ auto read_back_manifest =
+ WriteAndReadManifestListEntry(version, kSnapshotId, kSequenceNumber,
data_manifest);
+
+ // Create ManifestGroup with IgnoreExisting
+ std::vector<ManifestFile> manifests = {read_back_manifest};
+ ICEBERG_UNWRAP_OR_FAIL(
+ auto group,
+ ManifestGroup::Make(file_io_, schema_, GetSpecsById(),
std::move(manifests)));
+ group->IgnoreExisting();
+
+  // Plan files - should return ADDED and DELETED (only EXISTING is skipped)
+ ICEBERG_UNWRAP_OR_FAIL(auto tasks, group->PlanFiles());
+ ASSERT_EQ(tasks.size(), 2);
+ EXPECT_THAT(GetPaths(tasks),
testing::UnorderedElementsAre("/path/to/added.parquet",
+
"/path/to/deleted.parquet"));
+}
+
+TEST_P(ManifestGroupTest, CustomManifestEntriesFilter) {
+ int version = GetParam();
+
+ constexpr int64_t kSnapshotId = 1000L;
+ const auto part_value = PartitionValues({Literal::Int(0)});
+
+ // Create data manifest with multiple files
+ std::vector<ManifestEntry> data_entries{
+ MakeEntry(ManifestStatus::kAdded, kSnapshotId, /*sequence_number=*/1,
+ MakeDataFile("/path/to/data1.parquet", part_value,
+ partitioned_spec_->spec_id())),
+ MakeEntry(ManifestStatus::kAdded, kSnapshotId, /*sequence_number=*/1,
+ MakeDataFile("/path/to/data2.parquet", part_value,
+ partitioned_spec_->spec_id())),
+ MakeEntry(ManifestStatus::kAdded, kSnapshotId, /*sequence_number=*/1,
+ MakeDataFile("/path/to/data3.parquet", part_value,
+ partitioned_spec_->spec_id()))};
+ auto data_manifest =
+ WriteDataManifest(version, kSnapshotId, std::move(data_entries),
partitioned_spec_);
+
+ // Create ManifestGroup with custom entry filter
+ std::vector<ManifestFile> manifests = {data_manifest};
+ ICEBERG_UNWRAP_OR_FAIL(
+ auto group,
+ ManifestGroup::Make(file_io_, schema_, GetSpecsById(),
std::move(manifests)));
+ group->FilterManifestEntries([](const ManifestEntry& entry) {
+ // Only include files with "data1" or "data3" in the path
+ return entry.data_file->file_path.find("data1") != std::string::npos ||
+ entry.data_file->file_path.find("data3") != std::string::npos;
+ });
+
+ // Plan files - should only return filtered entries
+ ICEBERG_UNWRAP_OR_FAIL(auto tasks, group->PlanFiles());
+ ASSERT_EQ(tasks.size(), 2);
+ EXPECT_THAT(GetPaths(tasks),
testing::UnorderedElementsAre("/path/to/data1.parquet",
+
"/path/to/data3.parquet"));
+}
+
+TEST_P(ManifestGroupTest, EmptyManifestGroup) {
+ std::vector<ManifestFile> manifests;
+ ICEBERG_UNWRAP_OR_FAIL(
+ auto group,
+ ManifestGroup::Make(file_io_, schema_, GetSpecsById(),
std::move(manifests)));
+
+ ICEBERG_UNWRAP_OR_FAIL(auto tasks, group->PlanFiles());
+ EXPECT_TRUE(tasks.empty());
+
+ ICEBERG_UNWRAP_OR_FAIL(auto entries, group->Entries());
+ EXPECT_TRUE(entries.empty());
+}
+
+TEST_P(ManifestGroupTest, MultipleDataManifests) {
+ int version = GetParam();
+
+ const auto partition_a = PartitionValues({Literal::Int(0)});
+ const auto partition_b = PartitionValues({Literal::Int(1)});
+
+ // Create first data manifest
+ std::vector<ManifestEntry> data_entries_1{MakeEntry(
+ ManifestStatus::kAdded, /*snapshot_id=*/1000L, /*sequence_number=*/1,
+ MakeDataFile("/path/to/data1.parquet", partition_a,
partitioned_spec_->spec_id()))};
+ auto data_manifest_1 = WriteDataManifest(version, /*snapshot_id=*/1000L,
+ std::move(data_entries_1),
partitioned_spec_);
+
+ // Create second data manifest
+ std::vector<ManifestEntry> data_entries_2{MakeEntry(
+ ManifestStatus::kAdded, /*snapshot_id=*/1001L, /*sequence_number=*/2,
+ MakeDataFile("/path/to/data2.parquet", partition_b,
partitioned_spec_->spec_id()))};
+ auto data_manifest_2 = WriteDataManifest(version, /*snapshot_id=*/1001L,
+ std::move(data_entries_2),
partitioned_spec_);
+
+ // Create ManifestGroup with multiple manifests
+ std::vector<ManifestFile> manifests = {data_manifest_1, data_manifest_2};
+ ICEBERG_UNWRAP_OR_FAIL(
+ auto group,
+ ManifestGroup::Make(file_io_, schema_, GetSpecsById(),
std::move(manifests)));
+
+ // Plan files - should return files from both manifests
+ ICEBERG_UNWRAP_OR_FAIL(auto tasks, group->PlanFiles());
+ ASSERT_EQ(tasks.size(), 2);
+ EXPECT_THAT(GetPaths(tasks),
testing::UnorderedElementsAre("/path/to/data1.parquet",
+
"/path/to/data2.parquet"));
+}
+
+TEST_P(ManifestGroupTest, PartitionFilter) {
+ int version = GetParam();
+
+ // Create two files with different partition values (bucket 0 and bucket 1)
+ const auto partition_bucket_0 = PartitionValues({Literal::Int(0)});
+ const auto partition_bucket_1 = PartitionValues({Literal::Int(1)});
+
+ // Create data manifest with two entries in different partitions
+ std::vector<ManifestEntry> data_entries{
+ MakeEntry(ManifestStatus::kAdded, /*snapshot_id=*/1000L,
/*sequence_number=*/1,
+ MakeDataFile("/path/to/bucket0.parquet", partition_bucket_0,
+ partitioned_spec_->spec_id())),
+ MakeEntry(ManifestStatus::kAdded, /*snapshot_id=*/1000L,
/*sequence_number=*/1,
+ MakeDataFile("/path/to/bucket1.parquet", partition_bucket_1,
+ partitioned_spec_->spec_id()))};
+ auto data_manifest = WriteDataManifest(version, /*snapshot_id=*/1000L,
+ std::move(data_entries),
partitioned_spec_);
+
+ // Create ManifestGroup with partition filter for bucket 0
+ std::vector<ManifestFile> manifests = {data_manifest};
+ ICEBERG_UNWRAP_OR_FAIL(
+ auto group,
+ ManifestGroup::Make(file_io_, schema_, GetSpecsById(),
std::move(manifests)));
+
+ // Filter on partition field name (data_bucket_16_2 = 0)
+ auto partition_filter = Expressions::Equal("data_bucket_16_2",
Literal::Int(0));
+ group->FilterPartitions(std::move(partition_filter));
+
+ // Plan files - should only return the file in bucket 0
+ ICEBERG_UNWRAP_OR_FAIL(auto tasks, group->PlanFiles());
+ ASSERT_EQ(tasks.size(), 1);
+ EXPECT_THAT(GetPaths(tasks),
testing::ElementsAre("/path/to/bucket0.parquet"));
+}
+
+INSTANTIATE_TEST_SUITE_P(ManifestGroupVersions, ManifestGroupTest,
+ testing::Values(1, 2, 3));
+
+} // namespace iceberg
diff --git a/src/iceberg/type_fwd.h b/src/iceberg/type_fwd.h
index 6c7e6b77..65afeb87 100644
--- a/src/iceberg/type_fwd.h
+++ b/src/iceberg/type_fwd.h
@@ -130,6 +130,13 @@ class Literal;
class Term;
class UnboundPredicate;
+/// \brief Expression evaluators.
+class Evaluator;
+class InclusiveMetricsEvaluator;
+class ManifestEvaluator;
+class ResidualEvaluator;
+class StrictMetricsEvaluator;
+
/// \brief Scan.
class DataTableScan;
class FileScanTask;
@@ -144,6 +151,7 @@ struct ManifestEntry;
struct ManifestFile;
struct ManifestList;
struct PartitionFieldSummary;
+class ManifestGroup;
class ManifestListReader;
class ManifestListWriter;
class ManifestReader;