This is an automated email from the ASF dual-hosted git repository.

gangwu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-cpp.git


The following commit(s) were added to refs/heads/main by this push:
     new c426aff4 test: add test case for table scan planning (#492)
c426aff4 is described below

commit c426aff4ffd01003a592dce48c5faa0696afcd3e
Author: Gang Wu <[email protected]>
AuthorDate: Mon Jan 26 22:53:35 2026 +0800

    test: add test case for table scan planning (#492)
---
 src/iceberg/table_scan.cc           |   3 +
 src/iceberg/table_scan.h            |   2 +-
 src/iceberg/test/CMakeLists.txt     |   6 +-
 src/iceberg/test/table_scan_test.cc | 671 ++++++++++++++++++++++++++++++++++++
 4 files changed, 680 insertions(+), 2 deletions(-)

diff --git a/src/iceberg/table_scan.cc b/src/iceberg/table_scan.cc
index 05d44d90..4992b18d 100644
--- a/src/iceberg/table_scan.cc
+++ b/src/iceberg/table_scan.cc
@@ -383,6 +383,9 @@ const std::shared_ptr<TableMetadata>& TableScan::metadata() 
const { return metad
 Result<std::shared_ptr<Snapshot>> TableScan::snapshot() const {
   auto snapshot_id = context_.snapshot_id ? context_.snapshot_id.value()
                                           : metadata_->current_snapshot_id;
+  if (snapshot_id == kInvalidSnapshotId) {
+    return std::shared_ptr<Snapshot>{nullptr};
+  }
   return metadata_->SnapshotById(snapshot_id);
 }
 
diff --git a/src/iceberg/table_scan.h b/src/iceberg/table_scan.h
index cc26830f..aa225ff8 100644
--- a/src/iceberg/table_scan.h
+++ b/src/iceberg/table_scan.h
@@ -275,7 +275,7 @@ class ICEBERG_EXPORT TableScan {
   /// \brief Returns the table metadata being scanned.
   const std::shared_ptr<TableMetadata>& metadata() const;
 
-  /// \brief Returns the snapshot to scan.
+  /// \brief Returns the snapshot to scan. If there is no snapshot, returns 
nullptr.
   Result<std::shared_ptr<Snapshot>> snapshot() const;
 
   /// \brief Returns the projected schema for the scan.
diff --git a/src/iceberg/test/CMakeLists.txt b/src/iceberg/test/CMakeLists.txt
index 1bd2fd6a..5243d9b7 100644
--- a/src/iceberg/test/CMakeLists.txt
+++ b/src/iceberg/test/CMakeLists.txt
@@ -167,7 +167,11 @@ if(ICEBERG_BUILD_BUNDLE)
                    parquet_schema_test.cc
                    parquet_test.cc)
 
-  add_iceberg_test(scan_test USE_BUNDLE SOURCES file_scan_task_test.cc)
+  add_iceberg_test(scan_test
+                   USE_BUNDLE
+                   SOURCES
+                   file_scan_task_test.cc
+                   table_scan_test.cc)
 
   add_iceberg_test(table_update_test
                    USE_BUNDLE
diff --git a/src/iceberg/test/table_scan_test.cc 
b/src/iceberg/test/table_scan_test.cc
new file mode 100644
index 00000000..b496606c
--- /dev/null
+++ b/src/iceberg/test/table_scan_test.cc
@@ -0,0 +1,671 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "iceberg/table_scan.h"
+
+#include <chrono>
+#include <format>
+#include <memory>
+#include <optional>
+#include <string>
+#include <vector>
+
+#include <gmock/gmock.h>
+#include <gtest/gtest.h>
+
+#include "iceberg/arrow/arrow_file_io.h"
+#include "iceberg/avro/avro_register.h"
+#include "iceberg/expression/expressions.h"
+#include "iceberg/manifest/manifest_entry.h"
+#include "iceberg/manifest/manifest_list.h"
+#include "iceberg/manifest/manifest_reader.h"
+#include "iceberg/manifest/manifest_writer.h"
+#include "iceberg/partition_spec.h"
+#include "iceberg/schema.h"
+#include "iceberg/snapshot.h"
+#include "iceberg/table_metadata.h"
+#include "iceberg/test/matchers.h"
+#include "iceberg/transform.h"
+#include "iceberg/type.h"
+#include "iceberg/util/timepoint.h"
+
+namespace iceberg {
+
+class TableScanTest : public testing::TestWithParam<int> {
+ protected:
+  void SetUp() override {
+    avro::RegisterAll();
+
+    file_io_ = arrow::MakeMockFileIO();
+    schema_ = std::make_shared<Schema>(std::vector<SchemaField>{
+        SchemaField::MakeRequired(/*field_id=*/1, "id", int32()),
+        SchemaField::MakeRequired(/*field_id=*/2, "data", string())});
+    unpartitioned_spec_ = PartitionSpec::Unpartitioned();
+
+    ICEBERG_UNWRAP_OR_FAIL(
+        partitioned_spec_,
+        PartitionSpec::Make(
+            /*spec_id=*/1, {PartitionField(/*source_id=*/2, /*field_id=*/1000,
+                                           "data_bucket_16_2", 
Transform::Bucket(16))}));
+
+    MakeTableMetadata();
+  }
+
+  void MakeTableMetadata() {
+    constexpr int64_t kSnapshotId = 1000L;
+    constexpr int64_t kSequenceNumber = 1L;
+    const TimePointMs kTimestampMs =
+        TimePointMsFromUnixMs(1609459200000L);  // 2021-01-01 00:00:00 UTC
+
+    auto snapshot = std::make_shared<Snapshot>(
+        Snapshot{.snapshot_id = kSnapshotId,
+                 .parent_snapshot_id = std::nullopt,
+                 .sequence_number = kSequenceNumber,
+                 .timestamp_ms = kTimestampMs,
+                 .manifest_list = 
"/tmp/metadata/snap-1000-1-manifest-list.avro",
+                 .schema_id = schema_->schema_id()});
+
+    table_metadata_ = std::make_shared<TableMetadata>(
+        TableMetadata{.format_version = 2,
+                      .table_uuid = "test-table-uuid",
+                      .location = "/tmp/table",
+                      .last_sequence_number = kSequenceNumber,
+                      .last_updated_ms = kTimestampMs,
+                      .last_column_id = 2,
+                      .schemas = {schema_},
+                      .current_schema_id = schema_->schema_id(),
+                      .partition_specs = {partitioned_spec_, 
unpartitioned_spec_},
+                      .default_spec_id = partitioned_spec_->spec_id(),
+                      .last_partition_id = 1000,
+                      .current_snapshot_id = kSnapshotId,
+                      .snapshots = {snapshot},
+                      .snapshot_log = {SnapshotLogEntry{.timestamp_ms = 
kTimestampMs,
+                                                        .snapshot_id = 
kSnapshotId}},
+                      .default_sort_order_id = 0,
+                      .refs = {{"main", 
std::make_shared<SnapshotRef>(SnapshotRef{
+                                            .snapshot_id = kSnapshotId,
+                                            .retention = 
SnapshotRef::Branch{}})}}});
+  }
+
+  std::shared_ptr<DataFile> MakePositionDeleteFile(
+      const std::string& path, const PartitionValues& partition, int32_t 
spec_id,
+      std::optional<std::string> referenced_file = std::nullopt) {
+    return std::make_shared<DataFile>(DataFile{
+        .content = DataFile::Content::kPositionDeletes,
+        .file_path = path,
+        .file_format = FileFormatType::kParquet,
+        .partition = partition,
+        .record_count = 1,
+        .file_size_in_bytes = 10,
+        .referenced_data_file = referenced_file,
+        .partition_spec_id = spec_id,
+    });
+  }
+
+  std::shared_ptr<DataFile> MakeEqualityDeleteFile(const std::string& path,
+                                                   const PartitionValues& 
partition,
+                                                   int32_t spec_id,
+                                                   std::vector<int> 
equality_ids = {1}) {
+    return std::make_shared<DataFile>(DataFile{
+        .content = DataFile::Content::kEqualityDeletes,
+        .file_path = path,
+        .file_format = FileFormatType::kParquet,
+        .partition = partition,
+        .record_count = 1,
+        .file_size_in_bytes = 10,
+        .equality_ids = std::move(equality_ids),
+        .partition_spec_id = spec_id,
+    });
+  }
+
+  std::string MakeManifestPath() {
+    static int counter = 0;
+    return std::format("manifest-{}-{}.avro", counter++,
+                       
std::chrono::system_clock::now().time_since_epoch().count());
+  }
+
+  std::shared_ptr<DataFile> MakeDataFile(const std::string& path,
+                                         const PartitionValues& partition,
+                                         int32_t spec_id, int64_t record_count 
= 1,
+                                         std::optional<int32_t> lower_id = 
std::nullopt,
+                                         std::optional<int32_t> upper_id = 
std::nullopt) {
+    auto file = std::make_shared<DataFile>(DataFile{
+        .file_path = path,
+        .file_format = FileFormatType::kParquet,
+        .partition = partition,
+        .record_count = record_count,
+        .file_size_in_bytes = 10,
+        .sort_order_id = 0,
+        .partition_spec_id = spec_id,
+    });
+    // Set lower/upper bounds for field_id=1 ("id" column) if provided
+    if (lower_id.has_value()) {
+      file->lower_bounds[1] = 
Literal::Int(lower_id.value()).Serialize().value();
+    }
+    if (upper_id.has_value()) {
+      file->upper_bounds[1] = 
Literal::Int(upper_id.value()).Serialize().value();
+    }
+    return file;
+  }
+
+  ManifestEntry MakeEntry(ManifestStatus status, int64_t snapshot_id,
+                          int64_t sequence_number, std::shared_ptr<DataFile> 
file) {
+    return ManifestEntry{
+        .status = status,
+        .snapshot_id = snapshot_id,
+        .sequence_number = sequence_number,
+        .file_sequence_number = sequence_number,
+        .data_file = std::move(file),
+    };
+  }
+
+  ManifestFile WriteDataManifest(int format_version, int64_t snapshot_id,
+                                 std::vector<ManifestEntry> entries,
+                                 std::shared_ptr<PartitionSpec> spec) {
+    const std::string manifest_path = MakeManifestPath();
+    auto writer_result = ManifestWriter::MakeWriter(
+        format_version, snapshot_id, manifest_path, file_io_, spec, schema_,
+        ManifestContent::kData,
+        /*first_row_id=*/format_version >= 3 ? std::optional<int64_t>(0L) : 
std::nullopt);
+
+    EXPECT_THAT(writer_result, IsOk());
+    auto writer = std::move(writer_result.value());
+
+    for (const auto& entry : entries) {
+      EXPECT_THAT(writer->WriteEntry(entry), IsOk());
+    }
+
+    EXPECT_THAT(writer->Close(), IsOk());
+    auto manifest_result = writer->ToManifestFile();
+    EXPECT_THAT(manifest_result, IsOk());
+    return std::move(manifest_result.value());
+  }
+
+  ManifestFile WriteDeleteManifest(int format_version, int64_t snapshot_id,
+                                   std::vector<ManifestEntry> entries,
+                                   std::shared_ptr<PartitionSpec> spec) {
+    const std::string manifest_path = MakeManifestPath();
+    auto writer_result =
+        ManifestWriter::MakeWriter(format_version, snapshot_id, manifest_path, 
file_io_,
+                                   spec, schema_, ManifestContent::kDeletes);
+
+    EXPECT_THAT(writer_result, IsOk());
+    auto writer = std::move(writer_result.value());
+
+    for (const auto& entry : entries) {
+      EXPECT_THAT(writer->WriteEntry(entry), IsOk());
+    }
+
+    EXPECT_THAT(writer->Close(), IsOk());
+    auto manifest_result = writer->ToManifestFile();
+    EXPECT_THAT(manifest_result, IsOk());
+    return std::move(manifest_result.value());
+  }
+
+  std::string MakeManifestListPath() {
+    static int counter = 0;
+    return std::format("manifest-list-{}-{}.avro", counter++,
+                       
std::chrono::system_clock::now().time_since_epoch().count());
+  }
+
+  std::string WriteManifestList(int format_version, int64_t snapshot_id,
+                                int64_t sequence_number,
+                                const std::vector<ManifestFile>& manifests) {
+    const std::string manifest_list_path = MakeManifestListPath();
+    constexpr int64_t kParentSnapshotId = 0L;
+
+    auto writer_result = ManifestListWriter::MakeWriter(
+        format_version, snapshot_id, kParentSnapshotId, manifest_list_path, 
file_io_,
+        /*sequence_number=*/format_version >= 2 ? 
std::optional(sequence_number)
+                                                : std::nullopt,
+        /*first_row_id=*/format_version >= 3 ? std::optional<int64_t>(0L) : 
std::nullopt);
+
+    EXPECT_THAT(writer_result, IsOk());
+    auto writer = std::move(writer_result.value());
+    EXPECT_THAT(writer->AddAll(manifests), IsOk());
+    EXPECT_THAT(writer->Close(), IsOk());
+
+    return manifest_list_path;
+  }
+
+  std::unordered_map<int32_t, std::shared_ptr<PartitionSpec>> GetSpecsById() {
+    return {{partitioned_spec_->spec_id(), partitioned_spec_},
+            {unpartitioned_spec_->spec_id(), unpartitioned_spec_}};
+  }
+
+  static std::vector<std::string> GetPaths(
+      const std::vector<std::shared_ptr<FileScanTask>>& tasks) {
+    return tasks | std::views::transform([](const auto& task) {
+             return task->data_file()->file_path;
+           }) |
+           std::ranges::to<std::vector<std::string>>();
+  }
+
+  std::shared_ptr<FileIO> file_io_;
+  std::shared_ptr<Schema> schema_;
+  std::shared_ptr<PartitionSpec> partitioned_spec_;
+  std::shared_ptr<PartitionSpec> unpartitioned_spec_;
+  std::shared_ptr<TableMetadata> table_metadata_;
+};
+
+TEST_P(TableScanTest, TableScanBuilderOptions) {
+  // Test basic scan creation and default values
+  ICEBERG_UNWRAP_OR_FAIL(auto builder, TableScanBuilder::Make(table_metadata_, 
file_io_));
+  ICEBERG_UNWRAP_OR_FAIL(auto basic_scan, builder->Build());
+  EXPECT_NE(basic_scan, nullptr);
+  EXPECT_EQ(basic_scan->metadata(), table_metadata_);
+  EXPECT_EQ(basic_scan->io(), file_io_);
+  EXPECT_TRUE(basic_scan->is_case_sensitive());
+
+  // Test all builder options with method chaining
+  auto projected_schema = std::make_shared<Schema>(
+      std::vector<SchemaField>{SchemaField::MakeRequired(1, "id", int32())});
+  auto filter = Expressions::Equal("id", Literal::Int(42));
+  constexpr int64_t kMinRows = 1000;
+  constexpr int64_t kSnapshotId = 1000L;
+  const std::string branch_name = "test-branch";
+
+  ICEBERG_UNWRAP_OR_FAIL(auto builder2,
+                         TableScanBuilder::Make(table_metadata_, file_io_));
+  ICEBERG_UNWRAP_OR_FAIL(auto scan, builder2->Option("key1", "value1")
+                                        .Option("key2", "value2")
+                                        .CaseSensitive(false)
+                                        .Project(projected_schema)
+                                        .Filter(filter)
+                                        .IncludeColumnStats({"id", "data"})
+                                        .IgnoreResiduals()
+                                        .MinRowsRequested(kMinRows)
+                                        .UseSnapshot(kSnapshotId)
+                                        .UseBranch(branch_name)
+                                        .Build());
+
+  // Verify all options were set correctly
+  ICEBERG_UNWRAP_OR_FAIL(auto schema, scan->schema());
+  EXPECT_EQ(schema, projected_schema);
+  EXPECT_EQ(scan->filter(), filter);
+  EXPECT_FALSE(scan->is_case_sensitive());
+
+  const auto& context = scan->context();
+  EXPECT_EQ(context.options.at("key1"), "value1");
+  EXPECT_EQ(context.options.at("key2"), "value2");
+  EXPECT_TRUE(context.return_column_stats);
+  EXPECT_EQ(context.columns_to_keep_stats.size(), 2);
+  EXPECT_TRUE(context.columns_to_keep_stats.contains(1));  // id field
+  EXPECT_TRUE(context.columns_to_keep_stats.contains(2));  // data field
+  EXPECT_TRUE(context.ignore_residuals);
+  EXPECT_TRUE(context.min_rows_requested.has_value());
+  EXPECT_EQ(context.min_rows_requested.value(), kMinRows);
+  EXPECT_TRUE(context.snapshot_id.has_value());
+  EXPECT_EQ(context.snapshot_id.value(), kSnapshotId);
+  EXPECT_EQ(context.branch, branch_name);
+
+  // Test UseRef separately
+  ICEBERG_UNWRAP_OR_FAIL(auto builder3,
+                         TableScanBuilder::Make(table_metadata_, file_io_));
+  builder3->UseRef("main");
+  ICEBERG_UNWRAP_OR_FAIL(auto ref_scan, builder3->Build());
+  ICEBERG_UNWRAP_OR_FAIL(auto snapshot, ref_scan->snapshot());
+  EXPECT_EQ(snapshot->snapshot_id, 1000L);
+}
+
+TEST_P(TableScanTest, TableScanBuilderValidationErrors) {
+  // Test negative min rows
+  ICEBERG_UNWRAP_OR_FAIL(auto builder, TableScanBuilder::Make(table_metadata_, 
file_io_));
+  builder->MinRowsRequested(-1);
+  EXPECT_THAT(builder->Build(), IsError(ErrorKind::kValidationFailed));
+
+  // Test invalid snapshot ID
+  ICEBERG_UNWRAP_OR_FAIL(auto builder2,
+                         TableScanBuilder::Make(table_metadata_, file_io_));
+  builder2->UseSnapshot(9999L);
+  EXPECT_THAT(builder2->Build(), IsError(ErrorKind::kValidationFailed));
+
+  // Test invalid ref
+  ICEBERG_UNWRAP_OR_FAIL(auto builder3,
+                         TableScanBuilder::Make(table_metadata_, file_io_));
+  builder3->UseRef("non-existent-ref");
+  EXPECT_THAT(builder3->Build(), IsError(ErrorKind::kValidationFailed));
+
+  // Test null inputs
+  EXPECT_THAT(TableScanBuilder::Make(nullptr, file_io_),
+              IsError(ErrorKind::kInvalidArgument));
+  EXPECT_THAT(TableScanBuilder::Make(table_metadata_, nullptr),
+              IsError(ErrorKind::kInvalidArgument));
+}
+
+TEST_P(TableScanTest, DataTableScanPlanFilesEmpty) {
+  auto empty_metadata = std::make_shared<TableMetadata>(
+      TableMetadata{.format_version = 2,
+                    .schemas = {schema_},
+                    .current_schema_id = schema_->schema_id(),
+                    .partition_specs = {unpartitioned_spec_},
+                    .default_spec_id = unpartitioned_spec_->spec_id(),
+                    .current_snapshot_id = -1,
+                    .snapshots = {},
+                    .refs = {}});
+
+  ICEBERG_UNWRAP_OR_FAIL(auto builder, TableScanBuilder::Make(empty_metadata, 
file_io_));
+  ICEBERG_UNWRAP_OR_FAIL(auto scan, builder->Build());
+  ICEBERG_UNWRAP_OR_FAIL(auto tasks, scan->PlanFiles());
+  EXPECT_TRUE(tasks.empty());
+}
+
+TEST_P(TableScanTest, PlanFilesWithDataManifests) {
+  int version = GetParam();
+
+  constexpr int64_t kSnapshotId = 1000L;
+  const auto part_value = PartitionValues({Literal::Int(0)});
+
+  std::vector<ManifestEntry> data_entries{
+      MakeEntry(ManifestStatus::kAdded, kSnapshotId, /*sequence_number=*/1,
+                MakeDataFile("/path/to/data1.parquet", part_value,
+                             partitioned_spec_->spec_id(), 
/*record_count=*/100)),
+      MakeEntry(ManifestStatus::kAdded, kSnapshotId, /*sequence_number=*/1,
+                MakeDataFile("/path/to/data2.parquet", part_value,
+                             partitioned_spec_->spec_id(), 
/*record_count=*/200))};
+  auto data_manifest =
+      WriteDataManifest(version, kSnapshotId, std::move(data_entries), 
partitioned_spec_);
+  std::string manifest_list_path =
+      WriteManifestList(version, kSnapshotId, /*sequence_number=*/1, 
{data_manifest});
+
+  // Create a snapshot that references this manifest list
+  auto timestamp_ms = TimePointMsFromUnixMs(1609459200000L);
+  auto snapshot_with_manifest =
+      std::make_shared<Snapshot>(Snapshot{.snapshot_id = kSnapshotId,
+                                          .parent_snapshot_id = std::nullopt,
+                                          .sequence_number = 1L,
+                                          .timestamp_ms = timestamp_ms,
+                                          .manifest_list = manifest_list_path,
+                                          .summary = {},
+                                          .schema_id = schema_->schema_id()});
+
+  auto metadata_with_manifest = std::make_shared<TableMetadata>(
+      TableMetadata{.format_version = 2,
+                    .table_uuid = "test-table-uuid",
+                    .location = "/tmp/table",
+                    .last_sequence_number = 1L,
+                    .last_updated_ms = timestamp_ms,
+                    .last_column_id = 2,
+                    .schemas = {schema_},
+                    .current_schema_id = schema_->schema_id(),
+                    .partition_specs = {partitioned_spec_, 
unpartitioned_spec_},
+                    .default_spec_id = partitioned_spec_->spec_id(),
+                    .last_partition_id = 1000,
+                    .current_snapshot_id = kSnapshotId,
+                    .snapshots = {snapshot_with_manifest},
+                    .snapshot_log = {SnapshotLogEntry{.timestamp_ms = 
timestamp_ms,
+                                                      .snapshot_id = 
kSnapshotId}},
+                    .default_sort_order_id = 0,
+                    .refs = {{"main", 
std::make_shared<SnapshotRef>(SnapshotRef{
+                                          .snapshot_id = kSnapshotId,
+                                          .retention = SnapshotRef::Branch{},
+                                      })}}});
+
+  ICEBERG_UNWRAP_OR_FAIL(auto builder,
+                         TableScanBuilder::Make(metadata_with_manifest, 
file_io_));
+  ICEBERG_UNWRAP_OR_FAIL(auto scan, builder->Build());
+  ICEBERG_UNWRAP_OR_FAIL(auto tasks, scan->PlanFiles());
+  ASSERT_EQ(tasks.size(), 2);
+  EXPECT_THAT(GetPaths(tasks), 
testing::UnorderedElementsAre("/path/to/data1.parquet",
+                                                             
"/path/to/data2.parquet"));
+}
+
+TEST_P(TableScanTest, PlanFilesWithMultipleManifests) {
+  int version = GetParam();
+
+  const auto partition_a = PartitionValues({Literal::Int(0)});
+  const auto partition_b = PartitionValues({Literal::Int(1)});
+
+  // Create first data manifest
+  std::vector<ManifestEntry> data_entries_1{MakeEntry(
+      ManifestStatus::kAdded, /*snapshot_id=*/1000L, /*sequence_number=*/1,
+      MakeDataFile("/path/to/data1.parquet", partition_a, 
partitioned_spec_->spec_id()))};
+  auto data_manifest_1 = WriteDataManifest(version, /*snapshot_id=*/1000L,
+                                           std::move(data_entries_1), 
partitioned_spec_);
+
+  // Create second data manifest
+  std::vector<ManifestEntry> data_entries_2{MakeEntry(
+      ManifestStatus::kAdded, /*snapshot_id=*/1000L, /*sequence_number=*/1,
+      MakeDataFile("/path/to/data2.parquet", partition_b, 
partitioned_spec_->spec_id()))};
+  auto data_manifest_2 = WriteDataManifest(version, /*snapshot_id=*/1000L,
+                                           std::move(data_entries_2), 
partitioned_spec_);
+
+  // Write manifest list with multiple manifests
+  std::string manifest_list_path =
+      WriteManifestList(version, /*snapshot_id=*/1000L, /*sequence_number=*/1,
+                        {data_manifest_1, data_manifest_2});
+
+  // Create a snapshot that references this manifest list
+  auto timestamp_ms = TimePointMsFromUnixMs(1609459200000L);
+  auto snapshot_with_manifests =
+      std::make_shared<Snapshot>(Snapshot{.snapshot_id = 1000L,
+                                          .parent_snapshot_id = std::nullopt,
+                                          .sequence_number = 1L,
+                                          .timestamp_ms = timestamp_ms,
+                                          .manifest_list = manifest_list_path,
+                                          .summary = {},
+                                          .schema_id = schema_->schema_id()});
+
+  auto metadata_with_manifests = std::make_shared<TableMetadata>(
+      TableMetadata{.format_version = 2,
+                    .table_uuid = "test-table-uuid",
+                    .location = "/tmp/table",
+                    .last_sequence_number = 1L,
+                    .last_updated_ms = timestamp_ms,
+                    .last_column_id = 2,
+                    .schemas = {schema_},
+                    .current_schema_id = schema_->schema_id(),
+                    .partition_specs = {partitioned_spec_, 
unpartitioned_spec_},
+                    .default_spec_id = partitioned_spec_->spec_id(),
+                    .last_partition_id = 1000,
+                    .current_snapshot_id = 1000L,
+                    .snapshots = {snapshot_with_manifests},
+                    .snapshot_log = {SnapshotLogEntry{.timestamp_ms = 
timestamp_ms,
+                                                      .snapshot_id = 1000L}},
+                    .default_sort_order_id = 0,
+                    .refs = {{"main", 
std::make_shared<SnapshotRef>(SnapshotRef{
+                                          .snapshot_id = 1000L,
+                                          .retention = SnapshotRef::Branch{},
+                                      })}}});
+
+  ICEBERG_UNWRAP_OR_FAIL(auto builder,
+                         TableScanBuilder::Make(metadata_with_manifests, 
file_io_));
+  ICEBERG_UNWRAP_OR_FAIL(auto scan, builder->Build());
+  ICEBERG_UNWRAP_OR_FAIL(auto tasks, scan->PlanFiles());
+  ASSERT_EQ(tasks.size(), 2);
+  EXPECT_THAT(GetPaths(tasks), 
testing::UnorderedElementsAre("/path/to/data1.parquet",
+                                                             
"/path/to/data2.parquet"));
+}
+
+TEST_P(TableScanTest, PlanFilesWithFilter) {
+  int version = GetParam();
+
+  constexpr int64_t kSnapshotId = 1000L;
+  const auto part_value = PartitionValues({Literal::Int(0)});
+
+  // Create two data files with non-overlapping id ranges:
+  // - data1.parquet: id range [1, 50]
+  // - data2.parquet: id range [51, 100]
+  std::vector<ManifestEntry> data_entries{
+      MakeEntry(ManifestStatus::kAdded, kSnapshotId, /*sequence_number=*/1,
+                MakeDataFile("/path/to/data1.parquet", part_value,
+                             partitioned_spec_->spec_id(), /*record_count=*/1,
+                             /*lower_id=*/1, /*upper_id=*/50)),
+      MakeEntry(ManifestStatus::kAdded, kSnapshotId, /*sequence_number=*/1,
+                MakeDataFile("/path/to/data2.parquet", part_value,
+                             partitioned_spec_->spec_id(), /*record_count=*/1,
+                             /*lower_id=*/51, /*upper_id=*/100))};
+  auto data_manifest =
+      WriteDataManifest(version, kSnapshotId, std::move(data_entries), 
partitioned_spec_);
+  std::string manifest_list_path =
+      WriteManifestList(version, kSnapshotId, /*sequence_number=*/1, 
{data_manifest});
+
+  auto timestamp_ms = TimePointMsFromUnixMs(1609459200000L);
+  auto snapshot = std::make_shared<Snapshot>(Snapshot{.snapshot_id = 
kSnapshotId,
+                                                      .parent_snapshot_id = 
std::nullopt,
+                                                      .sequence_number = 1L,
+                                                      .timestamp_ms = 
timestamp_ms,
+                                                      .manifest_list = 
manifest_list_path,
+                                                      .schema_id = 
schema_->schema_id()});
+
+  auto metadata = std::make_shared<TableMetadata>(TableMetadata{
+      .format_version = 2,
+      .table_uuid = "test-table-uuid",
+      .location = "/tmp/table",
+      .last_sequence_number = 1L,
+      .last_updated_ms = timestamp_ms,
+      .last_column_id = 2,
+      .schemas = {schema_},
+      .current_schema_id = schema_->schema_id(),
+      .partition_specs = {partitioned_spec_, unpartitioned_spec_},
+      .default_spec_id = partitioned_spec_->spec_id(),
+      .last_partition_id = 1000,
+      .current_snapshot_id = kSnapshotId,
+      .snapshots = {snapshot},
+      .snapshot_log = {SnapshotLogEntry{.timestamp_ms = timestamp_ms,
+                                        .snapshot_id = kSnapshotId}},
+      .default_sort_order_id = 0,
+      .refs = {{"main",
+                std::make_shared<SnapshotRef>(SnapshotRef{
+                    .snapshot_id = kSnapshotId, .retention = 
SnapshotRef::Branch{}})}}});
+
+  // Test 1: Filter matches only data1.parquet (id=25 is in range [1, 50])
+  {
+    ICEBERG_UNWRAP_OR_FAIL(auto builder, TableScanBuilder::Make(metadata, 
file_io_));
+    builder->Filter(Expressions::Equal("id", Literal::Int(25)));
+    ICEBERG_UNWRAP_OR_FAIL(auto scan, builder->Build());
+    ICEBERG_UNWRAP_OR_FAIL(auto tasks, scan->PlanFiles());
+    ASSERT_EQ(tasks.size(), 1);
+    EXPECT_EQ(tasks[0]->data_file()->file_path, "/path/to/data1.parquet");
+  }
+
+  // Test 2: Filter matches only data2.parquet (id=75 is in range [51, 100])
+  {
+    ICEBERG_UNWRAP_OR_FAIL(auto builder, TableScanBuilder::Make(metadata, 
file_io_));
+    builder->Filter(Expressions::Equal("id", Literal::Int(75)));
+    ICEBERG_UNWRAP_OR_FAIL(auto scan, builder->Build());
+    ICEBERG_UNWRAP_OR_FAIL(auto tasks, scan->PlanFiles());
+    ASSERT_EQ(tasks.size(), 1);
+    EXPECT_EQ(tasks[0]->data_file()->file_path, "/path/to/data2.parquet");
+  }
+
+  // Test 3: Filter matches both files (id > 0 covers both ranges)
+  {
+    ICEBERG_UNWRAP_OR_FAIL(auto builder, TableScanBuilder::Make(metadata, 
file_io_));
+    builder->Filter(Expressions::GreaterThan("id", Literal::Int(0)));
+    ICEBERG_UNWRAP_OR_FAIL(auto scan, builder->Build());
+    ICEBERG_UNWRAP_OR_FAIL(auto tasks, scan->PlanFiles());
+    ASSERT_EQ(tasks.size(), 2);
+    EXPECT_THAT(GetPaths(tasks), 
testing::UnorderedElementsAre("/path/to/data1.parquet",
+                                                               
"/path/to/data2.parquet"));
+  }
+
+  // Test 4: Filter matches no files (id=200 is outside both ranges)
+  {
+    ICEBERG_UNWRAP_OR_FAIL(auto builder, TableScanBuilder::Make(metadata, 
file_io_));
+    builder->Filter(Expressions::Equal("id", Literal::Int(200)));
+    ICEBERG_UNWRAP_OR_FAIL(auto scan, builder->Build());
+    ICEBERG_UNWRAP_OR_FAIL(auto tasks, scan->PlanFiles());
+    EXPECT_TRUE(tasks.empty());
+  }
+}
+
+TEST_P(TableScanTest, PlanFilesWithDeleteFiles) {
+  int version = GetParam();
+  if (version < 2) {
+    GTEST_SKIP() << "Delete files only supported in V2+";
+  }
+
+  constexpr int64_t kSnapshotId = 1000L;
+  const auto part_value = PartitionValues({Literal::Int(0)});
+
+  // Create data manifest with files
+  std::vector<ManifestEntry> data_entries{
+      MakeEntry(ManifestStatus::kAdded, kSnapshotId, /*sequence_number=*/1,
+                MakeDataFile("/path/to/data1.parquet", part_value,
+                             partitioned_spec_->spec_id(), 
/*record_count=*/100)),
+      MakeEntry(ManifestStatus::kAdded, kSnapshotId, /*sequence_number=*/1,
+                MakeDataFile("/path/to/data2.parquet", part_value,
+                             partitioned_spec_->spec_id(), 
/*record_count=*/200))};
+  auto data_manifest =
+      WriteDataManifest(version, kSnapshotId, std::move(data_entries), 
partitioned_spec_);
+
+  // Create delete manifest with position delete files
+  std::vector<ManifestEntry> delete_entries{
+      MakeEntry(
+          ManifestStatus::kAdded, kSnapshotId, /*sequence_number=*/2,
+          MakePositionDeleteFile("/path/to/pos_delete.parquet", part_value,
+                                 partitioned_spec_->spec_id(), 
"/path/to/data1.parquet")),
+      MakeEntry(ManifestStatus::kAdded, kSnapshotId, /*sequence_number=*/2,
+                MakeEqualityDeleteFile("/path/to/eq_delete.parquet", 
part_value,
+                                       partitioned_spec_->spec_id(), {1}))};
+  auto delete_manifest = WriteDeleteManifest(
+      version, kSnapshotId, std::move(delete_entries), partitioned_spec_);
+  std::string manifest_list_path = WriteManifestList(
+      version, kSnapshotId, /*sequence_number=*/2, {data_manifest, 
delete_manifest});
+
+  // Create a snapshot that references this manifest list
+  auto timestamp_ms = TimePointMsFromUnixMs(1609459200000L);
+  auto snapshot_with_manifests =
+      std::make_shared<Snapshot>(Snapshot{.snapshot_id = kSnapshotId,
+                                          .parent_snapshot_id = std::nullopt,
+                                          .sequence_number = 2L,
+                                          .timestamp_ms = timestamp_ms,
+                                          .manifest_list = manifest_list_path,
+                                          .summary = {},
+                                          .schema_id = schema_->schema_id()});
+
+  auto metadata_with_manifests = std::make_shared<TableMetadata>(
+      TableMetadata{.format_version = 2,
+                    .table_uuid = "test-table-uuid",
+                    .location = "/tmp/table",
+                    .last_sequence_number = 2L,
+                    .last_updated_ms = timestamp_ms,
+                    .last_column_id = 2,
+                    .schemas = {schema_},
+                    .current_schema_id = schema_->schema_id(),
+                    .partition_specs = {partitioned_spec_, 
unpartitioned_spec_},
+                    .default_spec_id = partitioned_spec_->spec_id(),
+                    .last_partition_id = 1000,
+                    .current_snapshot_id = kSnapshotId,
+                    .snapshots = {snapshot_with_manifests},
+                    .snapshot_log = {SnapshotLogEntry{.timestamp_ms = 
timestamp_ms,
+                                                      .snapshot_id = 
kSnapshotId}},
+                    .default_sort_order_id = 0,
+                    .refs = {{"main", 
std::make_shared<SnapshotRef>(SnapshotRef{
+                                          .snapshot_id = kSnapshotId,
+                                          .retention = SnapshotRef::Branch{},
+                                      })}}});
+
+  ICEBERG_UNWRAP_OR_FAIL(auto builder,
+                         TableScanBuilder::Make(metadata_with_manifests, 
file_io_));
+  ICEBERG_UNWRAP_OR_FAIL(auto scan, builder->Build());
+  ICEBERG_UNWRAP_OR_FAIL(auto tasks, scan->PlanFiles());
+  ASSERT_EQ(tasks.size(), 2);
+  EXPECT_THAT(GetPaths(tasks), 
testing::UnorderedElementsAre("/path/to/data1.parquet",
+                                                             
"/path/to/data2.parquet"));
+  // Verify that delete files are associated with the tasks
+  for (const auto& task : tasks) {
+    EXPECT_GT(task->delete_files().size(), 0);
+  }
+}
+
+INSTANTIATE_TEST_SUITE_P(TableScanVersions, TableScanTest, testing::Values(1, 
2, 3));
+
+}  // namespace iceberg

Reply via email to