This is an automated email from the ASF dual-hosted git repository.
wgtmac pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-cpp.git
The following commit(s) were added to refs/heads/main by this push:
new fa368354 fix: evaluate ManifestGroup file filters (#664)
fa368354 is described below
commit fa3683546f85dd46fdbb05deb550b6c6c758e69f
Author: Minh Vu <[email protected]>
AuthorDate: Sun May 24 08:49:02 2026 +0200
fix: evaluate ManifestGroup file filters (#664)
## Summary
Fixes #663.
Previously, `ManifestGroup::FilterFiles()` accepted a filter but silently
ignored it. This change implements it: it adds a `StructLike` wrapper for
`DataFile`, binds a file-level evaluator against the data file metadata, and
applies that evaluator to each manifest entry before the entry is returned.
The evaluator schema matches Java's `ManifestGroup`: concrete partition fields
are not exposed under `partition`, so callers should use `FilterData(...)` for
predicates on logical data columns. The supported file metadata includes
`spec_id`. Filters on concrete partition fields are not supported; they now fail
during binding instead of being silently ignored.
When callers use `Select(...)`, the reader projection now also includes the
metadata columns referenced by the file filter. Optional metadata backed by an
empty container (a map, list, or byte buffer) is exposed as null, so metadata
that was projected away or is missing does not look present to the evaluator.
## Tests
- `uvx pre-commit run --files src/iceberg/manifest/manifest_group.cc
src/iceberg/manifest/manifest_entry.h
src/iceberg/row/manifest_wrapper.cc src/iceberg/row/manifest_wrapper.h
src/iceberg/test/manifest_group_test.cc`
- `cmake --build build --target manifest_test -j 8`
- `./build/src/iceberg/test/manifest_test
--gtest_filter='ManifestGroupVersions/ManifestGroupTest.FilterFiles*'`
- `./build/src/iceberg/test/manifest_test`
---
src/iceberg/manifest/manifest_entry.h | 4 +
src/iceberg/manifest/manifest_group.cc | 87 +++++++++++++--
src/iceberg/row/manifest_wrapper.cc | 173 +++++++++++++++++++++++++++++
src/iceberg/row/manifest_wrapper.h | 21 ++++
src/iceberg/test/manifest_group_test.cc | 186 +++++++++++++++++++++++++++++++-
5 files changed, 463 insertions(+), 8 deletions(-)
diff --git a/src/iceberg/manifest/manifest_entry.h
b/src/iceberg/manifest/manifest_entry.h
index c1f81d0a..17c5388b 100644
--- a/src/iceberg/manifest/manifest_entry.h
+++ b/src/iceberg/manifest/manifest_entry.h
@@ -193,6 +193,10 @@ struct ICEBERG_EXPORT DataFile {
SchemaField::MakeRequired(kFileFormatFieldId, "file_format", string(),
"File format name: avro, orc, or parquet");
+ static constexpr int32_t kSpecIdFieldId = 141;
+ inline static const SchemaField kSpecId =
+ SchemaField::MakeOptional(kSpecIdFieldId, "spec_id", int32(), "Partition
spec ID");
+
static constexpr int32_t kPartitionFieldId = 102;
inline static const std::string kPartitionField = "partition";
inline static const std::string kPartitionDoc =
diff --git a/src/iceberg/manifest/manifest_group.cc
b/src/iceberg/manifest/manifest_group.cc
index 8af717b2..61bb57da 100644
--- a/src/iceberg/manifest/manifest_group.cc
+++ b/src/iceberg/manifest/manifest_group.cc
@@ -19,8 +19,14 @@
#include "iceberg/manifest/manifest_group.h"
+#include <algorithm>
+#include <memory>
+#include <string>
+#include <unordered_set>
#include <utility>
+#include <vector>
+#include "iceberg/expression/binder.h"
#include "iceberg/expression/evaluator.h"
#include "iceberg/expression/expression.h"
#include "iceberg/expression/manifest_evaluator.h"
@@ -29,14 +35,47 @@
#include "iceberg/file_io.h"
#include "iceberg/manifest/manifest_reader.h"
#include "iceberg/partition_spec.h"
+#include "iceberg/row/manifest_wrapper.h"
#include "iceberg/schema.h"
#include "iceberg/table_scan.h"
+#include "iceberg/type.h"
#include "iceberg/util/checked_cast.h"
#include "iceberg/util/content_file_util.h"
#include "iceberg/util/macros.h"
namespace iceberg {
+namespace {
+
+/// \brief Returns the schema that ManifestGroup file filters bind against.
+///
+/// As in Java's ManifestGroup, the `partition` struct is declared without any
+/// fields, so a file filter that names a concrete partition column fails to
+/// bind; predicates on data columns belong in FilterData(...). The field order
+/// is expected to line up with the positions served by DataFileStructLike.
+std::shared_ptr<Schema> DataFileFilterSchema() {
+  // An empty struct hides concrete partition values from file filters.
+  auto no_partition_fields =
+      std::make_shared<StructType>(std::vector<SchemaField>{});
+  auto partition_field = SchemaField::MakeRequired(
+      DataFile::kPartitionFieldId, DataFile::kPartitionField,
+      std::move(no_partition_fields), DataFile::kPartitionDoc);
+
+  std::vector<SchemaField> filter_fields{
+      DataFile::kContent,
+      DataFile::kFilePath,
+      DataFile::kFileFormat,
+      DataFile::kSpecId,
+      std::move(partition_field),
+      DataFile::kRecordCount,
+      DataFile::kFileSize,
+      DataFile::kColumnSizes,
+      DataFile::kValueCounts,
+      DataFile::kNullValueCounts,
+      DataFile::kNanValueCounts,
+      DataFile::kLowerBounds,
+      DataFile::kUpperBounds,
+      DataFile::kKeyMetadata,
+      DataFile::kSplitOffsets,
+      DataFile::kEqualityIds,
+      DataFile::kSortOrderId,
+      DataFile::kFirstRowId,
+      DataFile::kReferencedDataFile,
+      DataFile::kContentOffset,
+      DataFile::kContentSize};
+  return std::make_shared<Schema>(std::move(filter_fields));
+}
+
+} // namespace
+
Result<std::unique_ptr<ManifestGroup>> ManifestGroup::Make(
std::shared_ptr<FileIO> io, std::shared_ptr<Schema> schema,
std::unordered_map<int32_t, std::shared_ptr<PartitionSpec>> specs_by_id,
@@ -265,10 +304,39 @@ Result<std::unique_ptr<ManifestReader>>
ManifestGroup::MakeReader(
ICEBERG_ASSIGN_OR_RAISE(auto reader,
ManifestReader::Make(manifest, io_, schema_,
specs_by_id_));
+ auto columns = columns_;
+ if (file_filter_ && file_filter_->op() != Expression::Operation::kTrue &&
+ !columns.empty() &&
+ std::ranges::find(columns, Schema::kAllColumns) == columns.end()) {
+ auto data_file_schema = DataFileFilterSchema();
+ ICEBERG_ASSIGN_OR_RAISE(
+ auto bound_file_filter,
+ Binder::Bind(*data_file_schema, file_filter_, case_sensitive_));
+ ICEBERG_ASSIGN_OR_RAISE(auto referenced_field_ids,
+
ReferenceVisitor::GetReferencedFieldIds(bound_file_filter));
+
+ std::unordered_set<std::string> selected_columns(columns.cbegin(),
columns.cend());
+ for (const auto field_id : referenced_field_ids) {
+ if (field_id == DataFile::kSpecIdFieldId) {
+ continue;
+ }
+ ICEBERG_ASSIGN_OR_RAISE(auto column_name,
+ data_file_schema->FindColumnNameById(field_id));
+ if (column_name.has_value()) {
+ std::string column_name_str(column_name.value());
+ if (selected_columns.contains(column_name_str)) {
+ continue;
+ }
+ columns.push_back(std::move(column_name_str));
+ selected_columns.insert(columns.back());
+ }
+ }
+ }
+
reader->FilterRows(data_filter_)
.FilterPartitions(partition_filter_)
.CaseSensitive(case_sensitive_)
- .Select(columns_);
+ .Select(std::move(columns));
return reader;
}
@@ -299,10 +367,13 @@ ManifestGroup::ReadEntries() {
return eval_cache[spec_id].get();
};
+ const bool has_file_filter =
+ file_filter_ && file_filter_->op() != Expression::Operation::kTrue;
std::unique_ptr<Evaluator> data_file_evaluator;
- if (file_filter_ && file_filter_->op() != Expression::Operation::kTrue) {
- // TODO(gangwu): create an Evaluator on the DataFile schema with empty
- // partition type
+ if (has_file_filter) {
+ ICEBERG_ASSIGN_OR_RAISE(
+ data_file_evaluator,
+ Evaluator::Make(*DataFileFilterSchema(), file_filter_,
case_sensitive_));
}
std::unordered_map<int32_t, std::vector<ManifestEntry>> result;
@@ -343,8 +414,12 @@ ManifestGroup::ReadEntries() {
}
if (data_file_evaluator != nullptr) {
- // TODO(gangwu): implement data_file_evaluator to evaluate StructLike
on
- // top of entry.data_file
+ DataFileStructLike data_file(*entry.data_file);
+ ICEBERG_ASSIGN_OR_RAISE(bool should_match,
+ data_file_evaluator->Evaluate(data_file));
+ if (!should_match) {
+ continue;
+ }
}
if (!manifest_entry_predicate_(entry)) {
diff --git a/src/iceberg/row/manifest_wrapper.cc
b/src/iceberg/row/manifest_wrapper.cc
index 851f9e72..18be8223 100644
--- a/src/iceberg/row/manifest_wrapper.cc
+++ b/src/iceberg/row/manifest_wrapper.cc
@@ -19,18 +19,57 @@
#include "iceberg/row/manifest_wrapper.h"
+#include <iterator>
+#include <map>
+#include <memory>
+#include <span>
+#include <type_traits>
+#include <vector>
+
#include "iceberg/manifest/manifest_reader_internal.h"
#include "iceberg/util/macros.h"
namespace iceberg {
namespace {
+
+/// \brief Positions of the fields that DataFileStructLike exposes.
+///
+/// The order follows the file filter schema built by DataFileFilterSchema() in
+/// manifest_group.cc. kNextUnusedId sits one past the last field, so its value
+/// equals the number of exposed fields.
+enum class DataFileFieldPosition : size_t {
+  kContent = 0,
+  kFilePath = 1,
+  kFileFormat = 2,
+  kSpecId = 3,
+  kPartition = 4,
+  kRecordCount = 5,
+  kFileSize = 6,
+  kColumnSizes = 7,
+  kValueCounts = 8,
+  kNullValueCounts = 9,
+  kNanValueCounts = 10,
+  kLowerBounds = 11,
+  kUpperBounds = 12,
+  kKeyMetadata = 13,
+  kSplitOffsets = 14,
+  kEqualityIds = 15,
+  kSortOrderId = 16,
+  kFirstRowId = 17,
+  kReferencedDataFile = 18,
+  kContentOffset = 19,
+  kContentSize = 20,
+  kNextUnusedId = 21,
+};
+
template <typename T>
  requires std::is_same_v<T, std::vector<uint8_t>> || std::is_same_v<T, std::string>
std::string_view ToView(const T& value) {
  return {reinterpret_cast<const char*>(value.data()), value.size()};  // NOLINT
}
+/// \brief Converts a 32-bit integer element to a Scalar.
+auto ToScalar(int32_t value) -> Scalar { return value; }
+
+/// \brief Converts a 64-bit integer element to a Scalar.
+auto ToScalar(int64_t value) -> Scalar { return value; }
+
+/// \brief Converts a byte buffer to a Scalar holding a non-owning string view
+/// over `value`, which must outlive the result.
+auto ToScalar(const std::vector<uint8_t>& value) -> Scalar { return ToView(value); }
+
template <typename T>
Result<Scalar> FromOptional(const std::optional<T>& value) {
if (value.has_value()) {
@@ -39,6 +78,79 @@ Result<Scalar> FromOptional(const std::optional<T>& value) {
return std::monostate{};
}
+/// \brief Exposes an optional string as a string-view Scalar, or null if absent.
+///
+/// The view aliases the string stored in `value`, which must outlive the result.
+Result<Scalar> FromOptionalString(const std::optional<std::string>& value) {
+  if (!value) {
+    return std::monostate{};
+  }
+  return ToView(*value);
+}
+
+/// \brief Read-only ArrayLike view over a contiguous sequence of values.
+///
+/// Only a std::span is held, so the viewed storage must outlive this object.
+template <typename T>
+class VectorArrayLike : public ArrayLike {
+ public:
+  explicit VectorArrayLike(std::span<const T> values) : values_(values) {}
+
+  Result<Scalar> GetElement(size_t pos) const override {
+    if (pos < size()) {
+      return ToScalar(values_[pos]);
+    }
+    return InvalidArgument("Invalid array index: {}", pos);
+  }
+
+  size_t size() const override { return values_.size(); }
+
+ private:
+  std::span<const T> values_;
+};
+
+/// \brief Read-only MapLike view over a std::map with int32_t keys.
+///
+/// Positions follow the map's ascending key order. Each positional lookup walks
+/// the map from its first entry, so one access costs O(pos). Only a reference
+/// to the map is stored; the map must outlive this object.
+template <typename V>
+class IntMapLike : public MapLike {
+ public:
+  explicit IntMapLike(const std::map<int32_t, V>& values) : values_(values) {}
+
+  Result<Scalar> GetKey(size_t pos) const override {
+    if (pos < size()) {
+      return EntryAt(pos).first;
+    }
+    return InvalidArgument("Invalid map index: {}", pos);
+  }
+
+  Result<Scalar> GetValue(size_t pos) const override {
+    if (pos < size()) {
+      return ToScalar(EntryAt(pos).second);
+    }
+    return InvalidArgument("Invalid map index: {}", pos);
+  }
+
+  size_t size() const override { return values_.get().size(); }
+
+ private:
+  /// \brief Returns the entry at `pos`; callers bounds-check before calling.
+  const std::pair<const int32_t, V>& EntryAt(size_t pos) const {
+    return *std::next(values_.get().cbegin(), pos);
+  }
+
+  std::reference_wrapper<const std::map<int32_t, V>> values_;
+};
+
+/// \brief Exposes a map-valued metadata field; an empty map is reported as null.
+///
+/// The returned MapLike references `values`, which must outlive it.
+template <typename V>
+Result<Scalar> FromOptionalMap(const std::map<int32_t, V>& values) {
+  if (!values.empty()) {
+    return std::make_shared<IntMapLike<V>>(values);
+  }
+  return std::monostate{};
+}
+
+/// \brief Exposes a list-valued metadata field; an empty vector is reported as
+/// null.
+///
+/// The returned ArrayLike views `values`, which must outlive it.
+template <typename T>
+Result<Scalar> FromOptionalVector(const std::vector<T>& values) {
+  if (!values.empty()) {
+    return std::make_shared<VectorArrayLike<T>>(values);
+  }
+  return std::monostate{};
+}
+
+/// \brief Exposes a binary metadata field as a string view; an empty buffer is
+/// reported as null.
+Result<Scalar> FromOptionalBytes(const std::vector<uint8_t>& value) {
+  if (!value.empty()) {
+    return ToView(value);
+  }
+  return std::monostate{};
+}
+
} // namespace
Result<Scalar> PartitionFieldSummaryStructLike::GetField(size_t pos) const {
@@ -134,4 +246,65 @@ std::unique_ptr<StructLike> FromManifestFile(const
ManifestFile& file) {
return std::make_unique<ManifestFileStructLike>(file);
}
+/// \brief Returns the DataFile metadata value stored at schema position `pos`.
+///
+/// Absent optional values, and optional containers that are empty, come back as
+/// null (std::monostate). Out-of-range positions yield InvalidArgument.
+Result<Scalar> DataFileStructLike::GetField(size_t pos) const {
+  using Pos = DataFileFieldPosition;
+
+  // Validate before converting the index to the enum.
+  if (pos >= num_fields()) {
+    return InvalidArgument("Invalid data file field index: {}", pos);
+  }
+
+  const DataFile& file = data_file_.get();
+  switch (static_cast<Pos>(pos)) {
+    case Pos::kContent:
+      return static_cast<int32_t>(file.content);
+    case Pos::kFilePath:
+      return ToView(file.file_path);
+    case Pos::kFileFormat:
+      // NOTE(review): string Scalars here appear to be non-owning views (see
+      // ToView); this assumes ToString(FileFormatType) returns a view of static
+      // storage rather than a temporary std::string -- confirm.
+      return ToString(file.file_format);
+    case Pos::kSpecId:
+      return FromOptional(file.partition_spec_id);
+    case Pos::kPartition:
+      // Only the most recently built partition wrapper is kept in partition_.
+      partition_ = std::make_shared<PartitionValues>(file.partition);
+      return partition_;
+    case Pos::kRecordCount:
+      return file.record_count;
+    case Pos::kFileSize:
+      return file.file_size_in_bytes;
+    case Pos::kColumnSizes:
+      return FromOptionalMap(file.column_sizes);
+    case Pos::kValueCounts:
+      return FromOptionalMap(file.value_counts);
+    case Pos::kNullValueCounts:
+      return FromOptionalMap(file.null_value_counts);
+    case Pos::kNanValueCounts:
+      return FromOptionalMap(file.nan_value_counts);
+    case Pos::kLowerBounds:
+      return FromOptionalMap(file.lower_bounds);
+    case Pos::kUpperBounds:
+      return FromOptionalMap(file.upper_bounds);
+    case Pos::kKeyMetadata:
+      return FromOptionalBytes(file.key_metadata);
+    case Pos::kSplitOffsets:
+      return FromOptionalVector(file.split_offsets);
+    case Pos::kEqualityIds:
+      return FromOptionalVector(file.equality_ids);
+    case Pos::kSortOrderId:
+      return FromOptional(file.sort_order_id);
+    case Pos::kFirstRowId:
+      return FromOptional(file.first_row_id);
+    case Pos::kReferencedDataFile:
+      return FromOptionalString(file.referenced_data_file);
+    case Pos::kContentOffset:
+      return FromOptional(file.content_offset);
+    case Pos::kContentSize:
+      return FromOptional(file.content_size_in_bytes);
+    case Pos::kNextUnusedId:
+      // Sentinel, not a real field; listed so the switch stays exhaustive.
+      return InvalidArgument("Invalid data file field index: {}", pos);
+  }
+  return InvalidArgument("Invalid data file field index: {}", pos);
+}
+
+/// \brief Number of metadata fields exposed by this wrapper.
+size_t DataFileStructLike::num_fields() const {
+  // The sentinel sits one past the last real position, so it is the count.
+  constexpr auto kFieldCount =
+      static_cast<size_t>(DataFileFieldPosition::kNextUnusedId);
+  return kFieldCount;
+}
+
} // namespace iceberg
diff --git a/src/iceberg/row/manifest_wrapper.h
b/src/iceberg/row/manifest_wrapper.h
index bc04c1e8..20c2165b 100644
--- a/src/iceberg/row/manifest_wrapper.h
+++ b/src/iceberg/row/manifest_wrapper.h
@@ -26,6 +26,7 @@
#include <functional>
#include "iceberg/iceberg_export.h"
+#include "iceberg/manifest/manifest_entry.h"
#include "iceberg/manifest/manifest_list.h"
#include "iceberg/row/struct_like.h"
@@ -97,4 +98,24 @@ class ICEBERG_EXPORT ManifestFileStructLike : public
StructLike {
mutable std::shared_ptr<PartitionFieldSummaryArrayLike> summaries_;
};
+/// \brief StructLike view over DataFile metadata.
+///
+/// Only a reference to the DataFile is stored, so the wrapped file must outlive
+/// every use of this object; Reset() re-points the wrapper at another file.
+class ICEBERG_EXPORT DataFileStructLike : public StructLike {
+ public:
+  explicit DataFileStructLike(const DataFile& file) : data_file_(file) {}
+  ~DataFileStructLike() override = default;
+
+  DataFileStructLike(const DataFileStructLike&) = delete;
+  DataFileStructLike& operator=(const DataFileStructLike&) = delete;
+
+  /// \brief Re-points this wrapper at `file` (no copy of the file is made).
+  void Reset(const DataFile& file) {
+    data_file_ = std::cref(file);
+  }
+
+  /// \brief Returns the metadata value at position `pos`.
+  Result<Scalar> GetField(size_t pos) const override;
+
+  /// \brief Returns the number of exposed metadata fields.
+  size_t num_fields() const override;
+
+ private:
+  std::reference_wrapper<const DataFile> data_file_;
+  mutable std::shared_ptr<StructLike> partition_;
+};
+
} // namespace iceberg
diff --git a/src/iceberg/test/manifest_group_test.cc
b/src/iceberg/test/manifest_group_test.cc
index 017f9803..70e2cea9 100644
--- a/src/iceberg/test/manifest_group_test.cc
+++ b/src/iceberg/test/manifest_group_test.cc
@@ -76,13 +76,14 @@ class ManifestGroupTest : public
testing::TestWithParam<int8_t> {
std::shared_ptr<DataFile> MakeDataFile(const std::string& path,
const PartitionValues& partition,
- int32_t spec_id, int64_t record_count
= 1) {
+ int32_t spec_id, int64_t record_count
= 1,
+ int64_t file_size_in_bytes = 10) {
return std::make_shared<DataFile>(DataFile{
.file_path = path,
.file_format = FileFormatType::kParquet,
.partition = partition,
.record_count = record_count,
- .file_size_in_bytes = 10,
+ .file_size_in_bytes = file_size_in_bytes,
.sort_order_id = 0,
.partition_spec_id = spec_id,
});
@@ -404,6 +405,187 @@ TEST_P(ManifestGroupTest, CustomManifestEntriesFilter) {
"/path/to/data3.parquet"));
}
+TEST_P(ManifestGroupTest, FilterFilesByRecordCount) {
+  // One file below, one exactly at, and one above the threshold: the inclusive
+  // record_count >= 10 filter keeps the last two.
+  constexpr int64_t kSnapshotId = 1000L;
+  const auto version = GetParam();
+  const auto partition = PartitionValues({Literal::Int(0)});
+  const auto spec_id = partitioned_spec_->spec_id();
+
+  auto added_file = [&](const std::string& path, int64_t record_count) {
+    return MakeEntry(ManifestStatus::kAdded, kSnapshotId, /*sequence_number=*/1,
+                     MakeDataFile(path, partition, spec_id, record_count));
+  };
+
+  std::vector<ManifestEntry> written;
+  written.push_back(added_file("/path/to/small.parquet", /*record_count=*/5));
+  written.push_back(added_file("/path/to/boundary.parquet", /*record_count=*/10));
+  written.push_back(added_file("/path/to/large.parquet", /*record_count=*/15));
+
+  std::vector<ManifestFile> manifests{WriteDataManifest(
+      version, kSnapshotId, std::move(written), partitioned_spec_)};
+  ICEBERG_UNWRAP_OR_FAIL(
+      auto group,
+      ManifestGroup::Make(file_io_, schema_, GetSpecsById(), std::move(manifests)));
+  group->FilterFiles(
+      Expressions::GreaterThanOrEqual("record_count", Literal::Long(10)));
+
+  ICEBERG_UNWRAP_OR_FAIL(auto entries, group->Entries());
+  EXPECT_THAT(GetEntryPaths(entries),
+              testing::UnorderedElementsAre("/path/to/boundary.parquet",
+                                            "/path/to/large.parquet"));
+}
+
+TEST_P(ManifestGroupTest, FilterFilesRejectsPartitionMetadata) {
+  // The file filter schema exposes `partition` as an empty struct, so a filter
+  // on a concrete partition field must fail to bind instead of being ignored.
+  constexpr int64_t kSnapshotId = 1000L;
+  const auto version = GetParam();
+  const auto partition = PartitionValues({Literal::Int(0)});
+
+  std::vector<ManifestEntry> written;
+  written.push_back(MakeEntry(ManifestStatus::kAdded, kSnapshotId,
+                              /*sequence_number=*/1,
+                              MakeDataFile("/path/to/data.parquet", partition,
+                                           partitioned_spec_->spec_id())));
+  std::vector<ManifestFile> manifests{WriteDataManifest(
+      version, kSnapshotId, std::move(written), partitioned_spec_)};
+
+  ICEBERG_UNWRAP_OR_FAIL(
+      auto group,
+      ManifestGroup::Make(file_io_, schema_, GetSpecsById(), std::move(manifests)));
+  group->FilterFiles(
+      Expressions::Equal("partition.data_bucket_16_2", Literal::Int(1)));
+
+  auto result = group->Entries();
+  EXPECT_THAT(result, IsError(ErrorKind::kInvalidExpression));
+  EXPECT_THAT(result,
+              HasErrorMessage("Cannot find field 'partition.data_bucket_16_2'"));
+}
+
+TEST_P(ManifestGroupTest, FilterFilesRejectsPartitionMetadataWhenEmpty) {
+  // The bind error is reported even when there are no manifests to read.
+  ICEBERG_UNWRAP_OR_FAIL(auto group,
+                         ManifestGroup::Make(file_io_, schema_, GetSpecsById(),
+                                             std::vector<ManifestFile>{}));
+  group->FilterFiles(
+      Expressions::Equal("partition.data_bucket_16_2", Literal::Int(1)));
+
+  auto result = group->Entries();
+  EXPECT_THAT(result, IsError(ErrorKind::kInvalidExpression));
+  EXPECT_THAT(result,
+              HasErrorMessage("Cannot find field 'partition.data_bucket_16_2'"));
+}
+
+TEST_P(ManifestGroupTest, FilterFilesRejectsPartitionMetadataBeforeManifestPruning) {
+  // The partition filter (bucket 1) would exclude the only manifest (bucket 0);
+  // the invalid file filter must still be reported rather than masked.
+  constexpr int64_t kSnapshotId = 1000L;
+  const auto version = GetParam();
+  const auto partition = PartitionValues({Literal::Int(0)});
+
+  std::vector<ManifestEntry> written;
+  written.push_back(MakeEntry(ManifestStatus::kAdded, kSnapshotId,
+                              /*sequence_number=*/1,
+                              MakeDataFile("/path/to/data.parquet", partition,
+                                           partitioned_spec_->spec_id())));
+  std::vector<ManifestFile> manifests{WriteDataManifest(
+      version, kSnapshotId, std::move(written), partitioned_spec_)};
+
+  ICEBERG_UNWRAP_OR_FAIL(
+      auto group,
+      ManifestGroup::Make(file_io_, schema_, GetSpecsById(), std::move(manifests)));
+  group->FilterPartitions(Expressions::Equal("data_bucket_16_2", Literal::Int(1)))
+      .FilterFiles(Expressions::Equal("partition.data_bucket_16_2", Literal::Int(1)));
+
+  auto result = group->Entries();
+  EXPECT_THAT(result, IsError(ErrorKind::kInvalidExpression));
+  EXPECT_THAT(result,
+              HasErrorMessage("Cannot find field 'partition.data_bucket_16_2'"));
+}
+
+TEST_P(ManifestGroupTest, FilterFilesReadsFilteredColumnsWhenSelected) {
+  // Only file_path is selected, yet the filter reads file_size_in_bytes, so the
+  // reader projection has to include that column for the filter to see it.
+  constexpr int64_t kSnapshotId = 1000L;
+  const auto version = GetParam();
+  const auto partition = PartitionValues({Literal::Int(0)});
+  const auto spec_id = partitioned_spec_->spec_id();
+
+  auto added_file = [&](const std::string& path, int64_t file_size_in_bytes) {
+    return MakeEntry(ManifestStatus::kAdded, kSnapshotId, /*sequence_number=*/1,
+                     MakeDataFile(path, partition, spec_id, /*record_count=*/1,
+                                  file_size_in_bytes));
+  };
+
+  std::vector<ManifestEntry> written;
+  written.push_back(
+      added_file("/path/to/too-small.parquet", /*file_size_in_bytes=*/5));
+  written.push_back(
+      added_file("/path/to/matching.parquet", /*file_size_in_bytes=*/20));
+
+  std::vector<ManifestFile> manifests{WriteDataManifest(
+      version, kSnapshotId, std::move(written), partitioned_spec_)};
+  ICEBERG_UNWRAP_OR_FAIL(
+      auto group,
+      ManifestGroup::Make(file_io_, schema_, GetSpecsById(), std::move(manifests)));
+  group->Select({"file_path"})
+      .FilterFiles(Expressions::GreaterThan("file_size_in_bytes", Literal::Long(10)));
+
+  ICEBERG_UNWRAP_OR_FAIL(auto entries, group->Entries());
+  EXPECT_THAT(GetEntryPaths(entries),
+              testing::ElementsAre("/path/to/matching.parquet"));
+}
+
+TEST_P(ManifestGroupTest, FilterFilesHonorsCaseInsensitiveMatchingWhenSelected) {
+  // Upper-case selection and filter names must resolve when case sensitivity
+  // is off, including the filter column added to the projection.
+  constexpr int64_t kSnapshotId = 1000L;
+  const auto version = GetParam();
+  const auto partition = PartitionValues({Literal::Int(0)});
+  const auto spec_id = partitioned_spec_->spec_id();
+
+  auto added_file = [&](const std::string& path, int64_t record_count) {
+    return MakeEntry(ManifestStatus::kAdded, kSnapshotId, /*sequence_number=*/1,
+                     MakeDataFile(path, partition, spec_id, record_count));
+  };
+
+  std::vector<ManifestEntry> written;
+  written.push_back(added_file("/path/to/small.parquet", /*record_count=*/5));
+  written.push_back(added_file("/path/to/large.parquet", /*record_count=*/15));
+
+  std::vector<ManifestFile> manifests{WriteDataManifest(
+      version, kSnapshotId, std::move(written), partitioned_spec_)};
+  ICEBERG_UNWRAP_OR_FAIL(
+      auto group,
+      ManifestGroup::Make(file_io_, schema_, GetSpecsById(), std::move(manifests)));
+  group->CaseSensitive(false)
+      .Select({"FILE_PATH"})
+      .FilterFiles(
+          Expressions::GreaterThanOrEqual("RECORD_COUNT", Literal::Long(10)));
+
+  ICEBERG_UNWRAP_OR_FAIL(auto entries, group->Entries());
+  EXPECT_THAT(GetEntryPaths(entries),
+              testing::ElementsAre("/path/to/large.parquet"));
+}
+
+TEST_P(ManifestGroupTest, FilterFilesBySpecIdWhenSelected) {
+  // spec_id is skipped when the reader projection is widened, yet filtering on
+  // it must still work when only file_path is selected.
+  constexpr int64_t kSnapshotId = 1000L;
+  const auto version = GetParam();
+
+  // Writes a manifest holding a single added data file for `spec`.
+  auto single_file_manifest = [&](const std::string& path,
+                                  const PartitionValues& partition,
+                                  const auto& spec) {
+    std::vector<ManifestEntry> written;
+    written.push_back(MakeEntry(ManifestStatus::kAdded, kSnapshotId,
+                                /*sequence_number=*/1,
+                                MakeDataFile(path, partition, spec->spec_id())));
+    return WriteDataManifest(version, kSnapshotId, std::move(written), spec);
+  };
+
+  std::vector<ManifestFile> manifests{
+      single_file_manifest("/path/to/unpartitioned.parquet",
+                           PartitionValues(std::vector<Literal>{}),
+                           unpartitioned_spec_),
+      single_file_manifest("/path/to/partitioned.parquet",
+                           PartitionValues({Literal::Int(1)}), partitioned_spec_)};
+
+  ICEBERG_UNWRAP_OR_FAIL(
+      auto group,
+      ManifestGroup::Make(file_io_, schema_, GetSpecsById(), std::move(manifests)));
+  group->Select({"file_path"})
+      .FilterFiles(
+          Expressions::Equal("spec_id", Literal::Int(partitioned_spec_->spec_id())));
+
+  ICEBERG_UNWRAP_OR_FAIL(auto entries, group->Entries());
+  EXPECT_THAT(GetEntryPaths(entries),
+              testing::ElementsAre("/path/to/partitioned.parquet"));
+}
+
TEST_P(ManifestGroupTest, EmptyManifestGroup) {
std::vector<ManifestFile> manifests;
ICEBERG_UNWRAP_OR_FAIL(