This is an automated email from the ASF dual-hosted git repository.

gangwu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-cpp.git


The following commit(s) were added to refs/heads/main by this push:
     new dba8f922 feat: add snapshot util (#420)
dba8f922 is described below

commit dba8f9223d3f71356c80334d1cdf00a5f6dcce03
Author: Junwang Zhao <[email protected]>
AuthorDate: Mon Dec 22 17:14:06 2025 +0800

    feat: add snapshot util (#420)
---
 src/iceberg/CMakeLists.txt                |   1 +
 src/iceberg/avro/avro_stream_internal.cc  |  17 +-
 src/iceberg/exception.h                   |   2 +-
 src/iceberg/meson.build                   |   1 +
 src/iceberg/table_metadata.cc             |  10 +-
 src/iceberg/table_metadata.h              |   2 +-
 src/iceberg/test/CMakeLists.txt           |   1 +
 src/iceberg/test/meson.build              |   1 +
 src/iceberg/test/snapshot_util_test.cc    | 374 ++++++++++++++++++++++++++++++
 src/iceberg/type.cc                       |  24 +-
 src/iceberg/util/decimal.cc               |   4 +-
 src/iceberg/util/macros.h                 |  17 ++
 src/iceberg/util/snapshot_util.cc         | 323 ++++++++++++++++++++++++++
 src/iceberg/util/snapshot_util_internal.h | 276 ++++++++++++++++++++++
 src/iceberg/util/timepoint.cc             |  20 ++
 src/iceberg/util/timepoint.h              |   3 +
 src/iceberg/util/uuid.cc                  |   2 +-
 17 files changed, 1049 insertions(+), 29 deletions(-)

diff --git a/src/iceberg/CMakeLists.txt b/src/iceberg/CMakeLists.txt
index 9c25015c..0579c67d 100644
--- a/src/iceberg/CMakeLists.txt
+++ b/src/iceberg/CMakeLists.txt
@@ -83,6 +83,7 @@ set(ICEBERG_SOURCES
     util/decimal.cc
     util/gzip_internal.cc
     util/murmurhash3_internal.cc
+    util/snapshot_util.cc
     util/temporal_util.cc
     util/timepoint.cc
     util/truncate_util.cc
diff --git a/src/iceberg/avro/avro_stream_internal.cc b/src/iceberg/avro/avro_stream_internal.cc
index f299b523..e868bab4 100644
--- a/src/iceberg/avro/avro_stream_internal.cc
+++ b/src/iceberg/avro/avro_stream_internal.cc
@@ -66,8 +66,9 @@ bool AvroInputStream::next(const uint8_t** data, size_t* len) {
 }
 
 void AvroInputStream::backup(size_t len) {
-  ICEBERG_CHECK(len <= buffer_pos_, "Cannot backup {} bytes, only {} bytes available",
-                len, buffer_pos_);
+  ICEBERG_CHECK_OR_DIE(len <= buffer_pos_,
+                       "Cannot backup {} bytes, only {} bytes available", len,
+                       buffer_pos_);
 
   buffer_pos_ -= len;
   byte_count_ -= len;
@@ -88,7 +89,8 @@ size_t AvroInputStream::byteCount() const { return byte_count_; }
 
 void AvroInputStream::seek(int64_t position) {
   auto status = input_stream_->Seek(position);
-  ICEBERG_CHECK(status.ok(), "Failed to seek to {}, got {}", position, status.ToString());
+  ICEBERG_CHECK_OR_DIE(status.ok(), "Failed to seek to {}, got {}", position,
+                       status.ToString());
 
   buffer_pos_ = 0;
   available_bytes_ = 0;
@@ -116,8 +118,9 @@ bool AvroOutputStream::next(uint8_t** data, size_t* len) {
 }
 
 void AvroOutputStream::backup(size_t len) {
-  ICEBERG_CHECK(len <= buffer_pos_, "Cannot backup {} bytes, only {} bytes available",
-                len, buffer_pos_);
+  ICEBERG_CHECK_OR_DIE(len <= buffer_pos_,
+                       "Cannot backup {} bytes, only {} bytes available", len,
+                       buffer_pos_);
   buffer_pos_ -= len;
 }
 
@@ -126,12 +129,12 @@ uint64_t AvroOutputStream::byteCount() const { return flushed_bytes_ + buffer_po
 void AvroOutputStream::flush() {
   if (buffer_pos_ > 0) {
     auto status = output_stream_->Write(buffer_.data(), buffer_pos_);
-    ICEBERG_CHECK(status.ok(), "Write failed {}", status.ToString());
+    ICEBERG_CHECK_OR_DIE(status.ok(), "Write failed {}", status.ToString());
     flushed_bytes_ += buffer_pos_;
     buffer_pos_ = 0;
   }
   auto status = output_stream_->Flush();
-  ICEBERG_CHECK(status.ok(), "Flush failed {}", status.ToString());
+  ICEBERG_CHECK_OR_DIE(status.ok(), "Flush failed {}", status.ToString());
 }
 
 const std::shared_ptr<::arrow::io::OutputStream>& AvroOutputStream::arrow_output_stream()
diff --git a/src/iceberg/exception.h b/src/iceberg/exception.h
index bbb9c228..0333eb12 100644
--- a/src/iceberg/exception.h
+++ b/src/iceberg/exception.h
@@ -44,7 +44,7 @@ class ICEBERG_EXPORT ExpressionError : public IcebergError {
   explicit ExpressionError(const std::string& what) : IcebergError(what) {}
 };
 
-#define ICEBERG_CHECK(condition, ...)                        \
+#define ICEBERG_CHECK_OR_DIE(condition, ...)                 \
   do {                                                       \
     if (!(condition)) [[unlikely]] {                         \
       throw iceberg::IcebergError(std::format(__VA_ARGS__)); \
diff --git a/src/iceberg/meson.build b/src/iceberg/meson.build
index 21a41dcf..850f6590 100644
--- a/src/iceberg/meson.build
+++ b/src/iceberg/meson.build
@@ -105,6 +105,7 @@ iceberg_sources = files(
     'util/decimal.cc',
     'util/gzip_internal.cc',
     'util/murmurhash3_internal.cc',
+    'util/snapshot_util.cc',
     'util/temporal_util.cc',
     'util/timepoint.cc',
     'util/truncate_util.cc',
diff --git a/src/iceberg/table_metadata.cc b/src/iceberg/table_metadata.cc
index 9bf6c530..2136494f 100644
--- a/src/iceberg/table_metadata.cc
+++ b/src/iceberg/table_metadata.cc
@@ -70,9 +70,9 @@ Result<std::shared_ptr<Schema>> TableMetadata::Schema() const {
 }
 
 Result<std::shared_ptr<Schema>> TableMetadata::SchemaById(
-    const std::optional<int32_t>& schema_id) const {
+    std::optional<int32_t> schema_id) const {
   auto iter = std::ranges::find_if(schemas, [schema_id](const auto& schema) {
-    return schema->schema_id() == schema_id;
+    return schema != nullptr && schema->schema_id() == schema_id;
   });
   if (iter == schemas.end()) {
     return NotFound("Schema with ID {} is not found", schema_id.value_or(-1));
@@ -82,7 +82,7 @@ Result<std::shared_ptr<Schema>> TableMetadata::SchemaById(
 
 Result<std::shared_ptr<PartitionSpec>> TableMetadata::PartitionSpec() const {
   auto iter = std::ranges::find_if(partition_specs, [this](const auto& spec) {
-    return spec->spec_id() == default_spec_id;
+    return spec != nullptr && spec->spec_id() == default_spec_id;
   });
   if (iter == partition_specs.end()) {
     return NotFound("Default partition spec is not found");
@@ -92,7 +92,7 @@ Result<std::shared_ptr<PartitionSpec>> TableMetadata::PartitionSpec() const {
 
 Result<std::shared_ptr<SortOrder>> TableMetadata::SortOrder() const {
   auto iter = std::ranges::find_if(sort_orders, [this](const auto& order) {
-    return order->order_id() == default_sort_order_id;
+    return order != nullptr && order->order_id() == default_sort_order_id;
   });
   if (iter == sort_orders.end()) {
     return NotFound("Default sort order is not found");
@@ -106,7 +106,7 @@ Result<std::shared_ptr<Snapshot>> TableMetadata::Snapshot() const {
 
 Result<std::shared_ptr<Snapshot>> TableMetadata::SnapshotById(int64_t snapshot_id) const {
   auto iter = std::ranges::find_if(snapshots, [snapshot_id](const auto& snapshot) {
-    return snapshot->snapshot_id == snapshot_id;
+    return snapshot != nullptr && snapshot->snapshot_id == snapshot_id;
   });
   if (iter == snapshots.end()) {
     return NotFound("Snapshot with ID {} is not found", snapshot_id);
diff --git a/src/iceberg/table_metadata.h b/src/iceberg/table_metadata.h
index f7c26011..3f2f3610 100644
--- a/src/iceberg/table_metadata.h
+++ b/src/iceberg/table_metadata.h
@@ -128,7 +128,7 @@ struct ICEBERG_EXPORT TableMetadata {
   Result<std::shared_ptr<iceberg::Schema>> Schema() const;
   /// \brief Get the current schema by ID, return NotFoundError if not found
   Result<std::shared_ptr<iceberg::Schema>> SchemaById(
-      const std::optional<int32_t>& schema_id) const;
+      std::optional<int32_t> schema_id) const;
   /// \brief Get the current partition spec, return NotFoundError if not found
   Result<std::shared_ptr<iceberg::PartitionSpec>> PartitionSpec() const;
   /// \brief Get the current sort order, return NotFoundError if not found
diff --git a/src/iceberg/test/CMakeLists.txt b/src/iceberg/test/CMakeLists.txt
index 2af7d1c4..28178b88 100644
--- a/src/iceberg/test/CMakeLists.txt
+++ b/src/iceberg/test/CMakeLists.txt
@@ -70,6 +70,7 @@ add_iceberg_test(table_test
                  SOURCES
                  metrics_config_test.cc
                  snapshot_test.cc
+                 snapshot_util_test.cc
                  table_metadata_builder_test.cc
                  table_requirement_test.cc
                  table_requirements_test.cc
diff --git a/src/iceberg/test/meson.build b/src/iceberg/test/meson.build
index 5ccab940..fcd397b9 100644
--- a/src/iceberg/test/meson.build
+++ b/src/iceberg/test/meson.build
@@ -47,6 +47,7 @@ iceberg_tests = {
         'sources': files(
             'metrics_config_test.cc',
             'snapshot_test.cc',
+            'snapshot_util_test.cc',
             'table_metadata_builder_test.cc',
             'table_requirement_test.cc',
             'table_requirements_test.cc',
diff --git a/src/iceberg/test/snapshot_util_test.cc b/src/iceberg/test/snapshot_util_test.cc
new file mode 100644
index 00000000..e4e17251
--- /dev/null
+++ b/src/iceberg/test/snapshot_util_test.cc
@@ -0,0 +1,374 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <chrono>
+#include <memory>
+#include <vector>
+
+#include <gtest/gtest.h>
+
+#include "iceberg/partition_spec.h"
+#include "iceberg/schema.h"
+#include "iceberg/snapshot.h"
+#include "iceberg/sort_order.h"
+#include "iceberg/table.h"
+#include "iceberg/table_identifier.h"
+#include "iceberg/table_metadata.h"
+#include "iceberg/test/matchers.h"
+#include "iceberg/test/mock_catalog.h"
+#include "iceberg/test/mock_io.h"
+#include "iceberg/util/snapshot_util_internal.h"
+#include "iceberg/util/timepoint.h"
+
+namespace iceberg {
+
+namespace {
+
+// Schema for testing: id (int32), data (string)
+std::shared_ptr<Schema> CreateTestSchema() {
+  auto field1 = SchemaField::MakeRequired(1, "id", int32());
+  auto field2 = SchemaField::MakeRequired(2, "data", string());
+  return std::make_shared<Schema>(std::vector<SchemaField>{field1, field2}, 0);
+}
+
+// Helper to create a snapshot
+std::shared_ptr<Snapshot> CreateSnapshot(int64_t snapshot_id,
+                                         std::optional<int64_t> parent_id,
+                                         int64_t sequence_number,
+                                         TimePointMs timestamp_ms) {
+  return std::make_shared<Snapshot>(
+      Snapshot{.snapshot_id = snapshot_id,
+               .parent_snapshot_id = parent_id,
+               .sequence_number = sequence_number,
+               .timestamp_ms = timestamp_ms,
+               .manifest_list =
+                   "s3://bucket/manifest-list-" + std::to_string(snapshot_id) + ".avro",
+               .summary = {},
+               .schema_id = 0});
+}
+
+// Helper to create table metadata with snapshots
+std::shared_ptr<TableMetadata> CreateTableMetadataWithSnapshots(
+    int64_t base_snapshot_id, int64_t main1_snapshot_id, int64_t main2_snapshot_id,
+    int64_t branch_snapshot_id, int64_t fork0_snapshot_id, int64_t fork1_snapshot_id,
+    int64_t fork2_snapshot_id, TimePointMs base_timestamp) {
+  auto metadata = std::make_shared<TableMetadata>();
+  metadata->format_version = 2;
+  metadata->table_uuid = "test-uuid-1234";
+  metadata->location = "s3://bucket/test";
+  metadata->last_sequence_number = 10;
+  metadata->last_updated_ms = base_timestamp + std::chrono::milliseconds(3000);
+  metadata->last_column_id = 2;
+  metadata->current_schema_id = 0;
+  metadata->schemas.push_back(CreateTestSchema());
+  metadata->default_spec_id = PartitionSpec::kInitialSpecId;
+  metadata->last_partition_id = 0;
+  metadata->current_snapshot_id = main2_snapshot_id;
+  metadata->default_sort_order_id = SortOrder::kInitialSortOrderId;
+  metadata->sort_orders.push_back(SortOrder::Unsorted());
+  metadata->next_row_id = TableMetadata::kInitialRowId;
+  metadata->properties = TableProperties::default_properties();
+
+  // Create snapshots: base -> main1 -> main2
+  auto base_snapshot = CreateSnapshot(base_snapshot_id, std::nullopt, 1, base_timestamp);
+  auto main1_snapshot = CreateSnapshot(main1_snapshot_id, base_snapshot_id, 2,
+                                       base_timestamp + std::chrono::milliseconds(1000));
+  auto main2_snapshot = CreateSnapshot(main2_snapshot_id, main1_snapshot_id, 3,
+                                       base_timestamp + std::chrono::milliseconds(2000));
+
+  // Branch snapshot (from base)
+  auto branch_snapshot = CreateSnapshot(branch_snapshot_id, base_snapshot_id, 4,
+                                        base_timestamp + std::chrono::milliseconds(1500));
+
+  // Fork branch snapshots: fork0 -> fork1 -> fork2 (fork0 will be expired)
+  auto fork0_snapshot = CreateSnapshot(fork0_snapshot_id, base_snapshot_id, 5,
+                                       base_timestamp + std::chrono::milliseconds(500));
+  auto fork1_snapshot = CreateSnapshot(fork1_snapshot_id, fork0_snapshot_id, 6,
+                                       base_timestamp + std::chrono::milliseconds(2500));
+  auto fork2_snapshot = CreateSnapshot(fork2_snapshot_id, fork1_snapshot_id, 7,
+                                       base_timestamp + std::chrono::milliseconds(3000));
+
+  metadata->snapshots = {base_snapshot,   main1_snapshot, main2_snapshot,
+                         branch_snapshot, fork1_snapshot, fork2_snapshot};
+  // Note: fork0 is expired, so it's not in the snapshots list
+
+  // Snapshot log
+  metadata->snapshot_log = {
+      {.timestamp_ms = base_timestamp, .snapshot_id = base_snapshot_id},
+      {.timestamp_ms = base_timestamp + std::chrono::milliseconds(1000),
+       .snapshot_id = main1_snapshot_id},
+      {.timestamp_ms = base_timestamp + std::chrono::milliseconds(2000),
+       .snapshot_id = main2_snapshot_id},
+  };
+
+  // Create refs
+  std::string branch_name = "b1";
+  metadata->refs[branch_name] = std::make_shared<SnapshotRef>(
+      SnapshotRef{.snapshot_id = branch_snapshot_id, .retention = SnapshotRef::Branch{}});
+
+  std::string fork_branch = "fork";
+  metadata->refs[fork_branch] = std::make_shared<SnapshotRef>(
+      SnapshotRef{.snapshot_id = fork2_snapshot_id, .retention = SnapshotRef::Branch{}});
+
+  return metadata;
+}
+
+// Helper to extract snapshot IDs from a vector of snapshots
+std::vector<int64_t> ExtractSnapshotIds(
+    const std::vector<std::shared_ptr<Snapshot>>& snapshots) {
+  std::vector<int64_t> ids;
+  ids.reserve(snapshots.size());
+  for (const auto& snapshot : snapshots) {
+    ids.push_back(snapshot->snapshot_id);
+  }
+  return ids;
+}
+
+}  // namespace
+
+class SnapshotUtilTest : public ::testing::Test {
+ protected:
+  void SetUp() override {
+    base_timestamp_ = TimePointMs{std::chrono::milliseconds(1000000000000)};
+    base_snapshot_id_ = 100;
+    main1_snapshot_id_ = 101;
+    main2_snapshot_id_ = 102;
+    branch_snapshot_id_ = 200;
+    fork0_snapshot_id_ = 300;
+    fork1_snapshot_id_ = 301;
+    fork2_snapshot_id_ = 302;
+
+    auto metadata = CreateTableMetadataWithSnapshots(
+        base_snapshot_id_, main1_snapshot_id_, main2_snapshot_id_, branch_snapshot_id_,
+        fork0_snapshot_id_, fork1_snapshot_id_, fork2_snapshot_id_, base_timestamp_);
+
+    TableIdentifier table_ident{.ns = {}, .name = "test"};
+    auto io = std::make_shared<MockFileIO>();
+    auto catalog = std::make_shared<MockCatalog>();
+    table_ = std::move(Table::Make(table_ident, std::move(metadata),
+                                   "s3://bucket/test/metadata.json", io, catalog)
+                           .value());
+  }
+
+  TimePointMs base_timestamp_;
+  int64_t base_snapshot_id_;
+  int64_t main1_snapshot_id_;
+  int64_t main2_snapshot_id_;
+  int64_t branch_snapshot_id_;
+  int64_t fork0_snapshot_id_;
+  int64_t fork1_snapshot_id_;
+  int64_t fork2_snapshot_id_;
+  std::shared_ptr<Table> table_;
+};
+
+TEST_F(SnapshotUtilTest, IsParentAncestorOf) {
+  ICEBERG_UNWRAP_OR_FAIL(
+      auto result1,
+      SnapshotUtil::IsParentAncestorOf(*table_, main1_snapshot_id_, base_snapshot_id_));
+  EXPECT_TRUE(result1);
+
+  ICEBERG_UNWRAP_OR_FAIL(
+      auto result2,
+      SnapshotUtil::IsParentAncestorOf(*table_, branch_snapshot_id_, main1_snapshot_id_));
+  EXPECT_FALSE(result2);
+
+  // fork2's parent is fork1, fork1's parent is fork0 (expired)
+  ICEBERG_UNWRAP_OR_FAIL(
+      auto result3,
+      SnapshotUtil::IsParentAncestorOf(*table_, fork2_snapshot_id_, fork0_snapshot_id_));
+  EXPECT_TRUE(result3);
+}
+
+TEST_F(SnapshotUtilTest, IsAncestorOf) {
+  ICEBERG_UNWRAP_OR_FAIL(
+      auto result1,
+      SnapshotUtil::IsAncestorOf(*table_, main1_snapshot_id_, base_snapshot_id_));
+  EXPECT_TRUE(result1);
+
+  ICEBERG_UNWRAP_OR_FAIL(
+      auto result2,
+      SnapshotUtil::IsAncestorOf(*table_, branch_snapshot_id_, main1_snapshot_id_));
+  EXPECT_FALSE(result2);
+
+  // fork2 -> fork1 -> fork0 (expired, not in snapshots)
+  ICEBERG_UNWRAP_OR_FAIL(
+      auto result3,
+      SnapshotUtil::IsAncestorOf(*table_, fork2_snapshot_id_, fork0_snapshot_id_));
+  EXPECT_FALSE(result3);  // fork0 is expired, so not found
+
+  // Test with current snapshot
+  ICEBERG_UNWRAP_OR_FAIL(auto result4,
+                         SnapshotUtil::IsAncestorOf(*table_, main1_snapshot_id_));
+  EXPECT_TRUE(result4);
+
+  ICEBERG_UNWRAP_OR_FAIL(auto result5,
+                         SnapshotUtil::IsAncestorOf(*table_, branch_snapshot_id_));
+  EXPECT_FALSE(result5);
+}
+
+TEST_F(SnapshotUtilTest, CurrentAncestors) {
+  ICEBERG_UNWRAP_OR_FAIL(auto ancestors, SnapshotUtil::CurrentAncestors(*table_));
+  auto ids = ExtractSnapshotIds(ancestors);
+  EXPECT_EQ(ids, std::vector<int64_t>(
+                     {main2_snapshot_id_, main1_snapshot_id_, base_snapshot_id_}));
+
+  ICEBERG_UNWRAP_OR_FAIL(auto ancestor_ids, SnapshotUtil::CurrentAncestorIds(*table_));
+  EXPECT_EQ(ancestor_ids, std::vector<int64_t>({main2_snapshot_id_, main1_snapshot_id_,
+                                                base_snapshot_id_}));
+}
+
+TEST_F(SnapshotUtilTest, OldestAncestor) {
+  ICEBERG_UNWRAP_OR_FAIL(auto oldest, SnapshotUtil::OldestAncestor(*table_));
+  ASSERT_TRUE(oldest.has_value());
+  EXPECT_EQ(oldest.value()->snapshot_id, base_snapshot_id_);
+
+  ICEBERG_UNWRAP_OR_FAIL(auto oldest_of_main2,
+                         SnapshotUtil::OldestAncestorOf(*table_, main2_snapshot_id_));
+  ASSERT_TRUE(oldest_of_main2.has_value());
+  EXPECT_EQ(oldest_of_main2.value()->snapshot_id, base_snapshot_id_);
+
+  ICEBERG_UNWRAP_OR_FAIL(auto oldest_after,
+                         SnapshotUtil::OldestAncestorAfter(
+                             *table_, base_timestamp_ + std::chrono::milliseconds(1)));
+  ASSERT_TRUE(oldest_after.has_value());
+  EXPECT_EQ(oldest_after.value()->snapshot_id, main1_snapshot_id_);
+}
+
+TEST_F(SnapshotUtilTest, SnapshotsBetween) {
+  ICEBERG_UNWRAP_OR_FAIL(
+      auto snapshot_ids,
+      SnapshotUtil::SnapshotIdsBetween(*table_, base_snapshot_id_, main2_snapshot_id_));
+  EXPECT_EQ(snapshot_ids, std::vector<int64_t>({main2_snapshot_id_, main1_snapshot_id_}));
+
+  ICEBERG_UNWRAP_OR_FAIL(
+      auto ancestors_between1,
+      SnapshotUtil::AncestorsBetween(*table_, main2_snapshot_id_, main1_snapshot_id_));
+  auto ids1 = ExtractSnapshotIds(ancestors_between1);
+  EXPECT_EQ(ids1, std::vector<int64_t>({main2_snapshot_id_}));
+
+  ICEBERG_UNWRAP_OR_FAIL(
+      auto ancestors_between2,
+      SnapshotUtil::AncestorsBetween(*table_, main2_snapshot_id_, branch_snapshot_id_));
+  auto ids2 = ExtractSnapshotIds(ancestors_between2);
+  EXPECT_EQ(ids2, std::vector<int64_t>(
+                      {main2_snapshot_id_, main1_snapshot_id_, base_snapshot_id_}));
+}
+
+TEST_F(SnapshotUtilTest, AncestorsOf) {
+  // Test ancestors of fork2: fork2 -> fork1 (fork0 is expired, not in snapshots)
+  ICEBERG_UNWRAP_OR_FAIL(auto ancestors,
+                         SnapshotUtil::AncestorsOf(*table_, fork2_snapshot_id_));
+  auto ids = ExtractSnapshotIds(ancestors);
+  EXPECT_EQ(ids, std::vector<int64_t>({fork2_snapshot_id_, fork1_snapshot_id_}));
+}
+
+TEST_F(SnapshotUtilTest, SchemaForRef) {
+  ICEBERG_UNWRAP_OR_FAIL(auto initial_schema, table_->schema());
+  ASSERT_NE(initial_schema, nullptr);
+
+  // Test with null/empty ref (main branch)
+  ICEBERG_UNWRAP_OR_FAIL(auto schema1, SnapshotUtil::SchemaFor(*table_, ""));
+  EXPECT_EQ(schema1->fields().size(), initial_schema->fields().size());
+
+  // Test with non-existing ref
+  ICEBERG_UNWRAP_OR_FAIL(auto schema2,
+                         SnapshotUtil::SchemaFor(*table_, "non-existing-ref"));
+  EXPECT_EQ(schema2->fields().size(), initial_schema->fields().size());
+
+  // Test with main branch
+  ICEBERG_UNWRAP_OR_FAIL(
+      auto schema3,
+      SnapshotUtil::SchemaFor(*table_, std::string(SnapshotRef::kMainBranch)));
+  EXPECT_EQ(schema3->fields().size(), initial_schema->fields().size());
+}
+
+TEST_F(SnapshotUtilTest, SchemaForBranch) {
+  ICEBERG_UNWRAP_OR_FAIL(auto initial_schema, table_->schema());
+  ASSERT_NE(initial_schema, nullptr);
+
+  std::string branch = "b1";
+  ICEBERG_UNWRAP_OR_FAIL(auto schema, SnapshotUtil::SchemaFor(*table_, branch));
+  // Branch should return current schema (not snapshot schema)
+  EXPECT_EQ(schema->fields().size(), initial_schema->fields().size());
+}
+
+TEST_F(SnapshotUtilTest, SchemaForTag) {
+  // Create a tag pointing to base snapshot
+  auto metadata = table_->metadata();
+  std::string tag = "tag1";
+  metadata->refs[tag] = std::make_shared<SnapshotRef>(
+      SnapshotRef{.snapshot_id = base_snapshot_id_, .retention = SnapshotRef::Tag{}});
+
+  ICEBERG_UNWRAP_OR_FAIL(auto initial_schema, table_->schema());
+  ASSERT_NE(initial_schema, nullptr);
+
+  ICEBERG_UNWRAP_OR_FAIL(auto schema, SnapshotUtil::SchemaFor(*table_, tag));
+  // Tag should return the schema of the snapshot it points to
+  // Since base snapshot has schema_id = 0, it should return the same schema
+  EXPECT_EQ(schema->fields().size(), initial_schema->fields().size());
+}
+
+TEST_F(SnapshotUtilTest, SnapshotAfter) {
+  ICEBERG_UNWRAP_OR_FAIL(auto snapshot_after,
+                         SnapshotUtil::SnapshotAfter(*table_, base_snapshot_id_));
+  EXPECT_EQ(snapshot_after->snapshot_id, main1_snapshot_id_);
+
+  ICEBERG_UNWRAP_OR_FAIL(auto snapshot_after_main1,
+                         SnapshotUtil::SnapshotAfter(*table_, main1_snapshot_id_));
+  EXPECT_EQ(snapshot_after_main1->snapshot_id, main2_snapshot_id_);
+}
+
+TEST_F(SnapshotUtilTest, SnapshotIdAsOfTime) {
+  // Test with timestamp before any snapshot
+  auto early_timestamp = base_timestamp_ - std::chrono::milliseconds(1000);
+  auto snapshot_id = SnapshotUtil::OptionalSnapshotIdAsOfTime(*table_, early_timestamp);
+  EXPECT_FALSE(snapshot_id.has_value());
+
+  // Test with timestamp at base snapshot
+  auto snapshot_id1 = SnapshotUtil::OptionalSnapshotIdAsOfTime(*table_, base_timestamp_);
+  ASSERT_TRUE(snapshot_id1.has_value());
+  EXPECT_EQ(snapshot_id1.value(), base_snapshot_id_);
+
+  // Test with timestamp between main1 and main2
+  auto mid_timestamp = base_timestamp_ + std::chrono::milliseconds(1500);
+  ICEBERG_UNWRAP_OR_FAIL(auto snapshot_id2,
+                         SnapshotUtil::SnapshotIdAsOfTime(*table_, mid_timestamp));
+  EXPECT_EQ(snapshot_id2, main1_snapshot_id_);
+}
+
+TEST_F(SnapshotUtilTest, LatestSnapshot) {
+  // Test main branch
+  ICEBERG_UNWRAP_OR_FAIL(
+      auto main_snapshot,
+      SnapshotUtil::LatestSnapshot(*table_, std::string(SnapshotRef::kMainBranch)));
+  EXPECT_EQ(main_snapshot->snapshot_id, main2_snapshot_id_);
+
+  // Test branch
+  ICEBERG_UNWRAP_OR_FAIL(auto branch_snapshot,
+                         SnapshotUtil::LatestSnapshot(*table_, "b1"));
+  EXPECT_EQ(branch_snapshot->snapshot_id, branch_snapshot_id_);
+
+  // Test non-existing branch
+  ICEBERG_UNWRAP_OR_FAIL(auto non_existing,
+                         SnapshotUtil::LatestSnapshot(*table_, "non-existing"));
+  // Should return current snapshot
+  EXPECT_EQ(non_existing->snapshot_id, main2_snapshot_id_);
+}
+
+}  // namespace iceberg
diff --git a/src/iceberg/type.cc b/src/iceberg/type.cc
index 1cd5fb3e..44512c0d 100644
--- a/src/iceberg/type.cc
+++ b/src/iceberg/type.cc
@@ -141,9 +141,9 @@ StructType::InitFieldByLowerCaseName(const StructType& self) {
 }
 
 ListType::ListType(SchemaField element) : element_(std::move(element)) {
-  ICEBERG_CHECK(element_.name() == kElementName,
-                "ListType: child field name should be '{}', was '{}'", kElementName,
-                element_.name());
+  ICEBERG_CHECK_OR_DIE(element_.name() == kElementName,
+                       "ListType: child field name should be '{}', was '{}'",
+                       kElementName, element_.name());
 }
 
 ListType::ListType(int32_t field_id, std::shared_ptr<Type> type, bool optional)
@@ -200,12 +200,12 @@ bool ListType::Equals(const Type& other) const {
 
 MapType::MapType(SchemaField key, SchemaField value)
     : fields_{std::move(key), std::move(value)} {
-  ICEBERG_CHECK(this->key().name() == kKeyName,
-                "MapType: key field name should be '{}', was '{}'", kKeyName,
-                this->key().name());
-  ICEBERG_CHECK(this->value().name() == kValueName,
-                "MapType: value field name should be '{}', was '{}'", kValueName,
-                this->value().name());
+  ICEBERG_CHECK_OR_DIE(this->key().name() == kKeyName,
+                       "MapType: key field name should be '{}', was '{}'", kKeyName,
+                       this->key().name());
+  ICEBERG_CHECK_OR_DIE(this->value().name() == kValueName,
+                       "MapType: value field name should be '{}', was '{}'", kValueName,
+                       this->value().name());
 }
 
 const SchemaField& MapType::key() const { return fields_[0]; }
@@ -292,8 +292,8 @@ bool DoubleType::Equals(const Type& other) const { return other.type_id() == kTy
 
 DecimalType::DecimalType(int32_t precision, int32_t scale)
     : precision_(precision), scale_(scale) {
-  ICEBERG_CHECK(precision >= 0 && precision <= kMaxPrecision,
-                "DecimalType: precision must be in [0, 38], was {}", precision);
+  ICEBERG_CHECK_OR_DIE(precision >= 0 && precision <= kMaxPrecision,
+                       "DecimalType: precision must be in [0, 38], was {}", precision);
 }
 
 int32_t DecimalType::precision() const { return precision_; }
@@ -341,7 +341,7 @@ std::string UuidType::ToString() const { return "uuid"; }
 bool UuidType::Equals(const Type& other) const { return other.type_id() == kTypeId; }
 
 FixedType::FixedType(int32_t length) : length_(length) {
-  ICEBERG_CHECK(length >= 0, "FixedType: length must be >= 0, was {}", length);
+  ICEBERG_CHECK_OR_DIE(length >= 0, "FixedType: length must be >= 0, was {}", length);
 }
 
 int32_t FixedType::length() const { return length_; }
diff --git a/src/iceberg/util/decimal.cc b/src/iceberg/util/decimal.cc
index f33d9328..433d3586 100644
--- a/src/iceberg/util/decimal.cc
+++ b/src/iceberg/util/decimal.cc
@@ -300,8 +300,8 @@ bool RescaleWouldCauseDataLoss(const Decimal& value, int32_t delta_scale,
 
 Decimal::Decimal(std::string_view str) {
   auto result = Decimal::FromString(str);
-  ICEBERG_CHECK(result, "Failed to parse Decimal from string: {}, error: {}", str,
-                result.error().message);
+  ICEBERG_CHECK_OR_DIE(result, "Failed to parse Decimal from string: {}, error: {}", str,
+                       result.error().message);
+                       result.error().message);
   *this = std::move(result.value());
 }
 
diff --git a/src/iceberg/util/macros.h b/src/iceberg/util/macros.h
index 50ac13f2..c6919ab7 100644
--- a/src/iceberg/util/macros.h
+++ b/src/iceberg/util/macros.h
@@ -42,8 +42,25 @@
   ICEBERG_ASSIGN_OR_RAISE_IMPL(ICEBERG_ASSIGN_OR_RAISE_NAME(result_, __COUNTER__), lhs, \
                                rexpr)
 
+// Macro for debug checks
 #define ICEBERG_DCHECK(expr, message) assert((expr) && (message))
 
+// Macro for precondition checks, usually used for function arguments
+#define ICEBERG_PRECHECK(expr, ...)        \
+  do {                                     \
+    if (!(expr)) [[unlikely]] {            \
+      return InvalidArgument(__VA_ARGS__); \
+    }                                      \
+  } while (0)
+
+// Macro for state checks, usually used for unexpected states
+#define ICEBERG_CHECK(expr, ...)   \
+  do {                             \
+    if (!(expr)) [[unlikely]] {    \
+      return Invalid(__VA_ARGS__); \
+    }                              \
+  } while (0)
+
 #define ERROR_TO_EXCEPTION(error)                             \
   if (error.kind == iceberg::ErrorKind::kInvalidExpression) { \
     throw iceberg::ExpressionError(error.message);            \
diff --git a/src/iceberg/util/snapshot_util.cc b/src/iceberg/util/snapshot_util.cc
new file mode 100644
index 00000000..1243a109
--- /dev/null
+++ b/src/iceberg/util/snapshot_util.cc
@@ -0,0 +1,323 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <ranges>
+
+#include "iceberg/schema.h"
+#include "iceberg/snapshot.h"
+#include "iceberg/table.h"
+#include "iceberg/table_metadata.h"
+#include "iceberg/util/macros.h"
+#include "iceberg/util/snapshot_util_internal.h"
+#include "iceberg/util/timepoint.h"
+
+namespace iceberg {
+
+// Shorthand for handling a failed Result whose error may be NotFound.
+//
+// If `result` holds an error of kind ErrorKind::kNotFound, `action` is executed.
+// Unless `action` leaves the block itself (e.g. via `return` or `break`), control
+// then reaches the `return` below, which hands the error to the caller as
+// std::unexpected<Error>. Errors of any other kind are always returned that way.
+// Nothing happens when `result` holds a value.
+//
+// The expansion is a bare `if` rather than a `do { ... } while (0)` wrapper, so a
+// `break` passed as `action` applies to the loop surrounding the macro call.
+// NOTE(review): `result` is expanded several times and without parentheses; pass
+// a plain variable name only.
+#define ICEBERG_ACTION_FOR_NOT_FOUND(result, action)   \
+  if (!result.has_value()) [[unlikely]] {              \
+    if (result.error().kind == ErrorKind::kNotFound) { \
+      action;                                          \
+    }                                                  \
+    return std::unexpected<Error>(result.error());     \
+  }
+
+// Resolves `snapshot_id` through Table::SnapshotById and walks the parent chain
+// starting at that snapshot. A failed lookup is propagated to the caller.
+Result<std::vector<std::shared_ptr<Snapshot>>> SnapshotUtil::AncestorsOf(
+    const Table& table, int64_t snapshot_id) {
+  ICEBERG_ASSIGN_OR_RAISE(auto start, table.SnapshotById(snapshot_id));
+  return AncestorsOf(table, start);
+}
+
+// Reports whether `ancestor_snapshot_id` appears in the chain that starts at
+// `snapshot_id`. The chain includes the starting snapshot itself, so passing the
+// same ID for both arguments yields true.
+Result<bool> SnapshotUtil::IsAncestorOf(const Table& table, int64_t snapshot_id,
+                                        int64_t ancestor_snapshot_id) {
+  ICEBERG_ASSIGN_OR_RAISE(auto lineage, AncestorsOf(table, snapshot_id));
+  for (const auto& entry : lineage) {
+    if (entry != nullptr && entry->snapshot_id == ancestor_snapshot_id) {
+      return true;
+    }
+  }
+  return false;
+}
+
+// Same check as the three-argument overload, starting from the table's current
+// snapshot. Errors from Table::current_snapshot() are propagated; a null current
+// snapshot is reported through ICEBERG_CHECK.
+Result<bool> SnapshotUtil::IsAncestorOf(const Table& table,
+                                        int64_t ancestor_snapshot_id) {
+  ICEBERG_ASSIGN_OR_RAISE(auto head, table.current_snapshot());
+  ICEBERG_CHECK(head != nullptr, "Current snapshot is null");
+  const int64_t head_id = head->snapshot_id;
+  return IsAncestorOf(table, head_id, ancestor_snapshot_id);
+}
+
+// Reports whether any snapshot in the chain starting at `snapshot_id` names
+// `ancestor_parent_snapshot_id` as its parent. Snapshots without a parent ID
+// never match.
+Result<bool> SnapshotUtil::IsParentAncestorOf(const Table& table, int64_t snapshot_id,
+                                              int64_t ancestor_parent_snapshot_id) {
+  ICEBERG_ASSIGN_OR_RAISE(auto lineage, AncestorsOf(table, snapshot_id));
+  for (const auto& entry : lineage) {
+    if (entry == nullptr || !entry->parent_snapshot_id.has_value()) {
+      continue;
+    }
+    if (entry->parent_snapshot_id.value() == ancestor_parent_snapshot_id) {
+      return true;
+    }
+  }
+  return false;
+}
+
+// Lists the table's current snapshot followed by its known ancestors.
+// Yields an empty vector when Table::current_snapshot() fails with
+// ErrorKind::kNotFound; any other lookup error is handed back to the caller.
+Result<std::vector<std::shared_ptr<Snapshot>>> SnapshotUtil::CurrentAncestors(
+    const Table& table) {
+  auto head = table.current_snapshot();
+  if (!head.has_value()) {
+    if (head.error().kind == ErrorKind::kNotFound) {
+      return {};
+    }
+    return std::unexpected<Error>(head.error());
+  }
+  return AncestorsOf(table, head.value());
+}
+
+// Returns the IDs of the snapshots produced by CurrentAncestors, in the same order.
+Result<std::vector<int64_t>> SnapshotUtil::CurrentAncestorIds(const Table& table) {
+  ICEBERG_ASSIGN_OR_RAISE(auto lineage, CurrentAncestors(table));
+  return ToIds(lineage);
+}
+
+// Returns the last element of CurrentAncestors(table), i.e. the end of the chain
+// reachable from the current snapshot, or std::nullopt when that chain is empty.
+Result<std::optional<std::shared_ptr<Snapshot>>> SnapshotUtil::OldestAncestor(
+    const Table& table) {
+  ICEBERG_ASSIGN_OR_RAISE(auto lineage, CurrentAncestors(table));
+  if (!lineage.empty()) {
+    return lineage.back();
+  }
+  return std::nullopt;
+}
+
+// Returns the last element of the chain that starts at `snapshot_id`, or
+// std::nullopt when the chain is empty. Lookup errors are propagated.
+Result<std::optional<std::shared_ptr<Snapshot>>> SnapshotUtil::OldestAncestorOf(
+    const Table& table, int64_t snapshot_id) {
+  ICEBERG_ASSIGN_OR_RAISE(auto lineage, AncestorsOf(table, snapshot_id));
+  if (!lineage.empty()) {
+    return lineage.back();
+  }
+  return std::nullopt;
+}
+
+// Walks the current snapshot's chain from newest to oldest and returns the oldest
+// snapshot committed at or after `timestamp_ms`.
+//
+// Returns std::nullopt when the table has no current snapshot (NotFound from
+// Table::current_snapshot()) or when the first snapshot visited is already older
+// than `timestamp_ms`. Returns a NotFound error when the walk ends before reaching
+// either the timestamp or a snapshot without a parent, because the answer cannot
+// be determined from the snapshots that are still known.
+Result<std::optional<std::shared_ptr<Snapshot>>> SnapshotUtil::OldestAncestorAfter(
+    const Table& table, TimePointMs timestamp_ms) {
+  auto current_result = table.current_snapshot();
+  ICEBERG_ACTION_FOR_NOT_FOUND(current_result, { return std::nullopt; });
+  auto current = std::move(current_result.value());
+
+  // Most recently visited snapshot; every snapshot stored here is newer than
+  // `timestamp_ms`.
+  std::optional<std::shared_ptr<Snapshot>> last_snapshot = std::nullopt;
+  ICEBERG_ASSIGN_OR_RAISE(auto ancestors, AncestorsOf(table, current));
+  for (const auto& snapshot : ancestors) {
+    auto snapshot_timestamp_ms = snapshot->timestamp_ms;
+    if (snapshot_timestamp_ms < timestamp_ms) {
+      return last_snapshot;
+    } else if (snapshot_timestamp_ms == timestamp_ms) {
+      return snapshot;
+    }
+    // `snapshot` is a const reference, so this is a shared_ptr copy; a std::move
+    // here would silently degrade to the same copy.
+    last_snapshot = snapshot;
+  }
+
+  if (last_snapshot.has_value() && last_snapshot.value() != nullptr &&
+      !last_snapshot.value()->parent_snapshot_id.has_value()) {
+    // this is the first snapshot in the table, return it
+    return last_snapshot;
+  }
+
+  // the first ancestor after the given time can't be determined
+  return NotFound("Cannot find snapshot older than {}", FormatTimestamp(timestamp_ms));
+}
+
+// Collects the snapshot IDs on the chain from `to_snapshot_id` (inclusive) back
+// towards `from_snapshot_id` (exclusive).
+Result<std::vector<int64_t>> SnapshotUtil::SnapshotIdsBetween(const Table& table,
+                                                              int64_t from_snapshot_id,
+                                                              int64_t to_snapshot_id) {
+  // Lookup that yields a null snapshot for from_snapshot_id, which ends the
+  // traversal just before that snapshot would be added.
+  auto stop_at_from =
+      [&table, from_snapshot_id](int64_t id) -> Result<std::shared_ptr<Snapshot>> {
+    if (id != from_snapshot_id) {
+      return table.SnapshotById(id);
+    }
+    return nullptr;
+  };
+
+  ICEBERG_ASSIGN_OR_RAISE(auto newest, table.SnapshotById(to_snapshot_id));
+  ICEBERG_ASSIGN_OR_RAISE(auto chain, AncestorsOf(newest, stop_at_from));
+  return ToIds(chain);
+}
+
+// ID-only variant of AncestorsBetween: same traversal, returning snapshot IDs.
+Result<std::vector<int64_t>> SnapshotUtil::AncestorIdsBetween(
+    const Table& table, int64_t latest_snapshot_id,
+    const std::optional<int64_t>& oldest_snapshot_id) {
+  ICEBERG_ASSIGN_OR_RAISE(
+      auto chain, AncestorsBetween(table, latest_snapshot_id, oldest_snapshot_id));
+  return ToIds(chain);
+}
+
+// Returns the chain that starts at `latest_snapshot_id` and stops before
+// `oldest_snapshot_id`. Without an oldest ID the whole known chain is returned;
+// when both IDs are equal the result is empty. `latest_snapshot_id` is resolved
+// first, so a failed lookup is reported in every case.
+Result<std::vector<std::shared_ptr<Snapshot>>> SnapshotUtil::AncestorsBetween(
+    const Table& table, int64_t latest_snapshot_id,
+    std::optional<int64_t> oldest_snapshot_id) {
+  ICEBERG_ASSIGN_OR_RAISE(auto start, table.SnapshotById(latest_snapshot_id));
+
+  if (!oldest_snapshot_id.has_value()) {
+    return AncestorsOf(table, start);
+  }
+
+  const int64_t stop_id = oldest_snapshot_id.value();
+  if (latest_snapshot_id == stop_id) {
+    return {};
+  }
+
+  // A null snapshot from the lookup ends the traversal before stop_id is added.
+  return AncestorsOf(
+      start, [&table, stop_id](int64_t id) -> Result<std::shared_ptr<Snapshot>> {
+        if (id == stop_id) {
+          return nullptr;
+        }
+        return table.SnapshotById(id);
+      });
+}
+
+// Walks the parent chain of `snapshot`, resolving each parent ID through
+// Table::SnapshotById.
+Result<std::vector<std::shared_ptr<Snapshot>>> SnapshotUtil::AncestorsOf(
+    const Table& table, const std::shared_ptr<Snapshot>& snapshot) {
+  const std::function<Result<std::shared_ptr<Snapshot>>(int64_t)> by_id =
+      [&table](int64_t id) { return table.SnapshotById(id); };
+  return AncestorsOf(snapshot, by_id);
+}
+
+// Core traversal: starting at `snapshot`, repeatedly resolves the parent ID with
+// `lookup` and collects every snapshot visited, newest first.
+//
+// The walk ends when a snapshot has no parent ID, when `lookup` yields a null
+// snapshot, or when `lookup` fails with ErrorKind::kNotFound. Any other lookup
+// error is returned to the caller. A null `snapshot` is rejected up front.
+Result<std::vector<std::shared_ptr<Snapshot>>> SnapshotUtil::AncestorsOf(
+    const std::shared_ptr<Snapshot>& snapshot,
+    const std::function<Result<std::shared_ptr<Snapshot>>(int64_t)>& lookup) {
+  ICEBERG_PRECHECK(snapshot != nullptr, "Snapshot is null");
+
+  std::vector<std::shared_ptr<Snapshot>> chain;
+  for (std::shared_ptr<Snapshot> node = snapshot; node != nullptr;) {
+    chain.push_back(node);
+    if (!node->parent_snapshot_id.has_value()) {
+      break;
+    }
+
+    auto parent = lookup(node->parent_snapshot_id.value());
+    if (!parent.has_value()) {
+      if (parent.error().kind == ErrorKind::kNotFound) {
+        // The parent is no longer known: the chain ends here.
+        break;
+      }
+      return std::unexpected<Error>(parent.error());
+    }
+    node = std::move(parent.value());
+  }
+
+  return chain;
+}
+
+// Maps snapshots to their snapshot IDs, preserving order and skipping null entries.
+Result<std::vector<int64_t>> SnapshotUtil::ToIds(
+    const std::vector<std::shared_ptr<Snapshot>>& snapshots) {
+  std::vector<int64_t> ids;
+  for (const auto& entry : snapshots) {
+    if (entry != nullptr) {
+      ids.push_back(entry->snapshot_id);
+    }
+  }
+  return ids;
+}
+
+// Finds the snapshot on the current snapshot's chain whose parent is
+// `snapshot_id`. The ID must resolve to a known snapshot; NotFound is returned
+// when no snapshot on the chain has it as parent.
+Result<std::shared_ptr<Snapshot>> SnapshotUtil::SnapshotAfter(const Table& table,
+                                                              int64_t snapshot_id) {
+  ICEBERG_ASSIGN_OR_RAISE(auto parent, table.SnapshotById(snapshot_id));
+  ICEBERG_CHECK(parent != nullptr, "Snapshot is null for id {}", snapshot_id);
+
+  ICEBERG_ASSIGN_OR_RAISE(auto lineage, CurrentAncestors(table));
+  for (const auto& candidate : lineage) {
+    if (candidate == nullptr || !candidate->parent_snapshot_id.has_value()) {
+      continue;
+    }
+    if (candidate->parent_snapshot_id.value() == snapshot_id) {
+      return candidate;
+    }
+  }
+
+  return NotFound(
+      "Cannot find snapshot after {}: not an ancestor of table's current snapshot",
+      snapshot_id);
+}
+
+// Checked variant of OptionalSnapshotIdAsOfTime: returns the ID it finds, or the
+// ICEBERG_CHECK failure when there is none.
+Result<int64_t> SnapshotUtil::SnapshotIdAsOfTime(const Table& table,
+                                                 TimePointMs timestamp_ms) {
+  const std::optional<int64_t> found = OptionalSnapshotIdAsOfTime(table, timestamp_ms);
+  ICEBERG_CHECK(found.has_value(), "Cannot find a snapshot older than {}",
+                FormatTimestamp(timestamp_ms));
+  return *found;
+}
+
+// Scans the whole of table.history() and returns the snapshot ID of the last
+// entry, in log order, whose timestamp is at or before `timestamp_ms`; returns
+// std::nullopt when no entry qualifies.
+std::optional<int64_t> SnapshotUtil::OptionalSnapshotIdAsOfTime(
+    const Table& table, TimePointMs timestamp_ms) {
+  std::optional<int64_t> latest_match;
+  for (const auto& entry : table.history()) {
+    if (entry.timestamp_ms > timestamp_ms) {
+      continue;
+    }
+    latest_match = entry.snapshot_id;
+  }
+  return latest_match;
+}
+
+// Returns the schema recorded on the snapshot with `snapshot_id`. Falls back to
+// the table's schema when the snapshot carries no schema ID.
+Result<std::shared_ptr<Schema>> SnapshotUtil::SchemaFor(const Table& table,
+                                                        int64_t snapshot_id) {
+  ICEBERG_ASSIGN_OR_RAISE(auto snapshot, table.SnapshotById(snapshot_id));
+  ICEBERG_CHECK(snapshot != nullptr, "Snapshot is null for id {}", snapshot_id);
+
+  if (!snapshot->schema_id.has_value()) {
+    // TODO(any): recover the schema by reading previous metadata files
+    return table.schema();
+  }
+
+  const auto schema_id = snapshot->schema_id.value();
+  return table.metadata()->SchemaById(schema_id);
+}
+
+// Resolves the snapshot as of `timestamp_ms` via SnapshotIdAsOfTime and returns
+// the schema for that snapshot ID.
+Result<std::shared_ptr<Schema>> SnapshotUtil::SchemaFor(const Table& table,
+                                                        TimePointMs timestamp_ms) {
+  ICEBERG_ASSIGN_OR_RAISE(int64_t snapshot_id, SnapshotIdAsOfTime(table, timestamp_ms));
+  return SchemaFor(table, snapshot_id);
+}
+
+// Returns the schema to use for `ref`. The table schema is returned when `ref` is
+// empty, names the main branch, is not present in the metadata's refs, or is a
+// branch; otherwise the schema of the referenced snapshot is looked up.
+Result<std::shared_ptr<Schema>> SnapshotUtil::SchemaFor(const Table& table,
+                                                        const std::string& ref) {
+  const bool is_main = ref.empty() || ref == SnapshotRef::kMainBranch;
+  if (is_main) {
+    return table.schema();
+  }
+
+  const auto& metadata = table.metadata();
+  auto it = metadata->refs.find(ref);
+  const bool uses_snapshot_schema =
+      it != metadata->refs.cend() && it->second->type() != SnapshotRefType::kBranch;
+  if (!uses_snapshot_schema) {
+    return table.schema();
+  }
+
+  return SchemaFor(table, it->second->snapshot_id);
+}
+
+// Metadata-based variant of SchemaFor(table, ref). The metadata's current schema
+// is returned when `ref` is empty, names the main branch, is not present in
+// `metadata.refs`, or is a branch, and also when the referenced snapshot carries
+// no schema ID. Otherwise the snapshot's schema is looked up by ID.
+Result<std::shared_ptr<Schema>> SnapshotUtil::SchemaFor(const TableMetadata& metadata,
+                                                        const std::string& ref) {
+  if (ref.empty() || ref == SnapshotRef::kMainBranch) {
+    return metadata.Schema();
+  }
+
+  auto it = metadata.refs.find(ref);
+  if (it == metadata.refs.end() || it->second->type() == SnapshotRefType::kBranch) {
+    return metadata.Schema();
+  }
+
+  ICEBERG_ASSIGN_OR_RAISE(auto snapshot, metadata.SnapshotById(it->second->snapshot_id));
+  // Guard the dereference below, matching the check the other lookups in this
+  // file perform after SnapshotById.
+  ICEBERG_CHECK(snapshot != nullptr, "Snapshot is null for id {}",
+                it->second->snapshot_id);
+  if (!snapshot->schema_id.has_value()) {
+    return metadata.Schema();
+  }
+
+  return metadata.SchemaById(snapshot->schema_id);
+}
+
+// Table-based convenience overload: forwards to the TableMetadata overload using
+// the table's metadata.
+Result<std::shared_ptr<Snapshot>> SnapshotUtil::LatestSnapshot(
+    const Table& table, const std::string& branch) {
+  const TableMetadata& metadata = *table.metadata();
+  return LatestSnapshot(metadata, branch);
+}
+
+// Returns the snapshot that `branch` points to. metadata.Snapshot() is returned
+// when `branch` is empty, names the main branch, or is not present in
+// `metadata.refs`.
+Result<std::shared_ptr<Snapshot>> SnapshotUtil::LatestSnapshot(
+    const TableMetadata& metadata, const std::string& branch) {
+  if (!branch.empty() && branch != SnapshotRef::kMainBranch) {
+    if (auto it = metadata.refs.find(branch); it != metadata.refs.end()) {
+      return metadata.SnapshotById(it->second->snapshot_id);
+    }
+  }
+  return metadata.Snapshot();
+}
+
+}  // namespace iceberg
diff --git a/src/iceberg/util/snapshot_util_internal.h 
b/src/iceberg/util/snapshot_util_internal.h
new file mode 100644
index 00000000..e0d8830f
--- /dev/null
+++ b/src/iceberg/util/snapshot_util_internal.h
@@ -0,0 +1,276 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#include <functional>
+#include <optional>
+#include <string>
+#include <vector>
+
+#include "iceberg/iceberg_export.h"
+#include "iceberg/result.h"
+#include "iceberg/type_fwd.h"
+#include "iceberg/util/timepoint.h"
+
+namespace iceberg {
+
+/// \brief Utility functions for working with snapshots
+/// \note All the returned std::shared_ptr<Snapshot> are guaranteed to be not 
null.
+class ICEBERG_EXPORT SnapshotUtil {
+ public:
+  /// \brief Returns a vector of ancestors of the given snapshot.
+  ///
+  /// \param table The table
+  /// \param snapshot_id The snapshot ID to start from
+  /// \return A vector of ancestor snapshots
+  static Result<std::vector<std::shared_ptr<Snapshot>>> AncestorsOf(const 
Table& table,
+                                                                    int64_t 
snapshot_id);
+
+  /// \brief Returns whether ancestor_snapshot_id is an ancestor of 
snapshot_id.
+  ///
+  /// \param table The table to check
+  /// \param snapshot_id The snapshot ID to check
+  /// \param ancestor_snapshot_id The ancestor snapshot ID to check for
+  /// \return true if ancestor_snapshot_id is an ancestor of snapshot_id
+  static Result<bool> IsAncestorOf(const Table& table, int64_t snapshot_id,
+                                   int64_t ancestor_snapshot_id);
+
+  /// \brief Returns whether ancestor_snapshot_id is an ancestor of the 
table's current
+  /// state.
+  ///
+  /// \param table The table to check
+  /// \param ancestor_snapshot_id The ancestor snapshot ID to check for
+  /// \return true if ancestor_snapshot_id is an ancestor of the current 
snapshot
+  static Result<bool> IsAncestorOf(const Table& table, int64_t 
ancestor_snapshot_id);
+
+  /// \brief Returns whether some ancestor of snapshot_id has a parent ID that matches
+  /// ancestor_parent_snapshot_id.
+  ///
+  /// \param table The table to check
+  /// \param snapshot_id The snapshot ID to check
+  /// \param ancestor_parent_snapshot_id The ancestor parent snapshot ID to 
check for
+  /// \return true if any ancestor has the given parent ID
+  static Result<bool> IsParentAncestorOf(const Table& table, int64_t 
snapshot_id,
+                                         int64_t ancestor_parent_snapshot_id);
+
+  /// \brief Returns a vector that traverses the table's snapshots from the 
current to the
+  /// last known ancestor.
+  ///
+  /// \param table The table
+  /// \return A vector from the table's current snapshot to its last known 
ancestor
+  static Result<std::vector<std::shared_ptr<Snapshot>>> CurrentAncestors(
+      const Table& table);
+
+  /// \brief Returns the snapshot IDs for the ancestors of the current table 
state.
+  ///
+  /// Ancestor IDs are ordered by commit time, descending. The first ID is the 
current
+  /// snapshot, followed by its parent, and so on.
+  ///
+  /// \param table The table
+  /// \return A vector of snapshot IDs of the known ancestor snapshots, 
including the
+  /// current ID
+  static Result<std::vector<int64_t>> CurrentAncestorIds(const Table& table);
+
+  /// \brief Traverses the history of the table's current snapshot and finds 
the oldest
+  /// Snapshot.
+  ///
+  /// \param table The table
+  /// \return The oldest snapshot, or nullopt if there is no current snapshot
+  static Result<std::optional<std::shared_ptr<Snapshot>>> OldestAncestor(
+      const Table& table);
+
+  /// \brief Traverses the history and finds the oldest ancestor of the 
specified
+  /// snapshot.
+  ///
+  /// Oldest ancestor is defined as the ancestor snapshot whose parent is null 
or has been
+  /// expired. If the specified snapshot has no parent or parent has been 
expired, the
+  /// specified snapshot itself is returned.
+  ///
+  /// \param table The table
+  /// \param snapshot_id The ID of the snapshot to find the oldest ancestor
+  /// \return The oldest snapshot, or nullopt if not found
+  static Result<std::optional<std::shared_ptr<Snapshot>>> OldestAncestorOf(
+      const Table& table, int64_t snapshot_id);
+
+  /// \brief Traverses the history of the table's current snapshot, finds the 
oldest
+  /// snapshot that was committed either at or after a given time.
+  ///
+  /// \param table The table
+  /// \param timestamp_ms A timestamp in milliseconds
+  /// \return The first snapshot after the given timestamp, or nullopt if the 
current
+  /// snapshot is older than the timestamp. If the first ancestor after the 
given time
+  /// can't be determined, returns a NotFound error.
+  static Result<std::optional<std::shared_ptr<Snapshot>>> OldestAncestorAfter(
+      const Table& table, TimePointMs timestamp_ms);
+
+  /// \brief Returns list of snapshot ids in the range 
(from_snapshot_id,to_snapshot_id]
+  ///
+  /// This method assumes that from_snapshot_id is an ancestor of 
to_snapshot_id.
+  ///
+  /// \param table The table
+  /// \param from_snapshot_id The starting snapshot ID (exclusive)
+  /// \param to_snapshot_id The ending snapshot ID (inclusive)
+  /// \return A vector of snapshot IDs in the range
+  static Result<std::vector<int64_t>> SnapshotIdsBetween(const Table& table,
+                                                         int64_t 
from_snapshot_id,
+                                                         int64_t 
to_snapshot_id);
+
+  /// \brief Returns a vector of ancestor IDs between two snapshots.
+  ///
+  /// \param table The table
+  /// \param latest_snapshot_id The latest snapshot ID
+  /// \param oldest_snapshot_id The oldest snapshot ID (optional, nullopt 
means all
+  /// ancestors)
+  /// \return A vector of snapshot IDs between the two snapshots
+  static Result<std::vector<int64_t>> AncestorIdsBetween(
+      const Table& table, int64_t latest_snapshot_id,
+      const std::optional<int64_t>& oldest_snapshot_id);
+
+  /// \brief Returns a vector of ancestors between two snapshots.
+  ///
+  /// \param table The table
+  /// \param latest_snapshot_id The latest snapshot ID
+  /// \param oldest_snapshot_id The oldest snapshot ID (optional, nullopt 
means all
+  /// ancestors)
+  /// \return A vector of ancestor snapshots between the two snapshots
+  static Result<std::vector<std::shared_ptr<Snapshot>>> AncestorsBetween(
+      const Table& table, int64_t latest_snapshot_id,
+      std::optional<int64_t> oldest_snapshot_id);
+
+  /// \brief Traverses the history of the table's current snapshot and finds 
the snapshot
+  /// with the given snapshot id as its parent.
+  ///
+  /// \param table The table
+  /// \param snapshot_id The parent snapshot ID
+  /// \return The snapshot for which the given snapshot is the parent
+  static Result<std::shared_ptr<Snapshot>> SnapshotAfter(const Table& table,
+                                                         int64_t snapshot_id);
+
+  /// \brief Returns the ID of the most recent snapshot for the table as of 
the timestamp.
+  ///
+  /// \param table The table
+  /// \param timestamp_ms The timestamp in millis since the Unix epoch
+  /// \return The snapshot ID
+  static Result<int64_t> SnapshotIdAsOfTime(const Table& table, TimePointMs 
timestamp_ms);
+
+  /// \brief Returns the ID of the most recent snapshot for the table as of 
the timestamp,
+  /// or nullopt if not found.
+  ///
+  /// \param table The table
+  /// \param timestamp_ms The timestamp in millis since the Unix epoch
+  /// \return The snapshot ID, or nullopt if not found
+  static std::optional<int64_t> OptionalSnapshotIdAsOfTime(const Table& table,
+                                                           TimePointMs 
timestamp_ms);
+
+  /// \brief Returns the schema of the table for the specified snapshot.
+  ///
+  /// \param table The table
+  /// \param snapshot_id The ID of the snapshot
+  /// \return The schema
+  static Result<std::shared_ptr<Schema>> SchemaFor(const Table& table,
+                                                   int64_t snapshot_id);
+
+  /// \brief Returns the schema of the table for the specified timestamp.
+  ///
+  /// \param table The table
+  /// \param timestamp_ms The timestamp in millis since the Unix epoch
+  /// \return The schema
+  static Result<std::shared_ptr<Schema>> SchemaFor(const Table& table,
+                                                   TimePointMs timestamp_ms);
+
+  /// \brief Return the schema of the snapshot at a given ref.
+  ///
+  /// If the ref does not exist or the ref is a branch, the table schema is 
returned
+  /// because it will be the schema when the new branch is created. If the ref 
is a tag,
+  /// then the snapshot schema is returned.
+  ///
+  /// \param table The table
+  /// \param ref Ref name of the table (empty string means main branch)
+  /// \return Schema of the specific snapshot at the given ref
+  static Result<std::shared_ptr<Schema>> SchemaFor(const Table& table,
+                                                   const std::string& ref);
+
+  /// \brief Return the schema of the snapshot at a given ref.
+  ///
+  /// If the ref does not exist or the ref is a branch, the table schema is 
returned
+  /// because it will be the schema when the new branch is created. If the ref 
is a tag,
+  /// then the snapshot schema is returned.
+  ///
+  /// \param metadata The table metadata
+  /// \param ref Ref name of the table (empty string means main branch)
+  /// \return Schema of the specific snapshot at the given branch
+  static Result<std::shared_ptr<Schema>> SchemaFor(const TableMetadata& 
metadata,
+                                                   const std::string& ref);
+
+  /// \brief Fetch the snapshot at the head of the given branch in the given 
table.
+  ///
+  /// This overload forwards to the TableMetadata overload with the table's metadata.
+  /// For the main branch that overload reads TableMetadata::Snapshot() instead of
+  /// using the branch API, so that existing code still goes through the old code
+  /// path to ensure backwards compatibility.
+  ///
+  /// \param table The table
+  /// \param branch Branch name of the table (empty string means main branch)
+  /// \return The latest snapshot for the given branch
+  static Result<std::shared_ptr<Snapshot>> LatestSnapshot(const Table& table,
+                                                          const std::string& 
branch);
+
+  /// \brief Fetch the snapshot at the head of the given branch in the given 
table.
+  ///
+  /// This method calls TableMetadata::Snapshot() instead of using branch API 
for the main
+  /// branch so that existing code still goes through the old code path to 
ensure
+  /// backwards compatibility.
+  ///
+  /// If the branch does not exist, the table's latest snapshot is returned
+  /// (presumably because a newly created branch would start from that snapshot).
+  ///
+  /// \param metadata The table metadata
+  /// \param branch Branch name of the table metadata (empty string means main
+  /// branch)
+  /// \return The latest snapshot for the given branch
+  static Result<std::shared_ptr<Snapshot>> LatestSnapshot(const TableMetadata& 
metadata,
+                                                          const std::string& 
branch);
+
+ private:
+  /// \brief Helper function to traverse ancestors of a snapshot.
+  ///
+  /// \param table The table
+  /// \param snapshot The snapshot to start from
+  /// \return A vector of ancestor snapshots
+  static Result<std::vector<std::shared_ptr<Snapshot>>> AncestorsOf(
+      const Table& table, const std::shared_ptr<Snapshot>& snapshot);
+
+  /// \brief Helper function to traverse ancestors of a snapshot using a 
lookup function.
+  ///
+  /// \param snapshot The snapshot to start from
+  /// \param lookup Function to lookup snapshots by ID
+  /// \return A vector of ancestor snapshots
+  static Result<std::vector<std::shared_ptr<Snapshot>>> AncestorsOf(
+      const std::shared_ptr<Snapshot>& snapshot,
+      const std::function<Result<std::shared_ptr<Snapshot>>(int64_t)>& lookup);
+
+  /// \brief Helper function to convert snapshots to IDs.
+  ///
+  /// \param snapshots The snapshots
+  /// \return A vector of snapshot IDs
+  static Result<std::vector<int64_t>> ToIds(
+      const std::vector<std::shared_ptr<Snapshot>>& snapshots);
+};
+
+}  // namespace iceberg
diff --git a/src/iceberg/util/timepoint.cc b/src/iceberg/util/timepoint.cc
index 6438e8e9..a8bc7708 100644
--- a/src/iceberg/util/timepoint.cc
+++ b/src/iceberg/util/timepoint.cc
@@ -20,6 +20,8 @@
 #include "iceberg/util/timepoint.h"
 
 #include <chrono>
+#include <iomanip>
+#include <sstream>
 
 namespace iceberg {
 
@@ -43,4 +45,22 @@ int64_t UnixNsFromTimePointNs(TimePointNs time_point_ns) {
       .count();
 }
 
+// Formats a millisecond-precision time point as "YYYY-MM-DD HH:MM:SS.mmm UTC".
+//
+// NOTE: the parameter is a TimePointMs (milliseconds) despite its `_ns` name.
+std::string FormatTimestamp(TimePointMs time_point_ns) {
+  const auto unix_ms = UnixMsFromTimePointMs(time_point_ns);
+
+  // Split into whole seconds plus a millisecond remainder in [0, 999]. Integer
+  // division and remainder truncate toward zero, so pre-epoch values need a floor
+  // adjustment; without it the seconds are rounded the wrong way and the fraction
+  // is printed with a minus sign.
+  auto unix_seconds = unix_ms / 1000;
+  auto millis = unix_ms % 1000;
+  if (millis < 0) {
+    millis += 1000;
+    --unix_seconds;
+  }
+
+  const auto whole_seconds =
+      std::chrono::system_clock::time_point(std::chrono::seconds(unix_seconds));
+  const auto seconds_since_epoch = std::chrono::system_clock::to_time_t(whole_seconds);
+
+  // Format as ISO 8601-like string: YYYY-MM-DD HH:MM:SS
+  // NOTE(review): std::gmtime returns a pointer to shared static storage and may
+  // not be thread-safe; consider a reentrant alternative if this is called
+  // concurrently.
+  std::ostringstream oss;
+  oss << std::put_time(std::gmtime(&seconds_since_epoch), "%Y-%m-%d %H:%M:%S");
+
+  // Add zero-padded milliseconds
+  oss << "." << std::setfill('0') << std::setw(3) << millis << " UTC";
+
+  return oss.str();
+}
+
 }  // namespace iceberg
diff --git a/src/iceberg/util/timepoint.h b/src/iceberg/util/timepoint.h
index 53857875..48e630ae 100644
--- a/src/iceberg/util/timepoint.h
+++ b/src/iceberg/util/timepoint.h
@@ -46,4 +46,7 @@ ICEBERG_EXPORT Result<TimePointNs> 
TimePointNsFromUnixNs(int64_t unix_ns);
 /// \brief Returns a Unix timestamp in nanoseconds from a TimePointNs
 ICEBERG_EXPORT int64_t UnixNsFromTimePointNs(TimePointNs time_point_ns);
 
+/// \brief Returns a human-readable string representation of a TimePointMs
+ICEBERG_EXPORT std::string FormatTimestamp(TimePointMs time_point_ns);
+
 }  // namespace iceberg
diff --git a/src/iceberg/util/uuid.cc b/src/iceberg/util/uuid.cc
index 14256755..9322deb9 100644
--- a/src/iceberg/util/uuid.cc
+++ b/src/iceberg/util/uuid.cc
@@ -204,7 +204,7 @@ Result<Uuid> Uuid::FromBytes(std::span<const uint8_t> 
bytes) {
 }
 
 uint8_t Uuid::operator[](size_t index) const {
-  ICEBERG_CHECK(index < kLength, "UUID index out of range: {}", index);
+  ICEBERG_CHECK_OR_DIE(index < kLength, "UUID index out of range: {}", index);
   return data_[index];
 }
 

Reply via email to