This is an automated email from the ASF dual-hosted git repository.

wgtmac pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-cpp.git


The following commit(s) were added to refs/heads/main by this push:
     new fa368354 fix: evaluate ManifestGroup file filters (#664)
fa368354 is described below

commit fa3683546f85dd46fdbb05deb550b6c6c758e69f
Author: Minh Vu <[email protected]>
AuthorDate: Sun May 24 08:49:02 2026 +0200

    fix: evaluate ManifestGroup file filters (#664)
    
    ## Summary
    
    Fixes #663.
    
    This implements `ManifestGroup::FilterFiles()` instead of accepting the
    filter as a silent no-op. The change adds a `DataFile` `StructLike`
    wrapper, binds a file-level evaluator against file metadata, and applies
    it to each manifest entry before returning it.
    
    The evaluator schema is aligned with Java `ManifestGroup`: concrete
    partition fields are not exposed under `partition`, so callers should
    use `FilterData(...)` for logical data predicates. Supported file
    metadata includes `spec_id`, and unsupported concrete partition filters
    fail during binding rather than being ignored.
    
    The reader projection now includes file-filter referenced metadata
    columns when callers use `Select(...)`. Empty container-backed optional
    metadata is exposed as null so projected or missing metadata does not
    look present to the evaluator.
    
    ## Tests
    
    - `uvx pre-commit run --files src/iceberg/manifest/manifest_group.cc
    src/iceberg/manifest/manifest_entry.h
    src/iceberg/row/manifest_wrapper.cc src/iceberg/row/manifest_wrapper.h
    src/iceberg/test/manifest_group_test.cc`
    - `cmake --build build --target manifest_test -j 8`
    - `./build/src/iceberg/test/manifest_test
    --gtest_filter='ManifestGroupVersions/ManifestGroupTest.FilterFiles*'`
    - `./build/src/iceberg/test/manifest_test`
---
 src/iceberg/manifest/manifest_entry.h   |   4 +
 src/iceberg/manifest/manifest_group.cc  |  87 +++++++++++++--
 src/iceberg/row/manifest_wrapper.cc     | 173 +++++++++++++++++++++++++++++
 src/iceberg/row/manifest_wrapper.h      |  21 ++++
 src/iceberg/test/manifest_group_test.cc | 186 +++++++++++++++++++++++++++++++-
 5 files changed, 463 insertions(+), 8 deletions(-)

diff --git a/src/iceberg/manifest/manifest_entry.h 
b/src/iceberg/manifest/manifest_entry.h
index c1f81d0a..17c5388b 100644
--- a/src/iceberg/manifest/manifest_entry.h
+++ b/src/iceberg/manifest/manifest_entry.h
@@ -193,6 +193,10 @@ struct ICEBERG_EXPORT DataFile {
       SchemaField::MakeRequired(kFileFormatFieldId, "file_format", string(),
                                 "File format name: avro, orc, or parquet");
 
+  static constexpr int32_t kSpecIdFieldId = 141;
+  inline static const SchemaField kSpecId =
+      SchemaField::MakeOptional(kSpecIdFieldId, "spec_id", int32(), "Partition 
spec ID");
+
   static constexpr int32_t kPartitionFieldId = 102;
   inline static const std::string kPartitionField = "partition";
   inline static const std::string kPartitionDoc =
diff --git a/src/iceberg/manifest/manifest_group.cc 
b/src/iceberg/manifest/manifest_group.cc
index 8af717b2..61bb57da 100644
--- a/src/iceberg/manifest/manifest_group.cc
+++ b/src/iceberg/manifest/manifest_group.cc
@@ -19,8 +19,14 @@
 
 #include "iceberg/manifest/manifest_group.h"
 
+#include <algorithm>
+#include <memory>
+#include <string>
+#include <unordered_set>
 #include <utility>
+#include <vector>
 
+#include "iceberg/expression/binder.h"
 #include "iceberg/expression/evaluator.h"
 #include "iceberg/expression/expression.h"
 #include "iceberg/expression/manifest_evaluator.h"
@@ -29,14 +35,47 @@
 #include "iceberg/file_io.h"
 #include "iceberg/manifest/manifest_reader.h"
 #include "iceberg/partition_spec.h"
+#include "iceberg/row/manifest_wrapper.h"
 #include "iceberg/schema.h"
 #include "iceberg/table_scan.h"
+#include "iceberg/type.h"
 #include "iceberg/util/checked_cast.h"
 #include "iceberg/util/content_file_util.h"
 #include "iceberg/util/macros.h"
 
 namespace iceberg {
 
+namespace {
+
+std::shared_ptr<Schema> DataFileFilterSchema() {
+  auto empty_partition_type = 
std::make_shared<StructType>(std::vector<SchemaField>{});
+  return std::make_shared<Schema>(std::vector<SchemaField>{
+      DataFile::kContent,
+      DataFile::kFilePath,
+      DataFile::kFileFormat,
+      DataFile::kSpecId,
+      SchemaField::MakeRequired(DataFile::kPartitionFieldId, 
DataFile::kPartitionField,
+                                std::move(empty_partition_type), 
DataFile::kPartitionDoc),
+      DataFile::kRecordCount,
+      DataFile::kFileSize,
+      DataFile::kColumnSizes,
+      DataFile::kValueCounts,
+      DataFile::kNullValueCounts,
+      DataFile::kNanValueCounts,
+      DataFile::kLowerBounds,
+      DataFile::kUpperBounds,
+      DataFile::kKeyMetadata,
+      DataFile::kSplitOffsets,
+      DataFile::kEqualityIds,
+      DataFile::kSortOrderId,
+      DataFile::kFirstRowId,
+      DataFile::kReferencedDataFile,
+      DataFile::kContentOffset,
+      DataFile::kContentSize});
+}
+
+}  // namespace
+
 Result<std::unique_ptr<ManifestGroup>> ManifestGroup::Make(
     std::shared_ptr<FileIO> io, std::shared_ptr<Schema> schema,
     std::unordered_map<int32_t, std::shared_ptr<PartitionSpec>> specs_by_id,
@@ -265,10 +304,39 @@ Result<std::unique_ptr<ManifestReader>> 
ManifestGroup::MakeReader(
   ICEBERG_ASSIGN_OR_RAISE(auto reader,
                           ManifestReader::Make(manifest, io_, schema_, 
specs_by_id_));
 
+  auto columns = columns_;
+  if (file_filter_ && file_filter_->op() != Expression::Operation::kTrue &&
+      !columns.empty() &&
+      std::ranges::find(columns, Schema::kAllColumns) == columns.end()) {
+    auto data_file_schema = DataFileFilterSchema();
+    ICEBERG_ASSIGN_OR_RAISE(
+        auto bound_file_filter,
+        Binder::Bind(*data_file_schema, file_filter_, case_sensitive_));
+    ICEBERG_ASSIGN_OR_RAISE(auto referenced_field_ids,
+                            
ReferenceVisitor::GetReferencedFieldIds(bound_file_filter));
+
+    std::unordered_set<std::string> selected_columns(columns.cbegin(), 
columns.cend());
+    for (const auto field_id : referenced_field_ids) {
+      if (field_id == DataFile::kSpecIdFieldId) {
+        continue;
+      }
+      ICEBERG_ASSIGN_OR_RAISE(auto column_name,
+                              data_file_schema->FindColumnNameById(field_id));
+      if (column_name.has_value()) {
+        std::string column_name_str(column_name.value());
+        if (selected_columns.contains(column_name_str)) {
+          continue;
+        }
+        columns.push_back(std::move(column_name_str));
+        selected_columns.insert(columns.back());
+      }
+    }
+  }
+
   reader->FilterRows(data_filter_)
       .FilterPartitions(partition_filter_)
       .CaseSensitive(case_sensitive_)
-      .Select(columns_);
+      .Select(std::move(columns));
 
   return reader;
 }
@@ -299,10 +367,13 @@ ManifestGroup::ReadEntries() {
     return eval_cache[spec_id].get();
   };
 
+  const bool has_file_filter =
+      file_filter_ && file_filter_->op() != Expression::Operation::kTrue;
   std::unique_ptr<Evaluator> data_file_evaluator;
-  if (file_filter_ && file_filter_->op() != Expression::Operation::kTrue) {
-    // TODO(gangwu): create an Evaluator on the DataFile schema with empty
-    // partition type
+  if (has_file_filter) {
+    ICEBERG_ASSIGN_OR_RAISE(
+        data_file_evaluator,
+        Evaluator::Make(*DataFileFilterSchema(), file_filter_, 
case_sensitive_));
   }
 
   std::unordered_map<int32_t, std::vector<ManifestEntry>> result;
@@ -343,8 +414,12 @@ ManifestGroup::ReadEntries() {
       }
 
       if (data_file_evaluator != nullptr) {
-        // TODO(gangwu): implement data_file_evaluator to evaluate StructLike 
on
-        // top of entry.data_file
+        DataFileStructLike data_file(*entry.data_file);
+        ICEBERG_ASSIGN_OR_RAISE(bool should_match,
+                                data_file_evaluator->Evaluate(data_file));
+        if (!should_match) {
+          continue;
+        }
       }
 
       if (!manifest_entry_predicate_(entry)) {
diff --git a/src/iceberg/row/manifest_wrapper.cc 
b/src/iceberg/row/manifest_wrapper.cc
index 851f9e72..18be8223 100644
--- a/src/iceberg/row/manifest_wrapper.cc
+++ b/src/iceberg/row/manifest_wrapper.cc
@@ -19,18 +19,57 @@
 
 #include "iceberg/row/manifest_wrapper.h"
 
+#include <iterator>
+#include <map>
+#include <memory>
+#include <span>
+#include <type_traits>
+#include <vector>
+
 #include "iceberg/manifest/manifest_reader_internal.h"
 #include "iceberg/util/macros.h"
 
 namespace iceberg {
 
 namespace {
+
+enum class DataFileFieldPosition : size_t {
+  kContent = 0,
+  kFilePath = 1,
+  kFileFormat = 2,
+  kSpecId = 3,
+  kPartition = 4,
+  kRecordCount = 5,
+  kFileSize = 6,
+  kColumnSizes = 7,
+  kValueCounts = 8,
+  kNullValueCounts = 9,
+  kNanValueCounts = 10,
+  kLowerBounds = 11,
+  kUpperBounds = 12,
+  kKeyMetadata = 13,
+  kSplitOffsets = 14,
+  kEqualityIds = 15,
+  kSortOrderId = 16,
+  kFirstRowId = 17,
+  kReferencedDataFile = 18,
+  kContentOffset = 19,
+  kContentSize = 20,
+  kNextUnusedId = 21,
+};
+
 template <typename T>
   requires std::is_same_v<T, std::vector<uint8_t>> || std::is_same_v<T, 
std::string>
 std::string_view ToView(const T& value) {
   return {reinterpret_cast<const char*>(value.data()), value.size()};  // 
NOLINT
 }
 
+Scalar ToScalar(const int32_t value) { return value; }
+
+Scalar ToScalar(const int64_t value) { return value; }
+
+Scalar ToScalar(const std::vector<uint8_t>& value) { return ToView(value); }
+
 template <typename T>
 Result<Scalar> FromOptional(const std::optional<T>& value) {
   if (value.has_value()) {
@@ -39,6 +78,79 @@ Result<Scalar> FromOptional(const std::optional<T>& value) {
   return std::monostate{};
 }
 
+Result<Scalar> FromOptionalString(const std::optional<std::string>& value) {
+  if (value.has_value()) {
+    return ToView(value.value());
+  }
+  return std::monostate{};
+}
+
+template <typename T>
+class VectorArrayLike : public ArrayLike {
+ public:
+  explicit VectorArrayLike(std::span<const T> values) : values_(values) {}
+
+  Result<Scalar> GetElement(size_t pos) const override {
+    if (pos >= size()) {
+      return InvalidArgument("Invalid array index: {}", pos);
+    }
+    return ToScalar(values_[pos]);
+  }
+
+  size_t size() const override { return values_.size(); }
+
+ private:
+  std::span<const T> values_;
+};
+
+template <typename V>
+class IntMapLike : public MapLike {
+ public:
+  explicit IntMapLike(const std::map<int32_t, V>& values) : values_(values) {}
+
+  Result<Scalar> GetKey(size_t pos) const override {
+    if (pos >= size()) {
+      return InvalidArgument("Invalid map index: {}", pos);
+    }
+    return std::next(values_.get().cbegin(), pos)->first;
+  }
+
+  Result<Scalar> GetValue(size_t pos) const override {
+    if (pos >= size()) {
+      return InvalidArgument("Invalid map index: {}", pos);
+    }
+    return ToScalar(std::next(values_.get().cbegin(), pos)->second);
+  }
+
+  size_t size() const override { return values_.get().size(); }
+
+ private:
+  std::reference_wrapper<const std::map<int32_t, V>> values_;
+};
+
+template <typename V>
+Result<Scalar> FromOptionalMap(const std::map<int32_t, V>& values) {
+  if (values.empty()) {
+    return std::monostate{};
+  }
+  return std::make_shared<IntMapLike<V>>(values);
+}
+
+template <typename T>
+Result<Scalar> FromOptionalVector(const std::vector<T>& values) {
+  if (values.empty()) {
+    return std::monostate{};
+  }
+  return std::make_shared<VectorArrayLike<T>>(values);
+}
+
+Result<Scalar> FromOptionalBytes(const std::vector<uint8_t>& value) {
+  if (value.empty()) {
+    return std::monostate{};
+  }
+  return ToView(value);
+}
+
 }  // namespace
 
 Result<Scalar> PartitionFieldSummaryStructLike::GetField(size_t pos) const {
@@ -134,4 +246,65 @@ std::unique_ptr<StructLike> FromManifestFile(const 
ManifestFile& file) {
   return std::make_unique<ManifestFileStructLike>(file);
 }
 
+Result<Scalar> DataFileStructLike::GetField(size_t pos) const {
+  if (pos >= num_fields()) {
+    return InvalidArgument("Invalid data file field index: {}", pos);
+  }
+
+  const auto& data_file = data_file_.get();
+  switch (static_cast<DataFileFieldPosition>(pos)) {
+    case DataFileFieldPosition::kContent:
+      return static_cast<int32_t>(data_file.content);
+    case DataFileFieldPosition::kFilePath:
+      return ToView(data_file.file_path);
+    case DataFileFieldPosition::kFileFormat:
+      return ToString(data_file.file_format);
+    case DataFileFieldPosition::kSpecId:
+      return FromOptional(data_file.partition_spec_id);
+    case DataFileFieldPosition::kPartition: {
+      partition_ = std::make_shared<PartitionValues>(data_file.partition);
+      return partition_;
+    }
+    case DataFileFieldPosition::kRecordCount:
+      return data_file.record_count;
+    case DataFileFieldPosition::kFileSize:
+      return data_file.file_size_in_bytes;
+    case DataFileFieldPosition::kColumnSizes:
+      return FromOptionalMap(data_file.column_sizes);
+    case DataFileFieldPosition::kValueCounts:
+      return FromOptionalMap(data_file.value_counts);
+    case DataFileFieldPosition::kNullValueCounts:
+      return FromOptionalMap(data_file.null_value_counts);
+    case DataFileFieldPosition::kNanValueCounts:
+      return FromOptionalMap(data_file.nan_value_counts);
+    case DataFileFieldPosition::kLowerBounds:
+      return FromOptionalMap(data_file.lower_bounds);
+    case DataFileFieldPosition::kUpperBounds:
+      return FromOptionalMap(data_file.upper_bounds);
+    case DataFileFieldPosition::kKeyMetadata:
+      return FromOptionalBytes(data_file.key_metadata);
+    case DataFileFieldPosition::kSplitOffsets:
+      return FromOptionalVector(data_file.split_offsets);
+    case DataFileFieldPosition::kEqualityIds:
+      return FromOptionalVector(data_file.equality_ids);
+    case DataFileFieldPosition::kSortOrderId:
+      return FromOptional(data_file.sort_order_id);
+    case DataFileFieldPosition::kFirstRowId:
+      return FromOptional(data_file.first_row_id);
+    case DataFileFieldPosition::kReferencedDataFile:
+      return FromOptionalString(data_file.referenced_data_file);
+    case DataFileFieldPosition::kContentOffset:
+      return FromOptional(data_file.content_offset);
+    case DataFileFieldPosition::kContentSize:
+      return FromOptional(data_file.content_size_in_bytes);
+    case DataFileFieldPosition::kNextUnusedId:
+      return InvalidArgument("Invalid data file field index: {}", pos);
+  }
+  return InvalidArgument("Invalid data file field index: {}", pos);
+}
+
+size_t DataFileStructLike::num_fields() const {
+  return static_cast<size_t>(DataFileFieldPosition::kNextUnusedId);
+}
+
 }  // namespace iceberg
diff --git a/src/iceberg/row/manifest_wrapper.h 
b/src/iceberg/row/manifest_wrapper.h
index bc04c1e8..20c2165b 100644
--- a/src/iceberg/row/manifest_wrapper.h
+++ b/src/iceberg/row/manifest_wrapper.h
@@ -26,6 +26,7 @@
 #include <functional>
 
 #include "iceberg/iceberg_export.h"
+#include "iceberg/manifest/manifest_entry.h"
 #include "iceberg/manifest/manifest_list.h"
 #include "iceberg/row/struct_like.h"
 
@@ -97,4 +98,24 @@ class ICEBERG_EXPORT ManifestFileStructLike : public 
StructLike {
   mutable std::shared_ptr<PartitionFieldSummaryArrayLike> summaries_;
 };
 
+/// \brief StructLike wrapper for DataFile metadata.
+class ICEBERG_EXPORT DataFileStructLike : public StructLike {
+ public:
+  explicit DataFileStructLike(const DataFile& file) : data_file_(file) {}
+  ~DataFileStructLike() override = default;
+
+  DataFileStructLike(const DataFileStructLike&) = delete;
+  DataFileStructLike& operator=(const DataFileStructLike&) = delete;
+
+  Result<Scalar> GetField(size_t pos) const override;
+
+  size_t num_fields() const override;
+
+  void Reset(const DataFile& file) { data_file_ = std::cref(file); }
+
+ private:
+  std::reference_wrapper<const DataFile> data_file_;
+  mutable std::shared_ptr<StructLike> partition_;
+};
+
 }  // namespace iceberg
diff --git a/src/iceberg/test/manifest_group_test.cc 
b/src/iceberg/test/manifest_group_test.cc
index 017f9803..70e2cea9 100644
--- a/src/iceberg/test/manifest_group_test.cc
+++ b/src/iceberg/test/manifest_group_test.cc
@@ -76,13 +76,14 @@ class ManifestGroupTest : public 
testing::TestWithParam<int8_t> {
 
   std::shared_ptr<DataFile> MakeDataFile(const std::string& path,
                                          const PartitionValues& partition,
-                                         int32_t spec_id, int64_t record_count 
= 1) {
+                                         int32_t spec_id, int64_t record_count 
= 1,
+                                         int64_t file_size_in_bytes = 10) {
     return std::make_shared<DataFile>(DataFile{
         .file_path = path,
         .file_format = FileFormatType::kParquet,
         .partition = partition,
         .record_count = record_count,
-        .file_size_in_bytes = 10,
+        .file_size_in_bytes = file_size_in_bytes,
         .sort_order_id = 0,
         .partition_spec_id = spec_id,
     });
@@ -404,6 +405,187 @@ TEST_P(ManifestGroupTest, CustomManifestEntriesFilter) {
                                                              
"/path/to/data3.parquet"));
 }
 
+TEST_P(ManifestGroupTest, FilterFilesByRecordCount) {
+  auto version = GetParam();
+
+  constexpr int64_t kSnapshotId = 1000L;
+  const auto part_value = PartitionValues({Literal::Int(0)});
+
+  std::vector<ManifestEntry> data_entries{
+      MakeEntry(ManifestStatus::kAdded, kSnapshotId, /*sequence_number=*/1,
+                MakeDataFile("/path/to/small.parquet", part_value,
+                             partitioned_spec_->spec_id(), 
/*record_count=*/5)),
+      MakeEntry(ManifestStatus::kAdded, kSnapshotId, /*sequence_number=*/1,
+                MakeDataFile("/path/to/boundary.parquet", part_value,
+                             partitioned_spec_->spec_id(), 
/*record_count=*/10)),
+      MakeEntry(ManifestStatus::kAdded, kSnapshotId, /*sequence_number=*/1,
+                MakeDataFile("/path/to/large.parquet", part_value,
+                             partitioned_spec_->spec_id(), 
/*record_count=*/15))};
+  auto data_manifest =
+      WriteDataManifest(version, kSnapshotId, std::move(data_entries), 
partitioned_spec_);
+
+  std::vector<ManifestFile> manifests = {data_manifest};
+  ICEBERG_UNWRAP_OR_FAIL(
+      auto group,
+      ManifestGroup::Make(file_io_, schema_, GetSpecsById(), 
std::move(manifests)));
+  group->FilterFiles(Expressions::GreaterThanOrEqual("record_count", 
Literal::Long(10)));
+
+  ICEBERG_UNWRAP_OR_FAIL(auto entries, group->Entries());
+  EXPECT_THAT(GetEntryPaths(entries),
+              testing::UnorderedElementsAre("/path/to/boundary.parquet",
+                                            "/path/to/large.parquet"));
+}
+
+TEST_P(ManifestGroupTest, FilterFilesRejectsPartitionMetadata) {
+  auto version = GetParam();
+
+  constexpr int64_t kSnapshotId = 1000L;
+  const auto part_value = PartitionValues({Literal::Int(0)});
+
+  std::vector<ManifestEntry> data_entries{MakeEntry(
+      ManifestStatus::kAdded, kSnapshotId, /*sequence_number=*/1,
+      MakeDataFile("/path/to/data.parquet", part_value, 
partitioned_spec_->spec_id()))};
+  auto data_manifest =
+      WriteDataManifest(version, kSnapshotId, std::move(data_entries), 
partitioned_spec_);
+
+  std::vector<ManifestFile> manifests = {data_manifest};
+  ICEBERG_UNWRAP_OR_FAIL(
+      auto group,
+      ManifestGroup::Make(file_io_, schema_, GetSpecsById(), 
std::move(manifests)));
+  group->FilterFiles(Expressions::Equal("partition.data_bucket_16_2", 
Literal::Int(1)));
+
+  auto result = group->Entries();
+  EXPECT_THAT(result, IsError(ErrorKind::kInvalidExpression));
+  EXPECT_THAT(result, HasErrorMessage("Cannot find field 
'partition.data_bucket_16_2'"));
+}
+
+TEST_P(ManifestGroupTest, FilterFilesRejectsPartitionMetadataWhenEmpty) {
+  std::vector<ManifestFile> manifests;
+  ICEBERG_UNWRAP_OR_FAIL(
+      auto group,
+      ManifestGroup::Make(file_io_, schema_, GetSpecsById(), 
std::move(manifests)));
+  group->FilterFiles(Expressions::Equal("partition.data_bucket_16_2", 
Literal::Int(1)));
+
+  auto result = group->Entries();
+  EXPECT_THAT(result, IsError(ErrorKind::kInvalidExpression));
+  EXPECT_THAT(result, HasErrorMessage("Cannot find field 
'partition.data_bucket_16_2'"));
+}
+
+TEST_P(ManifestGroupTest, 
FilterFilesRejectsPartitionMetadataBeforeManifestPruning) {
+  auto version = GetParam();
+
+  constexpr int64_t kSnapshotId = 1000L;
+  const auto part_value = PartitionValues({Literal::Int(0)});
+
+  std::vector<ManifestEntry> data_entries{MakeEntry(
+      ManifestStatus::kAdded, kSnapshotId, /*sequence_number=*/1,
+      MakeDataFile("/path/to/data.parquet", part_value, 
partitioned_spec_->spec_id()))};
+  auto data_manifest =
+      WriteDataManifest(version, kSnapshotId, std::move(data_entries), 
partitioned_spec_);
+
+  std::vector<ManifestFile> manifests = {data_manifest};
+  ICEBERG_UNWRAP_OR_FAIL(
+      auto group,
+      ManifestGroup::Make(file_io_, schema_, GetSpecsById(), 
std::move(manifests)));
+  group->FilterPartitions(Expressions::Equal("data_bucket_16_2", 
Literal::Int(1)))
+      .FilterFiles(Expressions::Equal("partition.data_bucket_16_2", 
Literal::Int(1)));
+
+  auto result = group->Entries();
+  EXPECT_THAT(result, IsError(ErrorKind::kInvalidExpression));
+  EXPECT_THAT(result, HasErrorMessage("Cannot find field 
'partition.data_bucket_16_2'"));
+}
+
+TEST_P(ManifestGroupTest, FilterFilesReadsFilteredColumnsWhenSelected) {
+  auto version = GetParam();
+
+  constexpr int64_t kSnapshotId = 1000L;
+  const auto part_value = PartitionValues({Literal::Int(0)});
+
+  std::vector<ManifestEntry> data_entries{
+      MakeEntry(ManifestStatus::kAdded, kSnapshotId, /*sequence_number=*/1,
+                MakeDataFile("/path/to/too-small.parquet", part_value,
+                             partitioned_spec_->spec_id(), /*record_count=*/1,
+                             /*file_size_in_bytes=*/5)),
+      MakeEntry(ManifestStatus::kAdded, kSnapshotId, /*sequence_number=*/1,
+                MakeDataFile("/path/to/matching.parquet", part_value,
+                             partitioned_spec_->spec_id(), /*record_count=*/1,
+                             /*file_size_in_bytes=*/20))};
+  auto data_manifest =
+      WriteDataManifest(version, kSnapshotId, std::move(data_entries), 
partitioned_spec_);
+
+  std::vector<ManifestFile> manifests = {data_manifest};
+  ICEBERG_UNWRAP_OR_FAIL(
+      auto group,
+      ManifestGroup::Make(file_io_, schema_, GetSpecsById(), 
std::move(manifests)));
+  group->Select({"file_path"})
+      .FilterFiles(Expressions::GreaterThan("file_size_in_bytes", 
Literal::Long(10)));
+
+  ICEBERG_UNWRAP_OR_FAIL(auto entries, group->Entries());
+  EXPECT_THAT(GetEntryPaths(entries), 
testing::ElementsAre("/path/to/matching.parquet"));
+}
+
+TEST_P(ManifestGroupTest, 
FilterFilesHonorsCaseInsensitiveMatchingWhenSelected) {
+  auto version = GetParam();
+
+  constexpr int64_t kSnapshotId = 1000L;
+  const auto part_value = PartitionValues({Literal::Int(0)});
+
+  std::vector<ManifestEntry> data_entries{
+      MakeEntry(ManifestStatus::kAdded, kSnapshotId, /*sequence_number=*/1,
+                MakeDataFile("/path/to/small.parquet", part_value,
+                             partitioned_spec_->spec_id(), 
/*record_count=*/5)),
+      MakeEntry(ManifestStatus::kAdded, kSnapshotId, /*sequence_number=*/1,
+                MakeDataFile("/path/to/large.parquet", part_value,
+                             partitioned_spec_->spec_id(), 
/*record_count=*/15))};
+  auto data_manifest =
+      WriteDataManifest(version, kSnapshotId, std::move(data_entries), 
partitioned_spec_);
+
+  std::vector<ManifestFile> manifests = {data_manifest};
+  ICEBERG_UNWRAP_OR_FAIL(
+      auto group,
+      ManifestGroup::Make(file_io_, schema_, GetSpecsById(), 
std::move(manifests)));
+  group->CaseSensitive(false)
+      .Select({"FILE_PATH"})
+      .FilterFiles(Expressions::GreaterThanOrEqual("RECORD_COUNT", 
Literal::Long(10)));
+
+  ICEBERG_UNWRAP_OR_FAIL(auto entries, group->Entries());
+  EXPECT_THAT(GetEntryPaths(entries), 
testing::ElementsAre("/path/to/large.parquet"));
+}
+
+TEST_P(ManifestGroupTest, FilterFilesBySpecIdWhenSelected) {
+  auto version = GetParam();
+
+  constexpr int64_t kSnapshotId = 1000L;
+  const auto unpartitioned_value = PartitionValues(std::vector<Literal>{});
+  const auto partitioned_value = PartitionValues({Literal::Int(1)});
+
+  std::vector<ManifestEntry> unpartitioned_entries{
+      MakeEntry(ManifestStatus::kAdded, kSnapshotId, /*sequence_number=*/1,
+                MakeDataFile("/path/to/unpartitioned.parquet", 
unpartitioned_value,
+                             unpartitioned_spec_->spec_id()))};
+  auto unpartitioned_manifest = WriteDataManifest(
+      version, kSnapshotId, std::move(unpartitioned_entries), 
unpartitioned_spec_);
+
+  std::vector<ManifestEntry> partitioned_entries{
+      MakeEntry(ManifestStatus::kAdded, kSnapshotId, /*sequence_number=*/1,
+                MakeDataFile("/path/to/partitioned.parquet", partitioned_value,
+                             partitioned_spec_->spec_id()))};
+  auto partitioned_manifest = WriteDataManifest(
+      version, kSnapshotId, std::move(partitioned_entries), partitioned_spec_);
+
+  std::vector<ManifestFile> manifests = {unpartitioned_manifest, 
partitioned_manifest};
+  ICEBERG_UNWRAP_OR_FAIL(
+      auto group,
+      ManifestGroup::Make(file_io_, schema_, GetSpecsById(), 
std::move(manifests)));
+  group->Select({"file_path"})
+      .FilterFiles(
+          Expressions::Equal("spec_id", 
Literal::Int(partitioned_spec_->spec_id())));
+
+  ICEBERG_UNWRAP_OR_FAIL(auto entries, group->Entries());
+  EXPECT_THAT(GetEntryPaths(entries),
+              testing::ElementsAre("/path/to/partitioned.parquet"));
+}
+
 TEST_P(ManifestGroupTest, EmptyManifestGroup) {
   std::vector<ManifestFile> manifests;
   ICEBERG_UNWRAP_OR_FAIL(

Reply via email to