This is an automated email from the ASF dual-hosted git repository.
gangwu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-cpp.git
The following commit(s) were added to refs/heads/main by this push:
new a29355dd feat: add snapshot update (#408)
a29355dd is described below
commit a29355ddb904cc75aa8e04a591797f8f163b85a0
Author: Junwang Zhao <[email protected]>
AuthorDate: Tue Jan 13 13:45:56 2026 +0800
feat: add snapshot update (#408)
---
src/iceberg/CMakeLists.txt | 1 +
src/iceberg/avro/avro_schema_util.cc | 10 +-
src/iceberg/json_internal.cc | 16 +-
src/iceberg/manifest/manifest_writer.cc | 11 +-
src/iceberg/manifest/manifest_writer.h | 3 +-
src/iceberg/meson.build | 1 +
src/iceberg/snapshot.cc | 103 ++++++-
src/iceberg/snapshot.h | 74 ++++-
src/iceberg/table.h | 20 +-
src/iceberg/table_metadata.cc | 286 +++++++++++++++++-
src/iceberg/table_metadata.h | 25 +-
src/iceberg/table_properties.h | 4 +-
src/iceberg/table_update.cc | 4 +-
src/iceberg/table_update.h | 13 +
src/iceberg/test/json_internal_test.cc | 2 +-
src/iceberg/test/snapshot_test.cc | 2 +-
src/iceberg/test/table_requirements_test.cc | 12 +-
src/iceberg/test/update_properties_test.cc | 2 +-
src/iceberg/transaction.cc | 66 ++++-
src/iceberg/transaction.h | 10 +-
src/iceberg/type_fwd.h | 1 +
src/iceberg/update/meson.build | 1 +
src/iceberg/update/pending_update.cc | 6 +
src/iceberg/update/pending_update.h | 14 +-
src/iceberg/update/snapshot_update.cc | 434 ++++++++++++++++++++++++++++
src/iceberg/update/snapshot_update.h | 196 +++++++++++++
src/iceberg/update/update_partition_spec.cc | 17 +-
src/iceberg/update/update_properties.cc | 18 +-
src/iceberg/update/update_schema.cc | 6 +-
src/iceberg/update/update_sort_order.cc | 5 +-
src/iceberg/util/snapshot_util.cc | 25 ++
src/iceberg/util/snapshot_util_internal.h | 28 +-
src/iceberg/util/string_util.h | 18 ++
src/iceberg/util/timepoint.cc | 7 +
src/iceberg/util/timepoint.h | 3 +
src/iceberg/util/uuid.cc | 12 +
src/iceberg/util/uuid.h | 3 +
37 files changed, 1362 insertions(+), 97 deletions(-)
diff --git a/src/iceberg/CMakeLists.txt b/src/iceberg/CMakeLists.txt
index 3e0c66f9..23d2e4cd 100644
--- a/src/iceberg/CMakeLists.txt
+++ b/src/iceberg/CMakeLists.txt
@@ -82,6 +82,7 @@ set(ICEBERG_SOURCES
transform_function.cc
type.cc
update/pending_update.cc
+ update/snapshot_update.cc
update/update_partition_spec.cc
update/update_properties.cc
update/update_schema.cc
diff --git a/src/iceberg/avro/avro_schema_util.cc
b/src/iceberg/avro/avro_schema_util.cc
index ba38c1f9..b19ffce7 100644
--- a/src/iceberg/avro/avro_schema_util.cc
+++ b/src/iceberg/avro/avro_schema_util.cc
@@ -17,7 +17,6 @@
* under the License.
*/
-#include <charconv>
#include <format>
#include <mutex>
#include <sstream>
@@ -40,6 +39,7 @@
#include "iceberg/schema_util_internal.h"
#include "iceberg/util/formatter.h"
#include "iceberg/util/macros.h"
+#include "iceberg/util/string_util.h"
#include "iceberg/util/visit_type.h"
namespace iceberg::avro {
@@ -471,13 +471,7 @@ Result<int32_t> GetId(const ::avro::NodePtr& node, const
std::string& attr_name,
return InvalidSchema("Missing avro attribute: {}", attr_name);
}
- int32_t id;
- const auto& id_value = id_str.value();
- auto [_, ec] = std::from_chars(id_value.data(), id_value.data() +
id_value.size(), id);
- if (ec != std::errc()) {
- return InvalidSchema("Invalid {}: {}", attr_name, id_value);
- }
- return id;
+ return StringUtils::ParseInt<int32_t>(id_str.value());
}
Result<int32_t> GetElementId(const ::avro::NodePtr& node) {
diff --git a/src/iceberg/json_internal.cc b/src/iceberg/json_internal.cc
index 669d0b50..4cd2f03f 100644
--- a/src/iceberg/json_internal.cc
+++ b/src/iceberg/json_internal.cc
@@ -399,7 +399,7 @@ nlohmann::json ToJson(const Snapshot& snapshot) {
json[kTimestampMs] = UnixMsFromTimePointMs(snapshot.timestamp_ms);
json[kManifestList] = snapshot.manifest_list;
// If there is an operation, write the summary map
- if (snapshot.operation().has_value()) {
+ if (snapshot.Operation().has_value()) {
json[kSummary] = snapshot.summary;
}
SetOptionalField(json, kSchemaId, snapshot.schema_id);
@@ -1553,9 +1553,17 @@ Result<std::unique_ptr<TableUpdate>>
TableUpdateFromJson(const nlohmann::json& j
GetJsonValueOptional<int64_t>(json,
kMaxSnapshotAgeMs));
ICEBERG_ASSIGN_OR_RAISE(auto max_ref_age,
GetJsonValueOptional<int64_t>(json, kMaxRefAgeMs));
- return std::make_unique<table::SetSnapshotRef>(std::move(ref_name),
snapshot_id, type,
- min_snapshots,
max_snapshot_age,
- max_ref_age);
+ if (type == SnapshotRefType::kTag) {
+ ICEBERG_ASSIGN_OR_RAISE(auto tag, SnapshotRef::MakeTag(snapshot_id,
max_ref_age));
+ return std::make_unique<table::SetSnapshotRef>(std::move(ref_name),
*tag);
+ } else {
+ ICEBERG_CHECK(type == SnapshotRefType::kBranch,
+ "Expected branch type for snapshot ref");
+ ICEBERG_ASSIGN_OR_RAISE(auto branch,
+ SnapshotRef::MakeBranch(snapshot_id,
min_snapshots,
+ max_snapshot_age,
max_ref_age));
+ return std::make_unique<table::SetSnapshotRef>(std::move(ref_name),
*branch);
+ }
}
if (action == kActionSetProperties) {
using StringMap = std::unordered_map<std::string, std::string>;
diff --git a/src/iceberg/manifest/manifest_writer.cc
b/src/iceberg/manifest/manifest_writer.cc
index 0899869a..0045e2c0 100644
--- a/src/iceberg/manifest/manifest_writer.cc
+++ b/src/iceberg/manifest/manifest_writer.cc
@@ -369,23 +369,18 @@ Result<std::unique_ptr<ManifestWriter>>
ManifestWriter::MakeWriter(
int8_t format_version, std::optional<int64_t> snapshot_id,
std::string_view manifest_location, std::shared_ptr<FileIO> file_io,
std::shared_ptr<PartitionSpec> partition_spec, std::shared_ptr<Schema>
current_schema,
- std::optional<ManifestContent> content, std::optional<int64_t>
first_row_id) {
+ ManifestContent content, std::optional<int64_t> first_row_id) {
switch (format_version) {
case 1:
return MakeV1Writer(snapshot_id, manifest_location, std::move(file_io),
std::move(partition_spec),
std::move(current_schema));
case 2:
- ICEBERG_PRECHECK(content.has_value(),
- "ManifestContent is required for format version 2");
return MakeV2Writer(snapshot_id, manifest_location, std::move(file_io),
- std::move(partition_spec), std::move(current_schema),
- content.value());
+ std::move(partition_spec),
std::move(current_schema), content);
case 3:
- ICEBERG_PRECHECK(content.has_value(),
- "ManifestContent is required for format version 3");
return MakeV3Writer(snapshot_id, first_row_id, manifest_location,
std::move(file_io), std::move(partition_spec),
- std::move(current_schema), content.value());
+ std::move(current_schema), content);
default:
return NotSupported("Format version {} is not supported",
format_version);
}
diff --git a/src/iceberg/manifest/manifest_writer.h
b/src/iceberg/manifest/manifest_writer.h
index 5a095b28..288bda31 100644
--- a/src/iceberg/manifest/manifest_writer.h
+++ b/src/iceberg/manifest/manifest_writer.h
@@ -28,6 +28,7 @@
#include "iceberg/file_writer.h"
#include "iceberg/iceberg_export.h"
+#include "iceberg/manifest/manifest_list.h"
#include "iceberg/metrics.h"
#include "iceberg/result.h"
#include "iceberg/type_fwd.h"
@@ -175,7 +176,7 @@ class ICEBERG_EXPORT ManifestWriter {
std::string_view manifest_location, std::shared_ptr<FileIO> file_io,
std::shared_ptr<PartitionSpec> partition_spec,
std::shared_ptr<Schema> current_schema,
- std::optional<ManifestContent> content = std::nullopt,
+ ManifestContent content = ManifestContent::kData,
std::optional<int64_t> first_row_id = std::nullopt);
private:
diff --git a/src/iceberg/meson.build b/src/iceberg/meson.build
index 3164f390..ead2ef2c 100644
--- a/src/iceberg/meson.build
+++ b/src/iceberg/meson.build
@@ -103,6 +103,7 @@ iceberg_sources = files(
'transform_function.cc',
'type.cc',
'update/pending_update.cc',
+ 'update/snapshot_update.cc',
'update/update_partition_spec.cc',
'update/update_properties.cc',
'update/update_schema.cc',
diff --git a/src/iceberg/snapshot.cc b/src/iceberg/snapshot.cc
index ba85e049..dfa9a340 100644
--- a/src/iceberg/snapshot.cc
+++ b/src/iceberg/snapshot.cc
@@ -19,10 +19,13 @@
#include "iceberg/snapshot.h"
+#include <memory>
+
#include "iceberg/file_io.h"
#include "iceberg/manifest/manifest_list.h"
#include "iceberg/manifest/manifest_reader.h"
#include "iceberg/util/macros.h"
+#include "iceberg/util/string_util.h"
namespace iceberg {
@@ -49,6 +52,55 @@ SnapshotRefType SnapshotRef::type() const noexcept {
retention);
}
+Status SnapshotRef::Validate() const {
+  // Every retention limit is optional, but when it is set it must be strictly positive.
+  const auto unset_or_positive = [](const auto& limit) {
+    return !limit.has_value() || limit.value() > 0;
+  };
+
+  if (type() != SnapshotRefType::kBranch) {
+    // Tags only carry a maximum reference age.
+    const auto& tag_retention = std::get<Tag>(this->retention);
+    ICEBERG_CHECK(unset_or_positive(tag_retention.max_ref_age_ms),
+                  "Max reference age must be greater than 0");
+    return {};
+  }
+
+  // Branches may additionally bound how many snapshots are kept and for how long.
+  const auto& branch_retention = std::get<Branch>(this->retention);
+  ICEBERG_CHECK(unset_or_positive(branch_retention.min_snapshots_to_keep),
+                "Min snapshots to keep must be greater than 0");
+  ICEBERG_CHECK(unset_or_positive(branch_retention.max_snapshot_age_ms),
+                "Max snapshot age must be greater than 0 ms");
+  ICEBERG_CHECK(unset_or_positive(branch_retention.max_ref_age_ms),
+                "Max reference age must be greater than 0");
+  return {};
+}
+
+Result<std::unique_ptr<SnapshotRef>> SnapshotRef::MakeBranch(
+    int64_t snapshot_id, std::optional<int32_t> min_snapshots_to_keep,
+    std::optional<int64_t> max_snapshot_age_ms, std::optional<int64_t> max_ref_age_ms) {
+  // Build the retention policy first, then validate the finished ref so callers
+  // never receive a branch carrying a non-positive limit.
+  const Branch branch_retention{
+      .min_snapshots_to_keep = min_snapshots_to_keep,
+      .max_snapshot_age_ms = max_snapshot_age_ms,
+      .max_ref_age_ms = max_ref_age_ms,
+  };
+  auto branch_ref = std::make_unique<SnapshotRef>(
+      SnapshotRef{.snapshot_id = snapshot_id, .retention = branch_retention});
+  ICEBERG_RETURN_UNEXPECTED(branch_ref->Validate());
+  return branch_ref;
+}
+
+Result<std::unique_ptr<SnapshotRef>> SnapshotRef::MakeTag(
+    int64_t snapshot_id, std::optional<int64_t> max_ref_age_ms) {
+  // A tag only carries an optional maximum reference age.
+  const Tag tag_retention{.max_ref_age_ms = max_ref_age_ms};
+  auto tag_ref = std::make_unique<SnapshotRef>(
+      SnapshotRef{.snapshot_id = snapshot_id, .retention = tag_retention});
+  // Reject a non-positive age before handing the ref out.
+  ICEBERG_RETURN_UNEXPECTED(tag_ref->Validate());
+  return tag_ref;
+}
+
+std::unique_ptr<SnapshotRef> SnapshotRef::Clone(
+    std::optional<int64_t> new_snapshot_id) const {
+  // The retention policy is copied verbatim; only the target snapshot may change.
+  return std::make_unique<SnapshotRef>(SnapshotRef{
+      .snapshot_id = new_snapshot_id.value_or(snapshot_id),
+      .retention = retention,
+  });
+}
+
bool SnapshotRef::Equals(const SnapshotRef& other) const {
if (this == &other) {
return true;
@@ -67,7 +119,7 @@ bool SnapshotRef::Equals(const SnapshotRef& other) const {
}
}
-std::optional<std::string_view> Snapshot::operation() const {
+std::optional<std::string_view> Snapshot::Operation() const {
auto it = summary.find(SnapshotSummaryFields::kOperation);
if (it != summary.end()) {
return it->second;
@@ -75,6 +127,24 @@ std::optional<std::string_view> Snapshot::operation() const
{
return std::nullopt;
}
+namespace {
+
+/// \brief Read an optional int64 entry from a snapshot summary.
+///
+/// \param summary The snapshot summary map to search.
+/// \param key The summary key to look up.
+/// \return nullopt when `key` is absent; otherwise the result of parsing the stored
+/// text with StringUtils::ParseInt<int64_t> (an error if it is not a valid integer).
+Result<std::optional<int64_t>> FindSummaryInt64(const decltype(Snapshot::summary)& summary,
+                                                const std::string& key) {
+  auto it = summary.find(key);
+  if (it == summary.end()) {
+    return std::nullopt;
+  }
+  return StringUtils::ParseInt<int64_t>(it->second);
+}
+
+}  // namespace
+
+Result<std::optional<int64_t>> Snapshot::FirstRowId() const {
+  return FindSummaryInt64(summary, SnapshotSummaryFields::kFirstRowId);
+}
+
+Result<std::optional<int64_t>> Snapshot::AddedRows() const {
+  return FindSummaryInt64(summary, SnapshotSummaryFields::kAddedRows);
+}
+
bool Snapshot::Equals(const Snapshot& other) const {
if (this == &other) {
return true;
@@ -85,6 +155,37 @@ bool Snapshot::Equals(const Snapshot& other) const {
schema_id == other.schema_id;
}
+Result<std::unique_ptr<Snapshot>> Snapshot::Make(
+    int64_t sequence_number, int64_t snapshot_id,
+    std::optional<int64_t> parent_snapshot_id, TimePointMs timestamp_ms,
+    std::string operation, std::unordered_map<std::string, std::string> summary,
+    std::optional<int32_t> schema_id, std::string manifest_list,
+    std::optional<int64_t> first_row_id, std::optional<int64_t> added_rows) {
+  // Reject malformed inputs before building anything.
+  // NOTE(review): the `.value()` format arguments below assume ICEBERG_PRECHECK only
+  // evaluates its message arguments when the condition fails (i.e. when the optional
+  // holds a value) -- confirm against the macro definition.
+  ICEBERG_PRECHECK(!operation.empty(), "Operation cannot be empty");
+  ICEBERG_PRECHECK(!first_row_id.has_value() || first_row_id.value() >= 0,
+                   "Invalid first-row-id (cannot be negative): {}", first_row_id.value());
+  ICEBERG_PRECHECK(!added_rows.has_value() || added_rows.value() >= 0,
+                   "Invalid added-rows (cannot be negative): {}", added_rows.value());
+  ICEBERG_PRECHECK(!first_row_id.has_value() || added_rows.has_value(),
+                   "Missing added-rows when first-row-id is set");
+
+  // The operation and row-lineage values live in the summary map, which is where
+  // Operation(), FirstRowId() and AddedRows() read them back from.
+  // `operation` is a by-value sink that is not used again, so move rather than copy it.
+  summary[SnapshotSummaryFields::kOperation] = std::move(operation);
+  if (first_row_id.has_value()) {
+    summary[SnapshotSummaryFields::kFirstRowId] = std::to_string(first_row_id.value());
+  }
+  if (added_rows.has_value()) {
+    summary[SnapshotSummaryFields::kAddedRows] = std::to_string(added_rows.value());
+  }
+  return std::make_unique<Snapshot>(Snapshot{
+      .snapshot_id = snapshot_id,
+      .parent_snapshot_id = parent_snapshot_id,
+      .sequence_number = sequence_number,
+      .timestamp_ms = timestamp_ms,
+      .manifest_list = std::move(manifest_list),
+      .summary = std::move(summary),
+      .schema_id = schema_id,
+  });
+}
+
Result<SnapshotCache::ManifestsCache> SnapshotCache::InitManifestsCache(
const Snapshot* snapshot, std::shared_ptr<FileIO> file_io) {
if (file_io == nullptr) {
diff --git a/src/iceberg/snapshot.h b/src/iceberg/snapshot.h
index 86a7fe20..e2ec0ccb 100644
--- a/src/iceberg/snapshot.h
+++ b/src/iceberg/snapshot.h
@@ -25,7 +25,6 @@
#include <string>
#include <string_view>
#include <unordered_map>
-#include <utility>
#include <variant>
#include "iceberg/iceberg_export.h"
@@ -114,6 +113,39 @@ struct ICEBERG_EXPORT SnapshotRef {
SnapshotRefType type() const noexcept;
+ /// \brief Create a branch reference
+ ///
+ /// \param snapshot_id The snapshot ID for the branch
+ /// \param min_snapshots_to_keep Optional minimum number of snapshots to keep
+ /// \param max_snapshot_age_ms Optional maximum snapshot age in milliseconds
+ /// \param max_ref_age_ms Optional maximum reference age in milliseconds
+ /// \return A Result containing a unique_ptr to the SnapshotRef, or an error
if
+ /// validation failed
+ static Result<std::unique_ptr<SnapshotRef>> MakeBranch(
+ int64_t snapshot_id, std::optional<int32_t> min_snapshots_to_keep =
std::nullopt,
+ std::optional<int64_t> max_snapshot_age_ms = std::nullopt,
+ std::optional<int64_t> max_ref_age_ms = std::nullopt);
+
+ /// \brief Create a tag reference
+ ///
+ /// \param snapshot_id The snapshot ID for the tag
+ /// \param max_ref_age_ms Optional maximum reference age in milliseconds
+ /// \return A Result containing a unique_ptr to the SnapshotRef, or an error
if
+ /// validation failed
+ static Result<std::unique_ptr<SnapshotRef>> MakeTag(
+ int64_t snapshot_id, std::optional<int64_t> max_ref_age_ms =
std::nullopt);
+
+ /// \brief Clone this SnapshotRef with an optional new snapshot ID
+ ///
+ /// \param new_snapshot_id Optional new snapshot ID. If not provided, uses
the current
+ /// snapshot_id
+ /// \return A unique_ptr to the cloned SnapshotRef
+ std::unique_ptr<SnapshotRef> Clone(
+ std::optional<int64_t> new_snapshot_id = std::nullopt) const;
+
+ /// \brief Validate the SnapshotRef
+ Status Validate() const;
+
/// \brief Compare two snapshot refs for equality
friend bool operator==(const SnapshotRef& lhs, const SnapshotRef& rhs) {
return lhs.Equals(rhs);
@@ -125,9 +157,13 @@ struct ICEBERG_EXPORT SnapshotRef {
};
/// \brief Optional Snapshot Summary Fields
-struct SnapshotSummaryFields {
+struct ICEBERG_EXPORT SnapshotSummaryFields {
/// \brief The operation field key
inline static const std::string kOperation = "operation";
+ /// \brief The first row id field key
+ inline static const std::string kFirstRowId = "first-row-id";
+ /// \brief The added rows field key
+ inline static const std::string kAddedRows = "added-rows";
/// Metrics, see https://iceberg.apache.org/spec/#metrics
@@ -246,12 +282,44 @@ struct ICEBERG_EXPORT Snapshot {
/// ID of the table's current schema when the snapshot was created.
std::optional<int32_t> schema_id;
+ /// \brief Create a new Snapshot instance with validation on the inputs.
+ static Result<std::unique_ptr<Snapshot>> Make(
+ int64_t sequence_number, int64_t snapshot_id,
+ std::optional<int64_t> parent_snapshot_id, TimePointMs timestamp_ms,
+ std::string operation, std::unordered_map<std::string, std::string>
summary,
+ std::optional<int32_t> schema_id, std::string manifest_list,
+ std::optional<int64_t> first_row_id = std::nullopt,
+ std::optional<int64_t> added_rows = std::nullopt);
+
/// \brief Return the name of the DataOperations data operation that
produced this
/// snapshot.
///
/// \return the operation that produced this snapshot, or nullopt if the
operation is
/// unknown.
- std::optional<std::string_view> operation() const;
+ std::optional<std::string_view> Operation() const;
+
+ /// \brief The row-id of the first newly added row in this snapshot.
+ ///
+ /// All rows added in this snapshot will have a row-id assigned to them
greater than
+ /// this value. All rows with a row-id less than this value were created in
a snapshot
+ /// that was added to the table (but not necessarily committed to this
branch) in the
+ /// past.
+ ///
+ /// \return the first row-id to be used in this snapshot or nullopt when row
lineage
+ /// is not supported
+ Result<std::optional<int64_t>> FirstRowId() const;
+
+ /// \brief The upper bound of number of rows with assigned row IDs in this
snapshot.
+ ///
+ /// It can be used safely to increment the table's `next-row-id` during a
commit. It
+ /// can be more than the number of rows added in this snapshot and include
some
+ /// existing rows.
+ ///
+ /// This field is optional but is required when the table version supports
row lineage.
+ ///
+ /// \return the upper bound of number of rows with assigned row IDs in this
snapshot
+ /// or nullopt if the value was not stored.
+ Result<std::optional<int64_t>> AddedRows() const;
/// \brief Compare two snapshots for equality.
friend bool operator==(const Snapshot& lhs, const Snapshot& rhs) {
diff --git a/src/iceberg/table.h b/src/iceberg/table.h
index 31139585..77fe763f 100644
--- a/src/iceberg/table.h
+++ b/src/iceberg/table.h
@@ -50,7 +50,7 @@ class ICEBERG_EXPORT Table : public
std::enable_shared_from_this<Table> {
virtual ~Table();
- /// \brief Return the identifier of this table
+ /// \brief Returns the identifier of this table
const TableIdentifier& name() const { return identifier_; }
/// \brief Returns the UUID of the table
@@ -59,40 +59,40 @@ class ICEBERG_EXPORT Table : public
std::enable_shared_from_this<Table> {
/// \brief Return the schema for this table, return NotFoundError if not
found
Result<std::shared_ptr<Schema>> schema() const;
- /// \brief Return a map of schema for this table
+ /// \brief Returns a map of schema for this table
Result<
std::reference_wrapper<const std::unordered_map<int32_t,
std::shared_ptr<Schema>>>>
schemas() const;
- /// \brief Return the partition spec for this table, return NotFoundError if
not found
+ /// \brief Returns the partition spec for this table, return NotFoundError
if not found
Result<std::shared_ptr<PartitionSpec>> spec() const;
- /// \brief Return a map of partition specs for this table
+ /// \brief Returns a map of partition specs for this table
Result<std::reference_wrapper<
const std::unordered_map<int32_t, std::shared_ptr<PartitionSpec>>>>
specs() const;
- /// \brief Return the sort order for this table, return NotFoundError if not
found
+ /// \brief Returns the sort order for this table, return NotFoundError if
not found
Result<std::shared_ptr<SortOrder>> sort_order() const;
- /// \brief Return a map of sort order IDs to sort orders for this table
+ /// \brief Returns a map of sort order IDs to sort orders for this table
Result<std::reference_wrapper<
const std::unordered_map<int32_t, std::shared_ptr<SortOrder>>>>
sort_orders() const;
- /// \brief Return a map of string properties for this table
+ /// \brief Returns the properties of this table
const TableProperties& properties() const;
- /// \brief Return the table's metadata file location
+ /// \brief Returns the table's metadata file location
std::string_view metadata_file_location() const;
- /// \brief Return the table's base location
+ /// \brief Returns the table's base location
std::string_view location() const;
/// \brief Returns the time when this table was last updated
TimePointMs last_updated_ms() const;
- /// \brief Return the table's current snapshot, return NotFoundError if not
found
+ /// \brief Returns the table's current snapshot, return NotFoundError if not
found
Result<std::shared_ptr<Snapshot>> current_snapshot() const;
/// \brief Get the snapshot of this table with the given id
diff --git a/src/iceberg/table_metadata.cc b/src/iceberg/table_metadata.cc
index 851048b3..22eb739b 100644
--- a/src/iceberg/table_metadata.cc
+++ b/src/iceberg/table_metadata.cc
@@ -34,6 +34,7 @@
#include <nlohmann/json.hpp>
+#include "iceberg/constants.h"
#include "iceberg/exception.h"
#include "iceberg/file_io.h"
#include "iceberg/json_internal.h"
@@ -52,6 +53,7 @@
#include "iceberg/util/location_util.h"
#include "iceberg/util/macros.h"
#include "iceberg/util/property_util.h"
+#include "iceberg/util/timepoint.h"
#include "iceberg/util/type_util.h"
#include "iceberg/util/uuid.h"
@@ -244,26 +246,38 @@ Result<std::shared_ptr<Schema>>
TableMetadata::SchemaById(int32_t schema_id) con
}
+// Returns the table's default partition spec, i.e. the spec whose ID is default_spec_id.
Result<std::shared_ptr<PartitionSpec>> TableMetadata::PartitionSpec() const {
-  auto iter = std::ranges::find_if(partition_specs, [this](const auto& spec) {
-    return spec != nullptr && spec->spec_id() == default_spec_id;
+  return PartitionSpecById(default_spec_id);
+}
+
+// Looks up a partition spec by ID, skipping null entries; returns NotFound when no
+// spec in partition_specs has the requested ID.
+Result<std::shared_ptr<PartitionSpec>> TableMetadata::PartitionSpecById(
+    int32_t spec_id) const {
+  auto iter = std::ranges::find_if(partition_specs, [spec_id](const auto&
spec) {
+    return spec != nullptr && spec->spec_id() == spec_id;
});
if (iter == partition_specs.end()) {
-    return NotFound("Default partition spec is not found");
+    return NotFound("Partition spec with ID {} is not found", spec_id);
}
return *iter;
}
+// Returns the table's default sort order, i.e. the order whose ID is default_sort_order_id.
Result<std::shared_ptr<SortOrder>> TableMetadata::SortOrder() const {
-  auto iter = std::ranges::find_if(sort_orders, [this](const auto& order) {
-    return order != nullptr && order->order_id() == default_sort_order_id;
+  return SortOrderById(default_sort_order_id);
+}
+
+// Looks up a sort order by ID, skipping null entries; returns NotFound when no order
+// in sort_orders has the requested ID.
+Result<std::shared_ptr<SortOrder>> TableMetadata::SortOrderById(int32_t
order_id) const {
+  auto iter = std::ranges::find_if(sort_orders, [order_id](const auto& order) {
+    return order != nullptr && order->order_id() == order_id;
});
if (iter == sort_orders.end()) {
-    return NotFound("Default sort order is not found");
+    return NotFound("Sort order with ID {} is not found", order_id);
}
return *iter;
}
Result<std::shared_ptr<Snapshot>> TableMetadata::Snapshot() const {
+  // current_snapshot_id == kInvalidSnapshotId means there is no current snapshot;
+  // report that directly instead of attempting a lookup by ID.
+  if (current_snapshot_id == kInvalidSnapshotId) {
+    return NotFound("No current snapshot");
+  }
return SnapshotById(current_snapshot_id);
}
@@ -277,6 +291,10 @@ Result<std::shared_ptr<Snapshot>>
TableMetadata::SnapshotById(int64_t snapshot_i
return *iter;
}
+int64_t TableMetadata::NextSequenceNumber() const {
+  // Format version 1 tables always report the initial sequence number; later
+  // versions advance from the last assigned one.
+  if (format_version <= 1) {
+    return kInitialSequenceNumber;
+  }
+  return last_sequence_number + 1;
+}
+
namespace {
template <typename T>
@@ -555,6 +573,10 @@ class TableMetadataBuilder::Impl {
sort_orders_by_id_.emplace(order->order_id(), order);
}
+ for (const auto& snapshot : metadata_.snapshots) {
+ snapshots_by_id_.emplace(snapshot->snapshot_id, snapshot);
+ }
+
metadata_.last_updated_ms = kInvalidLastUpdatedMs;
}
@@ -591,6 +613,10 @@ class TableMetadataBuilder::Impl {
Status RemoveSchemas(const std::unordered_set<int32_t>& schema_ids);
Result<int32_t> AddSchema(const Schema& schema, int32_t new_last_column_id);
void SetLocation(std::string_view location);
+ Status AddSnapshot(std::shared_ptr<Snapshot> snapshot);
+ Status SetBranchSnapshot(int64_t snapshot_id, const std::string& branch);
+ Status SetBranchSnapshot(std::shared_ptr<Snapshot> snapshot, const
std::string& branch);
+ Status SetRef(const std::string& name, std::shared_ptr<SnapshotRef> ref);
Result<std::unique_ptr<TableMetadata>> Build();
@@ -613,6 +639,34 @@ class TableMetadataBuilder::Impl {
/// \return The ID to use for this schema (reused if exists, new otherwise
int32_t ReuseOrCreateNewSchemaId(const Schema& new_schema) const;
+ /// \brief Finds intermediate snapshots that have not been committed as the
current
+ /// snapshot.
+ ///
+ /// Transactions can create snapshots that are never the current snapshot
because
+ /// several changes are combined by the transaction into one table metadata
update. When
+ /// each intermediate snapshot is added to table metadata, it is added to
the snapshot
+ /// log, assuming that it will be the current snapshot. When there are
multiple snapshot
+ /// updates, the log must be corrected by suppressing the intermediate
snapshot entries.
+ ///
+ /// A snapshot is an intermediate snapshot if it was added but is not the
current
+ /// snapshot.
+ ///
+ /// \param current_snapshot_id The current snapshot ID
+ /// \return A set of snapshot IDs for all added snapshots that were later
replaced as
+ /// the current snapshot in changes
+ std::unordered_set<int64_t> IntermediateSnapshotIdSet(
+ int64_t current_snapshot_id) const;
+
+ /// \brief Updates the snapshot log by removing intermediate snapshots and
handling
+ /// removed snapshots.
+ ///
+ /// \param current_snapshot_id The current snapshot ID
+ /// \return Updated snapshot log or error
+ Result<std::vector<SnapshotLogEntry>> UpdateSnapshotLog(
+ int64_t current_snapshot_id) const;
+
+ Status SetBranchSnapshotInternal(const Snapshot& snapshot, const
std::string& branch);
+
private:
// Base metadata (nullptr for new tables)
const TableMetadata* base_;
@@ -634,6 +688,7 @@ class TableMetadataBuilder::Impl {
std::unordered_map<int32_t, std::shared_ptr<Schema>> schemas_by_id_;
std::unordered_map<int32_t, std::shared_ptr<PartitionSpec>> specs_by_id_;
std::unordered_map<int32_t, std::shared_ptr<SortOrder>> sort_orders_by_id_;
+ std::unordered_map<int64_t, std::shared_ptr<Snapshot>> snapshots_by_id_;
};
Status TableMetadataBuilder::Impl::AssignUUID(std::string_view uuid) {
@@ -982,6 +1037,206 @@ void
TableMetadataBuilder::Impl::SetLocation(std::string_view location) {
changes_.push_back(std::make_unique<table::SetLocation>(std::string(location)));
}
+Status TableMetadataBuilder::Impl::AddSnapshot(std::shared_ptr<Snapshot> snapshot) {
+  if (snapshot == nullptr) {
+    // change is a noop
+    return {};
+  }
+  ICEBERG_CHECK(!metadata_.schemas.empty(),
+                "Attempting to add a snapshot before a schema is added");
+  ICEBERG_CHECK(!metadata_.partition_specs.empty(),
+                "Attempting to add a snapshot before a partition spec is added");
+  ICEBERG_CHECK(!metadata_.sort_orders.empty(),
+                "Attempting to add a snapshot before a sort order is added");
+  ICEBERG_CHECK(!snapshots_by_id_.contains(snapshot->snapshot_id),
+                "Snapshot already exists for id: {}", snapshot->snapshot_id);
+  ICEBERG_CHECK(
+      metadata_.format_version == 1 ||
+          snapshot->sequence_number > metadata_.last_sequence_number ||
+          !snapshot->parent_snapshot_id.has_value(),
+      "Cannot add snapshot with sequence number {} older than last sequence number {}",
+      snapshot->sequence_number, metadata_.last_sequence_number);
+
+  // Validate the row-lineage fields BEFORE mutating any builder state, so a rejected
+  // snapshot is never left half-registered in snapshots / snapshots_by_id_ / changes_.
+  std::optional<int64_t> added_rows;
+  if (metadata_.format_version >= TableMetadata::kMinFormatVersionRowLineage) {
+    ICEBERG_ASSIGN_OR_RAISE(auto first_row_id, snapshot->FirstRowId());
+    ICEBERG_CHECK(first_row_id.has_value(),
+                  "Cannot add a snapshot: first-row-id is null");
+    ICEBERG_CHECK(
+        first_row_id.value() >= metadata_.next_row_id,
+        "Cannot add a snapshot, first-row-id is behind table next-row-id: {} < {}",
+        first_row_id.value(), metadata_.next_row_id);
+    ICEBERG_ASSIGN_OR_RAISE(added_rows, snapshot->AddedRows());
+    ICEBERG_CHECK(added_rows.has_value(), "Cannot add a snapshot: added-rows is null");
+  }
+
+  // All checks passed: commit the snapshot into the builder state.
+  metadata_.last_updated_ms = snapshot->timestamp_ms;
+  metadata_.last_sequence_number = snapshot->sequence_number;
+  if (added_rows.has_value()) {
+    // Row lineage: reserve row IDs for the rows assigned in this snapshot.
+    metadata_.next_row_id += added_rows.value();
+  }
+  metadata_.snapshots.push_back(snapshot);
+  snapshots_by_id_.emplace(snapshot->snapshot_id, snapshot);
+  changes_.push_back(std::make_unique<table::AddSnapshot>(std::move(snapshot)));
+
+  return {};
+}
+
+Status TableMetadataBuilder::Impl::SetBranchSnapshot(int64_t snapshot_id,
+                                                     const std::string& branch) {
+  // Nothing to do when the ref already points at the requested snapshot.
+  if (const auto existing = metadata_.refs.find(branch);
+      existing != metadata_.refs.end() && existing->second->snapshot_id == snapshot_id) {
+    return {};
+  }
+
+  // The target must be a snapshot this builder already knows about.
+  const auto target = snapshots_by_id_.find(snapshot_id);
+  ICEBERG_CHECK(target != snapshots_by_id_.end(),
+                "Cannot set {} to unknown snapshot: {}", branch, snapshot_id);
+  return SetBranchSnapshotInternal(*target->second, branch);
+}
+
+Status TableMetadataBuilder::Impl::SetBranchSnapshot(std::shared_ptr<Snapshot> snapshot,
+                                                     const std::string& branch) {
+  // A null snapshot is a no-op, the same as in AddSnapshot.
+  if (!snapshot) {
+    return {};
+  }
+  // Keep a handle before ownership moves into AddSnapshot. On success AddSnapshot
+  // stores the shared_ptr in the builder, so the pointee remains alive.
+  const Snapshot* added = snapshot.get();
+  ICEBERG_RETURN_UNEXPECTED(AddSnapshot(std::move(snapshot)));
+  return SetBranchSnapshotInternal(*added, branch);
+}
+
+Status TableMetadataBuilder::Impl::SetBranchSnapshotInternal(const Snapshot& snapshot,
+                                                             const std::string& branch) {
+  const int64_t target_id = snapshot.snapshot_id;
+  const auto existing_it = metadata_.refs.find(branch);
+  const bool branch_exists = existing_it != metadata_.refs.end();
+
+  if (branch_exists) {
+    // Only branches can be repointed here; an existing tag with this name is an error.
+    ICEBERG_CHECK(existing_it->second->type() == SnapshotRefType::kBranch,
+                  "Cannot update branch: {} is a tag", branch);
+    if (existing_it->second->snapshot_id == target_id) {
+      // Already pointing at the requested snapshot.
+      return {};
+    }
+  }
+
+  // For format v2+ the target's sequence number must not exceed last_sequence_number.
+  ICEBERG_CHECK(
+      metadata_.format_version == 1 ||
+          snapshot.sequence_number <= metadata_.last_sequence_number,
+      "Last sequence number {} is less than existing snapshot sequence number {}",
+      metadata_.last_sequence_number, snapshot.sequence_number);
+
+  std::shared_ptr<SnapshotRef> updated_ref;
+  if (branch_exists) {
+    // Keep the existing retention settings and only change the target snapshot.
+    updated_ref = existing_it->second->Clone(target_id);
+  } else {
+    ICEBERG_ASSIGN_OR_RAISE(updated_ref, SnapshotRef::MakeBranch(target_id));
+  }
+  return SetRef(branch, std::move(updated_ref));
+}
+
+Status TableMetadataBuilder::Impl::SetRef(const std::string& name,
+                                          std::shared_ptr<SnapshotRef> ref) {
+  // Every path below dereferences `ref`; reject null instead of invoking UB.
+  ICEBERG_PRECHECK(ref != nullptr, "Cannot set {} to a null snapshot ref", name);
+
+  auto existing_ref_it = metadata_.refs.find(name);
+  if (existing_ref_it != metadata_.refs.end() && *existing_ref_it->second == *ref) {
+    // An identical ref is already present: change is a noop.
+    return {};
+  }
+
+  const int64_t snapshot_id = ref->snapshot_id;
+  auto snapshot_it = snapshots_by_id_.find(snapshot_id);
+  ICEBERG_CHECK(snapshot_it != snapshots_by_id_.end(),
+                "Cannot set {} to unknown snapshot: {}", name, snapshot_id);
+  const auto& snapshot = snapshot_it->second;
+
+  // If snapshot was added in this set of changes, update last_updated_ms
+  if (std::ranges::any_of(changes_, [snapshot_id](const auto& change) {
+        return change->kind() == TableUpdate::Kind::kAddSnapshot &&
+               internal::checked_cast<const table::AddSnapshot&>(*change)
+                       .snapshot()
+                       ->snapshot_id == snapshot_id;
+      })) {
+    metadata_.last_updated_ms = snapshot->timestamp_ms;
+  }
+
+  if (name == SnapshotRef::kMainBranch) {
+    // Moving main changes the current snapshot and appends a snapshot-log entry,
+    // using the current time when no update timestamp has been recorded yet.
+    metadata_.current_snapshot_id = ref->snapshot_id;
+    if (metadata_.last_updated_ms == kInvalidLastUpdatedMs) {
+      metadata_.last_updated_ms = CurrentTimePointMs();
+    }
+    metadata_.snapshot_log.emplace_back(metadata_.last_updated_ms, ref->snapshot_id);
+  }
+
+  changes_.push_back(std::make_unique<table::SetSnapshotRef>(name, *ref));
+  metadata_.refs[name] = std::move(ref);
+
+  return {};
+}
+
+std::unordered_set<int64_t> TableMetadataBuilder::Impl::IntermediateSnapshotIdSet(
+    int64_t current_snapshot_id) const {
+  std::unordered_set<int64_t> added_ids;
+  std::unordered_set<int64_t> intermediate_ids;
+
+  // Walk the changes in order. A snapshot is intermediate when it was added by this
+  // builder, was later made the target of a main-branch SetSnapshotRef, and is not
+  // the final current snapshot.
+  for (const auto& change : changes_) {
+    const auto kind = change->kind();
+    if (kind == TableUpdate::Kind::kAddSnapshot) {
+      // Adds must always come before set current snapshot
+      const auto& add = internal::checked_cast<const table::AddSnapshot&>(*change);
+      added_ids.insert(add.snapshot()->snapshot_id);
+      continue;
+    }
+    if (kind != TableUpdate::Kind::kSetSnapshotRef) {
+      continue;
+    }
+    const auto& set_ref = internal::checked_cast<const table::SetSnapshotRef&>(*change);
+    const int64_t target_id = set_ref.snapshot_id();
+    const bool targets_main = set_ref.ref_name() == SnapshotRef::kMainBranch;
+    if (targets_main && target_id != current_snapshot_id && added_ids.contains(target_id)) {
+      intermediate_ids.insert(target_id);
+    }
+  }
+
+  return intermediate_ids;
+}
+
+Result<std::vector<SnapshotLogEntry>> TableMetadataBuilder::Impl::UpdateSnapshotLog(
+    int64_t current_snapshot_id) const {
+  const std::unordered_set<int64_t> intermediate_ids =
+      IntermediateSnapshotIdSet(current_snapshot_id);
+  const bool removed_any = std::ranges::any_of(changes_, [](const auto& change) {
+    return change->kind() == TableUpdate::Kind::kRemoveSnapshots;
+  });
+
+  // Fast path: no snapshot was superseded or removed, so the log is already correct.
+  if (!removed_any && intermediate_ids.empty()) {
+    return metadata_.snapshot_log;
+  }
+
+  std::vector<SnapshotLogEntry> rebuilt_log;
+  for (const auto& entry : metadata_.snapshot_log) {
+    if (!snapshots_by_id_.contains(entry.snapshot_id)) {
+      if (removed_any) {
+        // Any invalid entry causes the history before it to be removed. Otherwise
+        // there could be history gaps that make time-travel queries return wrong
+        // results: if history is [(t1, s1), (t2, s2), (t3, s3)] and s2 is removed,
+        // the history cannot become [(t1, s1), (t3, s3)], because that suggests s3
+        // was current between t2 and t3 when in fact s2 was.
+        rebuilt_log.clear();
+      }
+      continue;
+    }
+    // Keep valid entries, dropping snapshots that were only briefly current.
+    if (!intermediate_ids.contains(entry.snapshot_id)) {
+      rebuilt_log.push_back(entry);
+    }
+  }
+
+  // When the current snapshot still exists, the log must end with it.
+  if (snapshots_by_id_.contains(current_snapshot_id)) {
+    ICEBERG_CHECK(
+        !rebuilt_log.empty() && rebuilt_log.back().snapshot_id == current_snapshot_id,
+        "Cannot set invalid snapshot log: latest entry is not the current snapshot");
+  }
+
+  return rebuilt_log;
+}
+
Result<std::unique_ptr<TableMetadata>> TableMetadataBuilder::Impl::Build() {
// 1. Validate metadata consistency through TableMetadata#Validate
@@ -1025,7 +1280,9 @@ Result<std::unique_ptr<TableMetadata>>
TableMetadataBuilder::Impl::Build() {
metadata_.metadata_log.end() -
max_metadata_log_size);
}
- // TODO(anyone): 4. update snapshot_log
+ // 4. Update snapshot_log
+ ICEBERG_ASSIGN_OR_RAISE(metadata_.snapshot_log,
+ UpdateSnapshotLog(metadata_.current_snapshot_id));
// 5. Create and return the TableMetadata
return std::make_unique<TableMetadata>(std::move(metadata_));
@@ -1207,17 +1464,26 @@ TableMetadataBuilder&
TableMetadataBuilder::AddSortOrder(
TableMetadataBuilder& TableMetadataBuilder::AddSnapshot(
std::shared_ptr<Snapshot> snapshot) {
- throw IcebergError(std::format("{} not implemented", __FUNCTION__));
+  // Add `snapshot` to the metadata being built. `snapshot` is a by-value sink
+  // parameter: move it into the impl (as the SetBranchSnapshot/SetRef wrappers
+  // do) instead of paying for an extra atomic refcount increment.
+  ICEBERG_BUILDER_RETURN_IF_ERROR(impl_->AddSnapshot(std::move(snapshot)));
+  return *this;
}
+// Point `branch` at the snapshot identified by `snapshot_id`; the impl's
+// status is routed through ICEBERG_BUILDER_RETURN_IF_ERROR.
TableMetadataBuilder& TableMetadataBuilder::SetBranchSnapshot(int64_t
snapshot_id,
const
std::string& branch) {
- throw IcebergError(std::format("{} not implemented", __FUNCTION__));
+ ICEBERG_BUILDER_RETURN_IF_ERROR(impl_->SetBranchSnapshot(snapshot_id,
branch));
+ return *this;
+}
+
+// Overload taking the snapshot object itself; Transaction::Apply uses it for
+// newly produced snapshots that are not yet part of the base metadata.
+TableMetadataBuilder& TableMetadataBuilder::SetBranchSnapshot(
+ std::shared_ptr<Snapshot> snapshot, const std::string& branch) {
+
ICEBERG_BUILDER_RETURN_IF_ERROR(impl_->SetBranchSnapshot(std::move(snapshot),
branch));
+ return *this;
}
+// Install or replace the snapshot reference `name`; the impl's status is
+// routed through ICEBERG_BUILDER_RETURN_IF_ERROR.
TableMetadataBuilder& TableMetadataBuilder::SetRef(const std::string& name,
std::shared_ptr<SnapshotRef> ref) {
- throw IcebergError(std::format("{} not implemented", __FUNCTION__));
+ ICEBERG_BUILDER_RETURN_IF_ERROR(impl_->SetRef(name, std::move(ref)));
+ return *this;
}
TableMetadataBuilder& TableMetadataBuilder::RemoveRef(const std::string& name)
{
diff --git a/src/iceberg/table_metadata.h b/src/iceberg/table_metadata.h
index 3e3eb9c7..a4165b81 100644
--- a/src/iceberg/table_metadata.h
+++ b/src/iceberg/table_metadata.h
@@ -134,17 +134,32 @@ struct ICEBERG_EXPORT TableMetadata {
int format_version = kDefaultTableFormatVersion);
/// \brief Get the current schema, return NotFoundError if not found
+ /// \note The returned schema is guaranteed to be not null
Result<std::shared_ptr<iceberg::Schema>> Schema() const;
/// \brief Get the current schema by ID, return NotFoundError if not found
+ /// \note The returned schema is guaranteed to be not null
Result<std::shared_ptr<iceberg::Schema>> SchemaById(int32_t schema_id) const;
/// \brief Get the current partition spec, return NotFoundError if not found
+ /// \note The returned partition spec is guaranteed to be not null
Result<std::shared_ptr<iceberg::PartitionSpec>> PartitionSpec() const;
+ /// \brief Get the partition spec with the given ID, return NotFoundError if not found
+ /// \note The returned partition spec is guaranteed to be not null
+ Result<std::shared_ptr<iceberg::PartitionSpec>> PartitionSpecById(
+ int32_t spec_id) const;
/// \brief Get the current sort order, return NotFoundError if not found
+ /// \note The returned sort order is guaranteed to be not null
Result<std::shared_ptr<iceberg::SortOrder>> SortOrder() const;
+ /// \brief Get the sort order with the given ID, return NotFoundError if not found
+ /// \note The returned sort order is guaranteed to be not null
+ Result<std::shared_ptr<iceberg::SortOrder>> SortOrderById(int32_t
sort_order_id) const;
/// \brief Get the current snapshot, return NotFoundError if not found
+ /// \note The returned snapshot is guaranteed to be not null
Result<std::shared_ptr<iceberg::Snapshot>> Snapshot() const;
- /// \brief Get the snapshot of this table with the given id
+ /// \brief Get the snapshot by ID, return NotFoundError if not found
+ /// \note The returned snapshot is guaranteed to be not null
Result<std::shared_ptr<iceberg::Snapshot>> SnapshotById(int64_t snapshot_id)
const;
+ /// \brief Get the next sequence number
+ int64_t NextSequenceNumber() const;
ICEBERG_EXPORT friend bool operator==(const TableMetadata& lhs,
const TableMetadata& rhs);
@@ -337,6 +352,14 @@ class ICEBERG_EXPORT TableMetadataBuilder : public
ErrorCollector {
/// \return Reference to this builder for method chaining
TableMetadataBuilder& SetBranchSnapshot(int64_t snapshot_id, const
std::string& branch);
+ /// \brief Set a branch to point to a specific snapshot
+ ///
+ /// \param snapshot The snapshot the branch should reference
+ /// \param branch The name of the branch
+ /// \return Reference to this builder for method chaining
+ TableMetadataBuilder& SetBranchSnapshot(std::shared_ptr<Snapshot> snapshot,
+ const std::string& branch);
+
/// \brief Set a snapshot reference
///
/// \param name The name of the reference
diff --git a/src/iceberg/table_properties.h b/src/iceberg/table_properties.h
index feb4a200..5d5c17db 100644
--- a/src/iceberg/table_properties.h
+++ b/src/iceberg/table_properties.h
@@ -20,7 +20,6 @@
#pragma once
#include <limits>
-#include <memory>
#include <string>
#include <unordered_map>
#include <unordered_set>
@@ -244,6 +243,9 @@ class ICEBERG_EXPORT TableProperties : public
ConfigBase<TableProperties> {
inline static Entry<int64_t> kDeleteTargetFileSizeBytes{
"write.delete.target-file-size-bytes", int64_t{64} * 1024 * 1024}; //
64 MB
+ inline static Entry<bool> kSnapshotIdInheritanceEnabled{
+ "compatibility.snapshot-id-inheritance.enabled", false};
+
// Garbage collection properties
inline static Entry<bool> kGcEnabled{"gc.enabled", true};
diff --git a/src/iceberg/table_update.cc b/src/iceberg/table_update.cc
index 38ce0fbc..29388d47 100644
--- a/src/iceberg/table_update.cc
+++ b/src/iceberg/table_update.cc
@@ -274,7 +274,7 @@ std::unique_ptr<TableUpdate> SetDefaultSortOrder::Clone()
const {
// AddSnapshot
void AddSnapshot::ApplyTo(TableMetadataBuilder& builder) const {
- throw IcebergError(std::format("{} not implemented", __FUNCTION__));
+ // Replay this change on `builder`. ApplyTo returns void, so any failure is
+ // presumably left for the builder to report (it is an ErrorCollector).
+ builder.AddSnapshot(snapshot_);
}
void AddSnapshot::GenerateRequirements(TableUpdateContext& context) const {
@@ -344,7 +344,7 @@ std::unique_ptr<TableUpdate> RemoveSnapshotRef::Clone()
const {
// SetSnapshotRef
void SetSnapshotRef::ApplyTo(TableMetadataBuilder& builder) const {
- throw IcebergError(std::format("{} not implemented", __FUNCTION__));
+  // Replay this change by rebuilding the complete reference (type and
+  // retention) and installing it with SetRef. Routing through
+  // SetBranchSnapshot would turn tags into branches and drop the
+  // min-snapshots-to-keep / max-snapshot-age / max-ref-age settings.
+  auto ref = std::make_shared<SnapshotRef>();
+  ref->snapshot_id = snapshot_id_;
+  if (type_ == SnapshotRefType::kBranch) {
+    SnapshotRef::Branch branch{};
+    branch.min_snapshots_to_keep = min_snapshots_to_keep_;
+    branch.max_snapshot_age_ms = max_snapshot_age_ms_;
+    branch.max_ref_age_ms = max_ref_age_ms_;
+    ref->retention = std::move(branch);
+  } else {
+    // Tags only carry a max reference age.
+    SnapshotRef::Tag tag{};
+    tag.max_ref_age_ms = max_ref_age_ms_;
+    ref->retention = std::move(tag);
+  }
+  builder.SetRef(ref_name_, std::move(ref));
}
void SetSnapshotRef::GenerateRequirements(TableUpdateContext& context) const {
diff --git a/src/iceberg/table_update.h b/src/iceberg/table_update.h
index 87524319..3c9c9dbb 100644
--- a/src/iceberg/table_update.h
+++ b/src/iceberg/table_update.h
@@ -401,6 +401,19 @@ class ICEBERG_EXPORT SetSnapshotRef : public TableUpdate {
max_snapshot_age_ms_(max_snapshot_age_ms),
max_ref_age_ms_(max_ref_age_ms) {}
+  /// \brief Build the update from an existing reference, copying its type and
+  /// the retention settings that apply to that type.
+  SetSnapshotRef(std::string ref_name, const SnapshotRef& ref)
+      : ref_name_(std::move(ref_name)),
+        snapshot_id_(ref.snapshot_id),
+        type_(ref.type()) {
+    if (type_ != SnapshotRefType::kBranch) {
+      // Tags only carry a max reference age.
+      const SnapshotRef::Tag& tag_retention = std::get<SnapshotRef::Tag>(ref.retention);
+      max_ref_age_ms_ = tag_retention.max_ref_age_ms;
+      return;
+    }
+    const SnapshotRef::Branch& branch_retention =
+        std::get<SnapshotRef::Branch>(ref.retention);
+    min_snapshots_to_keep_ = branch_retention.min_snapshots_to_keep;
+    max_snapshot_age_ms_ = branch_retention.max_snapshot_age_ms;
+    max_ref_age_ms_ = branch_retention.max_ref_age_ms;
+  }
+
+ /// \brief Name of the reference this update sets.
const std::string& ref_name() const { return ref_name_; }
+ /// \brief Snapshot ID the reference points to.
int64_t snapshot_id() const { return snapshot_id_; }
+ /// \brief Whether the reference is a branch or a tag.
SnapshotRefType type() const { return type_; }
diff --git a/src/iceberg/test/json_internal_test.cc
b/src/iceberg/test/json_internal_test.cc
index f88527ff..33ed03b9 100644
--- a/src/iceberg/test/json_internal_test.cc
+++ b/src/iceberg/test/json_internal_test.cc
@@ -269,7 +269,7 @@ TEST(JsonInternalTest,
SnapshotFromJsonSummaryWithNoOperation) {
auto result = SnapshotFromJson(snapshot_json);
ASSERT_TRUE(result.has_value());
- ASSERT_EQ(result.value()->operation(), DataOperation::kOverwrite);
+ ASSERT_EQ(result.value()->Operation(), DataOperation::kOverwrite);
}
TEST(JsonInternalTest, NameMapping) {
diff --git a/src/iceberg/test/snapshot_test.cc
b/src/iceberg/test/snapshot_test.cc
index a3c28f89..e7a657fb 100644
--- a/src/iceberg/test/snapshot_test.cc
+++ b/src/iceberg/test/snapshot_test.cc
@@ -96,7 +96,7 @@ TEST_F(SnapshotTest, ConstructionAndFieldAccess) {
EXPECT_EQ(snapshot.sequence_number, 1);
EXPECT_EQ(snapshot.timestamp_ms.time_since_epoch().count(), 1615569200000);
EXPECT_EQ(snapshot.manifest_list, "s3://example/manifest_list.avro");
- EXPECT_EQ(snapshot.operation().value(), DataOperation::kAppend);
+ EXPECT_EQ(snapshot.Operation(), std::make_optional(DataOperation::kAppend));
EXPECT_EQ(snapshot.summary.at(std::string(SnapshotSummaryFields::kAddedDataFiles)),
"101");
EXPECT_EQ(snapshot.summary.at(std::string(SnapshotSummaryFields::kOperation)),
diff --git a/src/iceberg/test/table_requirements_test.cc
b/src/iceberg/test/table_requirements_test.cc
index 80f83636..dd0aa8f6 100644
--- a/src/iceberg/test/table_requirements_test.cc
+++ b/src/iceberg/test/table_requirements_test.cc
@@ -883,12 +883,12 @@ TEST(TableRequirementsTest, SetSnapshotRef) {
// Multiple updates to same ref should deduplicate
std::vector<std::unique_ptr<TableUpdate>> updates;
- updates.push_back(std::make_unique<table::SetSnapshotRef>(kRefName,
kSnapshotId,
-
SnapshotRefType::kBranch));
- updates.push_back(std::make_unique<table::SetSnapshotRef>(kRefName,
kSnapshotId + 1,
-
SnapshotRefType::kBranch));
- updates.push_back(std::make_unique<table::SetSnapshotRef>(kRefName,
kSnapshotId + 2,
-
SnapshotRefType::kBranch));
+ ICEBERG_UNWRAP_OR_FAIL(auto ref1, SnapshotRef::MakeBranch(kSnapshotId));
+ updates.push_back(std::make_unique<table::SetSnapshotRef>(kRefName, *ref1));
+ ICEBERG_UNWRAP_OR_FAIL(auto ref2, SnapshotRef::MakeBranch(kSnapshotId + 1));
+ updates.push_back(std::make_unique<table::SetSnapshotRef>(kRefName, *ref2));
+ ICEBERG_UNWRAP_OR_FAIL(auto ref3, SnapshotRef::MakeBranch(kSnapshotId + 2));
+ updates.push_back(std::make_unique<table::SetSnapshotRef>(kRefName, *ref3));
auto result = TableRequirements::ForUpdateTable(*metadata, updates);
ASSERT_THAT(result, IsOk());
diff --git a/src/iceberg/test/update_properties_test.cc
b/src/iceberg/test/update_properties_test.cc
index 8ac8e5eb..59fa1d8d 100644
--- a/src/iceberg/test/update_properties_test.cc
+++ b/src/iceberg/test/update_properties_test.cc
@@ -107,7 +107,7 @@ TEST_F(UpdatePropertiesTest,
UpgradeFormatVersionInvalidString) {
auto result = update->Apply();
EXPECT_THAT(result, IsError(ErrorKind::kInvalidArgument));
- EXPECT_THAT(result, HasErrorMessage("Invalid format version"));
+ EXPECT_THAT(result, HasErrorMessage("Failed to parse integer from string"));
}
TEST_F(UpdatePropertiesTest, UpgradeFormatVersionOutOfRange) {
diff --git a/src/iceberg/transaction.cc b/src/iceberg/transaction.cc
index 6641a1af..6ef942db 100644
--- a/src/iceberg/transaction.cc
+++ b/src/iceberg/transaction.cc
@@ -20,20 +20,24 @@
#include "iceberg/transaction.h"
#include <memory>
+#include <optional>
#include "iceberg/catalog.h"
#include "iceberg/schema.h"
#include "iceberg/table.h"
#include "iceberg/table_metadata.h"
+#include "iceberg/table_properties.h"
#include "iceberg/table_requirement.h"
#include "iceberg/table_requirements.h"
#include "iceberg/table_update.h"
#include "iceberg/update/pending_update.h"
+#include "iceberg/update/snapshot_update.h"
#include "iceberg/update/update_partition_spec.h"
#include "iceberg/update/update_properties.h"
#include "iceberg/update/update_schema.h"
#include "iceberg/update/update_sort_order.h"
#include "iceberg/util/checked_cast.h"
+#include "iceberg/util/location_util.h"
#include "iceberg/util/macros.h"
namespace iceberg {
@@ -69,6 +73,16 @@ const TableMetadata* Transaction::base() const { return
metadata_builder_->base(
+// Current metadata with all staged changes applied, as tracked by the
+// transaction's metadata builder.
const TableMetadata& Transaction::current() const { return
metadata_builder_->current(); }
+// Resolve the location for a new file named `filename` in the table's metadata
+// directory: under TableProperties::kWriteMetadataLocation when the table sets
+// it, otherwise under "<table location>/metadata/".
+std::string Transaction::MetadataFileLocation(std::string_view filename) const {
+  const auto metadata_location =
+      current().properties.Get(TableProperties::kWriteMetadataLocation);
+  // Only honor the override when it is actually configured; an empty value
+  // means "not set" and must fall back to the default metadata folder
+  // (formatting with it would yield a bogus "/<filename>" path).
+  if (!metadata_location.empty()) {
+    return std::format("{}/{}", LocationUtil::StripTrailingSlash(metadata_location),
+                       filename);
+  }
+  return std::format("{}/metadata/{}", current().location, filename);
+}
+
Status Transaction::AddUpdate(const std::shared_ptr<PendingUpdate>& update) {
if (!last_update_committed_) {
return InvalidArgument("Cannot add update when previous update is not
committed");
@@ -113,6 +127,42 @@ Status Transaction::Apply(PendingUpdate& update) {
metadata_builder_->SetCurrentSchema(std::move(result.schema),
result.new_last_column_id);
} break;
+ case PendingUpdate::Kind::kUpdateSnapshot: {
+ const auto& base = metadata_builder_->current();
+
+ auto& update_snapshot = internal::checked_cast<SnapshotUpdate&>(update);
+ ICEBERG_ASSIGN_OR_RAISE(auto result, update_snapshot.Apply());
+
+ // Create a temp builder to check if this is an empty update
+ auto temp_update = TableMetadataBuilder::BuildFrom(&base);
+ if (base.SnapshotById(result.snapshot->snapshot_id).has_value()) {
+ // This is a rollback operation
+ temp_update->SetBranchSnapshot(result.snapshot->snapshot_id,
+ result.target_branch);
+ } else if (result.stage_only) {
+ temp_update->AddSnapshot(result.snapshot);
+ } else {
+ temp_update->SetBranchSnapshot(std::move(result.snapshot),
result.target_branch);
+ }
+
+ if (temp_update->changes().empty()) {
+ // Do not commit if the metadata has not changed. for example, this
may happen
+ // when setting the current snapshot to an ID that is already current.
note that
+ // this check uses identity.
+ return {};
+ }
+
+ for (const auto& change : temp_update->changes()) {
+ change->ApplyTo(*metadata_builder_);
+ }
+
+ // If the table UUID is missing, add it here. the UUID will be
re-created each time
+ // this operation retries to ensure that if a concurrent operation
assigns the UUID,
+ // this operation will not fail.
+ if (base.table_uuid.empty()) {
+ metadata_builder_->AssignUUID();
+ }
+ } break;
default:
return NotSupported("Unsupported pending update: {}",
static_cast<int32_t>(update.kind()));
@@ -155,12 +205,22 @@ Result<std::shared_ptr<Table>> Transaction::Commit() {
}
// XXX: we should handle commit failure and retry here.
- ICEBERG_ASSIGN_OR_RAISE(auto updated_table, table_->catalog()->UpdateTable(
- table_->name(),
requirements, updates));
+ auto commit_result =
+ table_->catalog()->UpdateTable(table_->name(), requirements, updates);
+
+ for (const auto& update : pending_updates_) {
+ if (auto update_ptr = update.lock()) {
+ std::ignore = update_ptr->Finalize(commit_result.has_value()
+ ? std::nullopt
+ :
std::make_optional(commit_result.error()));
+ }
+ }
+
+ ICEBERG_RETURN_UNEXPECTED(commit_result);
// Mark as committed and update table reference
committed_ = true;
- table_ = std::move(updated_table);
+ table_ = std::move(commit_result.value());
return table_;
}
diff --git a/src/iceberg/transaction.h b/src/iceberg/transaction.h
index ea918a17..3c2395c2 100644
--- a/src/iceberg/transaction.h
+++ b/src/iceberg/transaction.h
@@ -49,6 +49,12 @@ class ICEBERG_EXPORT Transaction : public
std::enable_shared_from_this<Transacti
/// \brief Return the current metadata with staged changes applied
const TableMetadata& current() const;
+ /// \brief Return the location of the metadata file with the given filename
+ ///
+ /// \param filename the name of the metadata file
+ /// \return the location of the metadata file
+ std::string MetadataFileLocation(std::string_view filename) const;
+
/// \brief Apply the pending changes from all actions and commit.
///
/// \return Updated table if the transaction was committed successfully, or
an error:
@@ -81,9 +87,9 @@ class ICEBERG_EXPORT Transaction : public
std::enable_shared_from_this<Transacti
/// \brief Apply the pending changes to current table.
Status Apply(PendingUpdate& updates);
- friend class PendingUpdate; // Need to access the Apply method.
-
private:
+ friend class PendingUpdate;
+
// The table that this transaction will update.
std::shared_ptr<Table> table_;
// The kind of this transaction.
diff --git a/src/iceberg/type_fwd.h b/src/iceberg/type_fwd.h
index 2daf39e6..ff49e1ed 100644
--- a/src/iceberg/type_fwd.h
+++ b/src/iceberg/type_fwd.h
@@ -188,6 +188,7 @@ class Transaction;
/// \brief Update family.
class PendingUpdate;
+class SnapshotUpdate;
class UpdatePartitionSpec;
class UpdateProperties;
class UpdateSchema;
diff --git a/src/iceberg/update/meson.build b/src/iceberg/update/meson.build
index e4c786f4..4238e022 100644
--- a/src/iceberg/update/meson.build
+++ b/src/iceberg/update/meson.build
@@ -18,6 +18,7 @@
install_headers(
[
'pending_update.h',
+ 'snapshot_update.h',
'update_partition_spec.h',
'update_schema.h',
'update_sort_order.h',
diff --git a/src/iceberg/update/pending_update.cc
b/src/iceberg/update/pending_update.cc
index 535e7b41..e55a93df 100644
--- a/src/iceberg/update/pending_update.cc
+++ b/src/iceberg/update/pending_update.cc
@@ -30,4 +30,10 @@ PendingUpdate::~PendingUpdate() = default;
+// Apply this update to the owning transaction's staged metadata via
+// Transaction::Apply.
Status PendingUpdate::Commit() { return transaction_->Apply(*this); }
+// Default post-commit hook: there is nothing to clean up, so the commit
+// outcome is ignored and success is reported.
+Status PendingUpdate::Finalize(std::optional<Error> /*commit_error*/) {
+  return {};
+}
+
+// The metadata this update builds on: the transaction's current view,
+// including changes already staged by earlier updates.
+const TableMetadata& PendingUpdate::base() const {
+  return transaction_->current();
+}
+
} // namespace iceberg
diff --git a/src/iceberg/update/pending_update.h
b/src/iceberg/update/pending_update.h
index 90723987..2124d7e1 100644
--- a/src/iceberg/update/pending_update.h
+++ b/src/iceberg/update/pending_update.h
@@ -23,7 +23,7 @@
/// API for table changes using builder pattern
#include <memory>
-#include <vector>
+#include <optional>
#include "iceberg/iceberg_export.h"
#include "iceberg/result.h"
@@ -45,6 +45,7 @@ class ICEBERG_EXPORT PendingUpdate : public ErrorCollector {
kUpdatePartitionSpec,
kUpdateProperties,
kUpdateSchema,
+ kUpdateSnapshot,
kUpdateSortOrder,
};
@@ -59,6 +60,15 @@ class ICEBERG_EXPORT PendingUpdate : public ErrorCollector {
/// - CommitStateUnknown: unknown status, no cleanup should be done.
virtual Status Commit();
+ /// \brief Finalize the pending update.
+ ///
+ /// Called after the transaction's commit attempt completes, whether it
+ /// succeeded or failed. Implementations can override this method to clean up
+ /// any resources (e.g. files written for an attempt that was not committed).
+ ///
+ /// \param commit_error std::nullopt if the commit succeeded; otherwise the
+ /// error the commit failed with
+ /// \return Status indicating success or failure of the finalization itself
+ virtual Status Finalize(std::optional<Error> commit_error);
+
// Non-copyable, movable
PendingUpdate(const PendingUpdate&) = delete;
PendingUpdate& operator=(const PendingUpdate&) = delete;
@@ -70,6 +80,8 @@ class ICEBERG_EXPORT PendingUpdate : public ErrorCollector {
protected:
explicit PendingUpdate(std::shared_ptr<Transaction> transaction);
+ const TableMetadata& base() const;
+
std::shared_ptr<Transaction> transaction_;
};
diff --git a/src/iceberg/update/snapshot_update.cc
b/src/iceberg/update/snapshot_update.cc
new file mode 100644
index 00000000..2bbb2d50
--- /dev/null
+++ b/src/iceberg/update/snapshot_update.cc
@@ -0,0 +1,434 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "iceberg/update/snapshot_update.h"
+
+#include <format>
+#include <ranges>
+
+#include "iceberg/constants.h"
+#include "iceberg/file_io.h"
+#include "iceberg/manifest/manifest_entry.h"
+#include "iceberg/manifest/manifest_list.h"
+#include "iceberg/manifest/manifest_reader.h"
+#include "iceberg/manifest/manifest_writer.h"
+#include "iceberg/manifest/rolling_manifest_writer.h"
+#include "iceberg/partition_summary_internal.h"
+#include "iceberg/snapshot.h"
+#include "iceberg/table.h"
+#include "iceberg/transaction.h"
+#include "iceberg/util/macros.h"
+#include "iceberg/util/snapshot_util_internal.h"
+#include "iceberg/util/string_util.h"
+#include "iceberg/util/uuid.h"
+
+namespace iceberg {
+
+namespace {
+
+// The Java impl skips updating total if parsing fails. Here we choose to be strict.
+//
+// Recompute `total_property` in `summary` as previous total + added - deleted.
+// Nothing is written when `previous_summary` has no total. Once the running
+// value is negative, no further delta is applied and the total is not written.
+// Unparsable numbers are returned as errors.
+Status UpdateTotal(std::unordered_map<std::string, std::string>& summary,
+                   const std::unordered_map<std::string, std::string>& previous_summary,
+                   const std::string& total_property, const std::string& added_property,
+                   const std::string& deleted_property) {
+  const auto prev_total = previous_summary.find(total_property);
+  if (prev_total == previous_summary.end()) {
+    // No baseline to accumulate from; leave the total absent.
+    return {};
+  }
+
+  ICEBERG_ASSIGN_OR_RAISE(auto running, StringUtils::ParseInt<int64_t>(prev_total->second));
+
+  if (const auto added = summary.find(added_property);
+      running >= 0 && added != summary.end()) {
+    ICEBERG_ASSIGN_OR_RAISE(auto delta, StringUtils::ParseInt<int64_t>(added->second));
+    running += delta;
+  }
+
+  if (const auto deleted = summary.find(deleted_property);
+      running >= 0 && deleted != summary.end()) {
+    ICEBERG_ASSIGN_OR_RAISE(auto delta, StringUtils::ParseInt<int64_t>(deleted->second));
+    running -= delta;
+  }
+
+  if (running < 0) {
+    return {};
+  }
+  summary[total_property] = std::to_string(running);
+  return {};
+}
+
+// Add metadata to a manifest file by reading it and extracting statistics.
+//
+// Meant for manifests handed to the snapshot without a snapshot ID
+// (added_snapshot_id == kInvalidSnapshotId). Reads every entry and returns a
+// copy of `manifest` with:
+//   - added/existing/deleted file and row counts,
+//   - partition summaries built from the entries' partition values,
+//   - added_snapshot_id set to the snapshot ID of the first ADDED or DELETED
+//     entry that has one, else the largest entry snapshot ID
+//     (std::numeric_limits<int64_t>::min() when no entry has one),
+//   - first_row_id cleared.
+Result<ManifestFile> AddMetadata(const ManifestFile& manifest, std::shared_ptr<FileIO> io,
+                                 const TableMetadata& metadata) {
+  // Only manifests that do not carry a snapshot ID yet may be enriched; this is
+  // exactly the set SnapshotUpdate::Apply passes in.
+  ICEBERG_PRECHECK(manifest.added_snapshot_id == kInvalidSnapshotId,
+                   "Manifest {} already has assigned a snapshot id: {}",
+                   manifest.manifest_path, manifest.added_snapshot_id);
+
+  ICEBERG_ASSIGN_OR_RAISE(auto schema, metadata.Schema());
+  ICEBERG_ASSIGN_OR_RAISE(auto spec, metadata.PartitionSpecById(manifest.partition_spec_id));
+  ICEBERG_ASSIGN_OR_RAISE(auto partition_type, spec->PartitionType(*schema));
+
+  ICEBERG_ASSIGN_OR_RAISE(auto reader,
+                          ManifestReader::Make(manifest, std::move(io), schema, spec));
+  ICEBERG_ASSIGN_OR_RAISE(auto entries, reader->Entries());
+
+  PartitionSummary stats(*partition_type);
+  int32_t added_files = 0;
+  int64_t added_rows = 0;
+  int32_t existing_files = 0;
+  int64_t existing_rows = 0;
+  int32_t deleted_files = 0;
+  int64_t deleted_rows = 0;
+
+  std::optional<int64_t> snapshot_id;
+  int64_t max_snapshot_id = std::numeric_limits<int64_t>::min();
+  for (const auto& entry : entries) {
+    ICEBERG_PRECHECK(entry.data_file != nullptr,
+                     "Manifest entry in {} is missing data_file", manifest.manifest_path);
+
+    if (entry.snapshot_id.has_value() && entry.snapshot_id.value() > max_snapshot_id) {
+      max_snapshot_id = entry.snapshot_id.value();
+    }
+
+    switch (entry.status) {
+      case ManifestStatus::kAdded: {
+        added_files += 1;
+        added_rows += entry.data_file->record_count;
+        if (!snapshot_id.has_value() && entry.snapshot_id.has_value()) {
+          snapshot_id = entry.snapshot_id;
+        }
+      } break;
+      case ManifestStatus::kExisting: {
+        existing_files += 1;
+        existing_rows += entry.data_file->record_count;
+      } break;
+      case ManifestStatus::kDeleted: {
+        deleted_files += 1;
+        deleted_rows += entry.data_file->record_count;
+        if (!snapshot_id.has_value() && entry.snapshot_id.has_value()) {
+          snapshot_id = entry.snapshot_id;
+        }
+      } break;
+    }
+
+    ICEBERG_RETURN_UNEXPECTED(stats.Update(entry.data_file->partition));
+  }
+
+  if (!snapshot_id.has_value()) {
+    // If no files were added or deleted, use the largest snapshot ID in the manifest
+    snapshot_id = max_snapshot_id;
+  }
+
+  ICEBERG_ASSIGN_OR_RAISE(auto partition_summaries, stats.Summaries());
+
+  ManifestFile enriched = manifest;
+  enriched.added_snapshot_id = snapshot_id.value();
+  enriched.added_files_count = added_files;
+  enriched.existing_files_count = existing_files;
+  enriched.deleted_files_count = deleted_files;
+  enriched.added_rows_count = added_rows;
+  enriched.existing_rows_count = existing_rows;
+  enriched.deleted_rows_count = deleted_rows;
+  enriched.partitions = std::move(partition_summaries);
+  enriched.first_row_id = std::nullopt;
+  return enriched;
+}
+
+} // anonymous namespace
+
+// Defaulted destructor, defined out of line in this file (presumably so the
+// header need not see complete types for every member -- confirm against
+// snapshot_update.h).
+SnapshotUpdate::~SnapshotUpdate() = default;
+
+// Capture per-update settings from the base metadata:
+// - snapshot-ID inheritance is allowed for format version > 1, or when the
+//   table enables TableProperties::kSnapshotIdInheritanceEnabled;
+// - a fresh UUIDv7 string is embedded in every manifest / manifest-list file
+//   name this update generates (see ManifestPath and ManifestListPath);
+// - manifests roll over at the table's kManifestTargetSizeBytes.
+SnapshotUpdate::SnapshotUpdate(std::shared_ptr<Transaction> transaction)
+ : PendingUpdate(std::move(transaction)),
+ can_inherit_snapshot_id_(
+ base().format_version > 1 ||
+
base().properties.Get(TableProperties::kSnapshotIdInheritanceEnabled)),
+ commit_uuid_(Uuid::GenerateV7().ToString()),
+ target_manifest_size_bytes_(
+ base().properties.Get(TableProperties::kManifestTargetSizeBytes)) {}
+
+// TODO(xxx): write manifests in parallel
+//
+// Write `data_files` as ADDED entries into one or more data manifests (rolling
+// to a new file at target_manifest_size_bytes_) and return the manifests.
+// Returns an empty list without writing anything when there are no files.
+Result<std::vector<ManifestFile>> SnapshotUpdate::WriteDataManifests(
+    const std::vector<std::shared_ptr<DataFile>>& data_files,
+    const std::shared_ptr<PartitionSpec>& spec,
+    std::optional<int64_t> data_sequence_number) {
+  if (data_files.empty()) {
+    return std::vector<ManifestFile>{};
+  }
+
+  ICEBERG_ASSIGN_OR_RAISE(auto current_schema, base().Schema());
+  // The factory may run once per rolled manifest, so it hands each writer a
+  // copy of the captured spec/schema (the captures are const in this
+  // non-mutable lambda; std::move on them would only ever copy).
+  RollingManifestWriter rolling_writer(
+      [this, spec, schema = std::move(current_schema),
+       snapshot_id = SnapshotId()]() -> Result<std::unique_ptr<ManifestWriter>> {
+        return ManifestWriter::MakeWriter(base().format_version, snapshot_id,
+                                          ManifestPath(), transaction_->table()->io(),
+                                          spec, schema, ManifestContent::kData,
+                                          /*first_row_id=*/base().next_row_id);
+      },
+      target_manifest_size_bytes_);
+
+  for (const auto& file : data_files) {
+    ICEBERG_RETURN_UNEXPECTED(rolling_writer.WriteAddedEntry(file, data_sequence_number));
+  }
+  ICEBERG_RETURN_UNEXPECTED(rolling_writer.Close());
+  return rolling_writer.ToManifestFiles();
+}
+
+// TODO(xxx): write manifests in parallel
+//
+// Write `delete_files` as ADDED entries into one or more delete manifests
+// (rolling to a new file at target_manifest_size_bytes_) and return them.
+// Returns an empty list without writing anything when there are no files.
+Result<std::vector<ManifestFile>> SnapshotUpdate::WriteDeleteManifests(
+    const std::vector<std::shared_ptr<DataFile>>& delete_files,
+    const std::shared_ptr<PartitionSpec>& spec) {
+  if (delete_files.empty()) {
+    return std::vector<ManifestFile>{};
+  }
+
+  ICEBERG_ASSIGN_OR_RAISE(auto current_schema, base().Schema());
+  // The factory may run once per rolled manifest, so it hands each writer a
+  // copy of the captured spec/schema (the captures are const in this
+  // non-mutable lambda; std::move on them would only ever copy).
+  RollingManifestWriter rolling_writer(
+      [this, spec, schema = std::move(current_schema),
+       snapshot_id = SnapshotId()]() -> Result<std::unique_ptr<ManifestWriter>> {
+        return ManifestWriter::MakeWriter(base().format_version, snapshot_id,
+                                          ManifestPath(), transaction_->table()->io(),
+                                          spec, schema, ManifestContent::kDeletes);
+      },
+      target_manifest_size_bytes_);
+
+  for (const auto& file : delete_files) {
+    // FIXME: the Java impl wraps each file in `PendingDeleteFile` and deals with
+    // file->data_sequence_number; here no data sequence number is passed.
+    ICEBERG_RETURN_UNEXPECTED(rolling_writer.WriteAddedEntry(file));
+  }
+  ICEBERG_RETURN_UNEXPECTED(rolling_writer.Close());
+  return rolling_writer.ToManifestFiles();
+}
+
+// Lazily generate this update's snapshot ID on first use and keep returning
+// the same value, so manifests, the manifest list and the snapshot agree.
+int64_t SnapshotUpdate::SnapshotId() {
+  if (snapshot_id_) {
+    return *snapshot_id_;
+  }
+  snapshot_id_ = SnapshotUtil::GenerateSnapshotId(base());
+  return *snapshot_id_;
+}
+
+// Produce the new snapshot for this update without committing it.
+// Writes a new manifest list (one per call; see ManifestListPath), stores the
+// resulting snapshot in staged_snapshot_ for Finalize, and returns it with the
+// target branch and the stage-only flag.
+Result<SnapshotUpdate::ApplyResult> SnapshotUpdate::Apply() {
+ ICEBERG_RETURN_UNEXPECTED(CheckErrors());
+ // Parent is the latest snapshot of the target branch, if the branch has one.
+ ICEBERG_ASSIGN_OR_RAISE(auto parent_snapshot,
+ SnapshotUtil::OptionalLatestSnapshot(base(),
target_branch_));
+
+ int64_t sequence_number = base().NextSequenceNumber();
+ std::optional<int64_t> parent_snapshot_id =
+ parent_snapshot ? std::make_optional(parent_snapshot->snapshot_id) :
std::nullopt;
+
+ if (parent_snapshot) {
+ ICEBERG_RETURN_UNEXPECTED(Validate(base(), parent_snapshot));
+ }
+
+ // Manifests for the new snapshot come from the Apply(base, parent) overload
+ // (presumably implemented by each concrete operation). Manifests without a
+ // snapshot ID get their counts/partition stats filled in by AddMetadata.
+ ICEBERG_ASSIGN_OR_RAISE(auto manifests, Apply(base(), parent_snapshot));
+ for (auto& manifest : manifests) {
+ if (manifest.added_snapshot_id != kInvalidSnapshotId) {
+ continue;
+ }
+ // TODO(xxx): read in parallel and cache enriched manifests for retries
+ ICEBERG_ASSIGN_OR_RAISE(manifest,
+ AddMetadata(manifest, transaction_->table()->io(),
base()));
+ }
+
+ // Record every manifest list written so Finalize/CleanAll can delete the
+ // ones that do not end up committed.
+ std::string manifest_list_path = ManifestListPath();
+ manifest_lists_.push_back(manifest_list_path);
+ ICEBERG_ASSIGN_OR_RAISE(
+ auto writer, ManifestListWriter::MakeWriter(base().format_version,
SnapshotId(),
+ parent_snapshot_id,
manifest_list_path,
+ transaction_->table()->io(),
+ sequence_number,
base().next_row_id));
+ ICEBERG_RETURN_UNEXPECTED(writer->AddAll(manifests));
+ ICEBERG_RETURN_UNEXPECTED(writer->Close());
+
+ // Format v3+: the snapshot records the base next_row_id and how many row IDs
+ // the manifest list writer advanced past it.
+ std::optional<int64_t> next_row_id;
+ std::optional<int64_t> assigned_rows;
+ if (base().format_version >= 3) {
+ ICEBERG_CHECK(writer->next_row_id().has_value(),
+ "row id is required by format version >= 3");
+ next_row_id = base().next_row_id;
+ assigned_rows = writer->next_row_id().value() - base().next_row_id;
+ }
+
+ std::string op = operation();
+ ICEBERG_CHECK(!op.empty(), "Snapshot operation cannot be empty");
+
+ // A REPLACE must not add more records than it removes (checked only when
+ // both counters are present in Summary()).
+ if (op == DataOperation::kReplace) {
+ const auto summary = Summary();
+ auto added_records_it = summary.find(SnapshotSummaryFields::kAddedRecords);
+ auto replaced_records_it =
summary.find(SnapshotSummaryFields::kDeletedRecords);
+ if (added_records_it != summary.cend() && replaced_records_it !=
summary.cend()) {
+ ICEBERG_ASSIGN_OR_RAISE(auto added_records,
+
StringUtils::ParseInt<int64_t>(added_records_it->second));
+ ICEBERG_ASSIGN_OR_RAISE(auto replaced_records,
StringUtils::ParseInt<int64_t>(
+
replaced_records_it->second));
+ ICEBERG_PRECHECK(
+ added_records <= replaced_records,
+ "Invalid REPLACE operation: {} added records > {} replaced records",
+ added_records, replaced_records);
+ }
+ }
+
+ // Build the snapshot with running totals; keep it for Finalize.
+ ICEBERG_ASSIGN_OR_RAISE(auto summary, ComputeSummary(base()));
+ ICEBERG_ASSIGN_OR_RAISE(
+ staged_snapshot_,
+ Snapshot::Make(sequence_number, SnapshotId(), parent_snapshot_id,
+ CurrentTimePointMs(), std::move(op), std::move(summary),
+ base().current_schema_id, std::move(manifest_list_path),
next_row_id,
+ assigned_rows));
+
+ return ApplyResult{.snapshot = staged_snapshot_,
+ .target_branch = target_branch_,
+ .stage_only = stage_only_};
+}
+
+// Post-commit hook, called by Transaction::Commit with the commit outcome.
+// - Failed with kCommitStateUnknown: keep every file; the table may reference them.
+// - Failed otherwise: delete everything this update wrote (CleanAll).
+// - Succeeded: when CleanupAfterCommit() is true, pass the committed
+//   snapshot's manifest paths to CleanUncommitted; then delete manifest lists
+//   from earlier attempts that are not the committed one.
+Status SnapshotUpdate::Finalize(std::optional<Error> commit_error) {
+  if (commit_error.has_value()) {
+    if (commit_error->kind == ErrorKind::kCommitStateUnknown) {
+      return {};
+    }
+    CleanAll();
+    return {};
+  }
+
+  if (CleanupAfterCommit()) {
+    ICEBERG_CHECK(staged_snapshot_ != nullptr,
+                  "Staged snapshot is null during finalize after commit");
+    auto cached_snapshot = SnapshotCache(staged_snapshot_.get());
+    ICEBERG_ASSIGN_OR_RAISE(auto manifests,
+                            cached_snapshot.Manifests(transaction_->table()->io()));
+    CleanUncommitted(manifests | std::views::transform([](const auto& manifest) {
+                       return manifest.manifest_path;
+                     }) |
+                     std::ranges::to<std::unordered_set<std::string>>());
+  }
+
+  // Without a staged snapshot there is no committed manifest list to compare
+  // against, so unused lists cannot be identified; skip the cleanup instead of
+  // dereferencing a null pointer.
+  if (staged_snapshot_ == nullptr) {
+    return {};
+  }
+
+  // Also clean up unused manifest lists created by multiple attempts
+  for (const auto& manifest_list : manifest_lists_) {
+    if (manifest_list != staged_snapshot_->manifest_list) {
+      std::ignore = DeleteFile(manifest_list);
+    }
+  }
+
+  return {};
+}
+
+// Choose the branch the new snapshot is committed to. The name must be
+// non-empty and, if it already names a reference, that reference must be a
+// branch rather than a tag.
+Status SnapshotUpdate::SetTargetBranch(const std::string& branch) {
+  ICEBERG_PRECHECK(!branch.empty(), "Branch name cannot be empty");
+
+  const auto& refs = base().refs;
+  const auto existing = refs.find(branch);
+  const bool names_tag =
+      existing != refs.end() && existing->second->type() != SnapshotRefType::kBranch;
+  ICEBERG_PRECHECK(
+      !names_tag,
+      "{} is a tag, not a branch. Tags cannot be targets for producing snapshots",
+      branch);
+
+  target_branch_ = branch;
+  return {};
+}
+
+// Build the snapshot summary: start from Summary() and, unless it is empty,
+// derive the running totals from the target branch's previous snapshot
+// summary, or from zero when the branch does not exist yet.
+Result<std::unordered_map<std::string, std::string>> SnapshotUpdate::ComputeSummary(
+    const TableMetadata& previous) {
+  std::unordered_map<std::string, std::string> summary = Summary();
+  if (summary.empty()) {
+    return summary;
+  }
+
+  // Each running total with the per-snapshot counters that adjust it, in the
+  // order the totals are updated.
+  struct TotalCounters {
+    std::string total;
+    std::string added;
+    std::string removed;
+  };
+  const TotalCounters counters[] = {
+      {SnapshotSummaryFields::kTotalRecords, SnapshotSummaryFields::kAddedRecords,
+       SnapshotSummaryFields::kDeletedRecords},
+      {SnapshotSummaryFields::kTotalFileSize, SnapshotSummaryFields::kAddedFileSize,
+       SnapshotSummaryFields::kRemovedFileSize},
+      {SnapshotSummaryFields::kTotalDataFiles, SnapshotSummaryFields::kAddedDataFiles,
+       SnapshotSummaryFields::kDeletedDataFiles},
+      {SnapshotSummaryFields::kTotalDeleteFiles, SnapshotSummaryFields::kAddedDeleteFiles,
+       SnapshotSummaryFields::kRemovedDeleteFiles},
+      {SnapshotSummaryFields::kTotalPosDeletes, SnapshotSummaryFields::kAddedPosDeletes,
+       SnapshotSummaryFields::kRemovedPosDeletes},
+      {SnapshotSummaryFields::kTotalEqDeletes, SnapshotSummaryFields::kAddedEqDeletes,
+       SnapshotSummaryFields::kRemovedEqDeletes},
+  };
+
+  // Baseline totals from the target branch's current snapshot.
+  std::unordered_map<std::string, std::string> previous_summary;
+  const auto ref = previous.refs.find(target_branch_);
+  if (ref == previous.refs.end()) {
+    // No previous snapshot on this branch: every total starts at zero.
+    for (const auto& counter : counters) {
+      previous_summary[counter.total] = "0";
+    }
+  } else if (auto prev_snapshot = previous.SnapshotById(ref->second->snapshot_id);
+             prev_snapshot.has_value()) {
+    previous_summary = prev_snapshot.value()->summary;
+  }
+
+  for (const auto& counter : counters) {
+    ICEBERG_RETURN_UNEXPECTED(UpdateTotal(summary, previous_summary, counter.total,
+                                          counter.added, counter.removed));
+  }
+
+  // TODO(xxx): add custom summary fields like engine info
+  return summary;
+}
+
+// Abort-path cleanup: best-effort delete of every manifest list this update
+// wrote (deletion status ignored), forget them, then let CleanUncommitted run
+// with an empty set of committed manifests.
+void SnapshotUpdate::CleanAll() {
+  for (const auto& list_path : manifest_lists_) {
+    std::ignore = DeleteFile(list_path);
+  }
+  manifest_lists_.clear();
+
+  CleanUncommitted(std::unordered_set<std::string>{});
+}
+
+// Delete `path` using the custom delete_func_ when one is set (presumably via
+// DeleteWith), otherwise through the table's FileIO.
+Status SnapshotUpdate::DeleteFile(const std::string& path) {
+  if (delete_func_) {
+    return delete_func_(path);
+  }
+  // Resolve the FileIO through *this* update on every call. A function-local
+  // `static` lambda capturing `this` is initialized only once, so it would keep
+  // the first SnapshotUpdate's pointer forever and dangle after that instance
+  // is destroyed.
+  return transaction_->table()->io()->DeleteFile(path);
+}
+
+// Location of a new manifest list in the metadata directory, named
+//   snap-{snapshot_id}-{attempt}-{commit_uuid}.avro
+// Every call pre-increments attempt_, so retries never reuse a file name.
+std::string SnapshotUpdate::ManifestListPath() {
+  const int64_t id = SnapshotId();
+  const auto attempt = ++attempt_;
+  return transaction_->MetadataFileLocation(
+      std::format("snap-{}-{}-{}.avro", id, attempt, commit_uuid_));
+}
+
+// Location of a new manifest in the metadata directory, named
+//   {commit_uuid}-m{manifest_count}.avro
+// using the current manifest_count_, which is advanced after each call.
+std::string SnapshotUpdate::ManifestPath() {
+  const auto index = manifest_count_++;
+  return transaction_->MetadataFileLocation(
+      std::format("{}-m{}.avro", commit_uuid_, index));
+}
+
+} // namespace iceberg
diff --git a/src/iceberg/update/snapshot_update.h
b/src/iceberg/update/snapshot_update.h
new file mode 100644
index 00000000..48ef1676
--- /dev/null
+++ b/src/iceberg/update/snapshot_update.h
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#include <functional>
+#include <memory>
+#include <optional>
+#include <string>
+#include <unordered_map>
+#include <unordered_set>
+#include <vector>
+
+#include "iceberg/iceberg_export.h"
+#include "iceberg/manifest/manifest_list.h"
+#include "iceberg/result.h"
+#include "iceberg/snapshot.h"
+#include "iceberg/type_fwd.h"
+#include "iceberg/update/pending_update.h"
+
+namespace iceberg {
+
+/// \brief Base class for operations that produce snapshots.
+///
+/// This class provides common functionality for creating new snapshots,
+/// including manifest list writing and cleanup.
+class ICEBERG_EXPORT SnapshotUpdate : public PendingUpdate {
+ public:
+  /// \brief Result of applying a snapshot update
+  struct ApplyResult {
+    /// The new snapshot produced by Apply() (not yet committed).
+    std::shared_ptr<Snapshot> snapshot;
+    /// The branch the new snapshot targets.
+    std::string target_branch;
+    /// Whether the snapshot is only staged (see StageOnly()).
+    bool stage_only = false;
+  };
+
+  ~SnapshotUpdate() override;
+
+  /// \brief Set a callback to delete files instead of the table's default.
+  ///
+  /// \param delete_func A function used to delete file locations
+  /// \return Reference to this for method chaining
+  /// \note Cannot be called more than once
+  auto& DeleteWith(this auto& self,
+                   std::function<Status(const std::string&)> delete_func) {
+    if (self.delete_func_) {
+      return self.AddError(ErrorKind::kInvalidArgument,
+                           "Cannot set delete callback more than once");
+    }
+    self.delete_func_ = std::move(delete_func);
+    return self;
+  }
+
+  /// \brief Stage a snapshot in table metadata without updating the current
+  /// snapshot id.
+  ///
+  /// \return Reference to this for method chaining
+  auto& StageOnly(this auto& self) {
+    self.stage_only_ = true;
+    return self;
+  }
+
+  /// \brief Apply the update's changes to create a new snapshot.
+  ///
+  /// This method validates the changes, applies them to the metadata,
+  /// and creates a new snapshot without committing it. The snapshot
+  /// is stored internally and can be accessed after Apply() succeeds.
+  ///
+  /// \note A subclass that overrides the protected
+  /// Apply(const TableMetadata&, const std::shared_ptr<Snapshot>&) overload
+  /// hides this overload by C++ name lookup; add `using SnapshotUpdate::Apply;`
+  /// in the subclass to keep it callable on the derived type.
+  ///
+  /// \return A result containing the new snapshot, or an error
+  Result<ApplyResult> Apply();
+
+  /// \brief Finalize the snapshot update, cleaning up any uncommitted files.
+  Status Finalize(std::optional<Error> commit_error) override;
+
+ protected:
+  explicit SnapshotUpdate(std::shared_ptr<Transaction> transaction);
+
+  /// \brief Write data manifests for the given data files
+  ///
+  /// \param data_files The data files to write
+  /// \param spec The partition spec to use
+  /// \param data_sequence_number Optional data sequence number for the files
+  /// \return A vector of manifest files
+  Result<std::vector<ManifestFile>> WriteDataManifests(
+      const std::vector<std::shared_ptr<DataFile>>& data_files,
+      const std::shared_ptr<PartitionSpec>& spec,
+      std::optional<int64_t> data_sequence_number = std::nullopt);
+
+  /// \brief Write delete manifests for the given delete files
+  ///
+  /// \param delete_files The delete files to write
+  /// \param spec The partition spec to use
+  /// \return A vector of manifest files
+  Result<std::vector<ManifestFile>> WriteDeleteManifests(
+      const std::vector<std::shared_ptr<DataFile>>& delete_files,
+      const std::shared_ptr<PartitionSpec>& spec);
+
+  /// \brief Set the branch targeted by the new snapshot (defaults to main).
+  Status SetTargetBranch(const std::string& branch);
+  const std::string& target_branch() const { return target_branch_; }
+  bool can_inherit_snapshot_id() const { return can_inherit_snapshot_id_; }
+  const std::string& commit_uuid() const { return commit_uuid_; }
+  int32_t manifest_count() const { return manifest_count_; }
+  int32_t attempt() const { return attempt_; }
+  int64_t target_manifest_size_bytes() const { return target_manifest_size_bytes_; }
+
+  /// \brief Clean up any uncommitted manifests that were created.
+  ///
+  /// Manifests may not be committed if Apply is called multiple times
+  /// because a commit conflict has occurred. Implementations may keep
+  /// around manifests because the same changes will be made by both
+  /// Apply calls. This method instructs the implementation to clean up
+  /// those manifests and passes the paths of the manifests that were
+  /// actually committed.
+  ///
+  /// \param committed A set of manifest paths that were actually committed
+  virtual void CleanUncommitted(const std::unordered_set<std::string>& committed) = 0;
+
+  /// \brief A string that describes the action that produced the new snapshot.
+  ///
+  /// \return A string operation name
+  virtual std::string operation() = 0;
+
+  /// \brief Validate the current metadata.
+  ///
+  /// Child operations can override this to add custom validation. The default
+  /// implementation accepts everything.
+  ///
+  /// \param current_metadata Current table metadata to validate
+  /// \param snapshot Ending snapshot on the lineage which is being validated
+  virtual Status Validate(const TableMetadata& current_metadata,
+                          const std::shared_ptr<Snapshot>& snapshot) {
+    return {};
+  }
+
+  /// \brief Apply the update's changes to the given metadata and snapshot.
+  ///
+  /// \param metadata_to_update The base table metadata to apply changes to
+  /// \param snapshot Snapshot to apply the changes to
+  /// \return A vector of manifest files for the new snapshot
+  virtual Result<std::vector<ManifestFile>> Apply(
+      const TableMetadata& metadata_to_update,
+      const std::shared_ptr<Snapshot>& snapshot) = 0;
+
+  /// \brief Get the summary map for this operation.
+  ///
+  /// \return A map of summary properties
+  virtual std::unordered_map<std::string, std::string> Summary() = 0;
+
+  /// \brief Check if cleanup should happen after commit
+  ///
+  /// \return True if cleanup should happen after commit
+  virtual bool CleanupAfterCommit() const { return true; }
+
+  /// \brief Get or generate the snapshot ID for the new snapshot.
+  int64_t SnapshotId();
+
+ private:
+  /// \brief Returns the snapshot summary from the implementation and updates totals.
+  Result<std::unordered_map<std::string, std::string>> ComputeSummary(
+      const TableMetadata& previous);
+
+  /// \brief Clean up all uncommitted files
+  void CleanAll();
+
+  Status DeleteFile(const std::string& path);
+  std::string ManifestListPath();
+  std::string ManifestPath();
+
+ private:
+  const bool can_inherit_snapshot_id_{true};
+  const std::string commit_uuid_;
+  int32_t manifest_count_{0};
+  int32_t attempt_{0};
+  std::vector<std::string> manifest_lists_;
+  const int64_t target_manifest_size_bytes_;
+  std::optional<int64_t> snapshot_id_;
+  bool stage_only_{false};
+  std::function<Status(const std::string&)> delete_func_;
+  std::string target_branch_{SnapshotRef::kMainBranch};
+  std::shared_ptr<Snapshot> staged_snapshot_;
+};
+
+} // namespace iceberg
diff --git a/src/iceberg/update/update_partition_spec.cc
b/src/iceberg/update/update_partition_spec.cc
index ffea1e09..54c3dc60 100644
--- a/src/iceberg/update/update_partition_spec.cc
+++ b/src/iceberg/update/update_partition_spec.cc
@@ -47,11 +47,10 @@ Result<std::shared_ptr<UpdatePartitionSpec>>
UpdatePartitionSpec::Make(
UpdatePartitionSpec::UpdatePartitionSpec(std::shared_ptr<Transaction>
transaction)
: PendingUpdate(std::move(transaction)) {
- const TableMetadata& base_metadata = transaction_->current();
- format_version_ = base_metadata.format_version;
+ format_version_ = base().format_version;
// Get the current/default partition spec
- auto spec_result = base_metadata.PartitionSpec();
+ auto spec_result = base().PartitionSpec();
if (!spec_result.has_value()) {
AddError(spec_result.error());
return;
@@ -59,15 +58,15 @@
UpdatePartitionSpec::UpdatePartitionSpec(std::shared_ptr<Transaction> transactio
spec_ = std::move(spec_result.value());
// Get the current schema
- auto schema_result = base_metadata.Schema();
+ auto schema_result = base().Schema();
if (!schema_result.has_value()) {
AddError(schema_result.error());
return;
}
schema_ = std::move(schema_result.value());
- last_assigned_partition_id_ = std::max(base_metadata.last_partition_id,
-
PartitionSpec::kLegacyPartitionDataIdStart - 1);
+ last_assigned_partition_id_ =
+ std::max(base().last_partition_id,
PartitionSpec::kLegacyPartitionDataIdStart - 1);
name_to_field_ = IndexSpecByName(*spec_);
transform_to_field_ = IndexSpecByTransform(*spec_);
@@ -433,18 +432,16 @@ UpdatePartitionSpec::IndexSpecByTransform(const
PartitionSpec& spec) {
}
void UpdatePartitionSpec::BuildHistoricalFieldsIndex() {
- const TableMetadata& base_metadata = transaction_->current();
-
// Count total fields across all specs to reserve capacity
size_t total_fields = 0;
- for (const auto& partition_spec : base_metadata.partition_specs) {
+ for (const auto& partition_spec : base().partition_specs) {
total_fields += partition_spec->fields().size();
}
historical_fields_.reserve(total_fields);
// Index all fields from all historical partition specs
// Later specs override earlier ones for the same (source_id, transform) key
- for (const auto& partition_spec : base_metadata.partition_specs) {
+ for (const auto& partition_spec : base().partition_specs) {
for (const auto& field : partition_spec->fields()) {
TransformKey key{field.source_id(), field.transform()->ToString()};
historical_fields_.emplace(key, field);
diff --git a/src/iceberg/update/update_properties.cc
b/src/iceberg/update/update_properties.cc
index ce809c43..fe49df81 100644
--- a/src/iceberg/update/update_properties.cc
+++ b/src/iceberg/update/update_properties.cc
@@ -19,10 +19,8 @@
#include "iceberg/update/update_properties.h"
-#include <charconv>
#include <cstdint>
#include <memory>
-#include <system_error>
#include "iceberg/metrics_config.h"
#include "iceberg/result.h"
@@ -31,6 +29,7 @@
#include "iceberg/transaction.h"
#include "iceberg/util/error_collector.h"
#include "iceberg/util/macros.h"
+#include "iceberg/util/string_util.h"
namespace iceberg {
@@ -70,7 +69,7 @@ UpdateProperties& UpdateProperties::Remove(const std::string&
key) {
Result<UpdateProperties::ApplyResult> UpdateProperties::Apply() {
ICEBERG_RETURN_UNEXPECTED(CheckErrors());
- const auto& current_props = transaction_->current().properties.configs();
+ const auto& current_props = base().properties.configs();
std::unordered_map<std::string, std::string> new_properties;
std::vector<std::string> removals;
for (const auto& [key, value] : current_props) {
@@ -85,15 +84,8 @@ Result<UpdateProperties::ApplyResult>
UpdateProperties::Apply() {
auto iter = new_properties.find(TableProperties::kFormatVersion.key());
if (iter != new_properties.end()) {
- int parsed_version = 0;
- const auto& val = iter->second;
- auto [ptr, ec] = std::from_chars(val.data(), val.data() + val.size(),
parsed_version);
-
- if (ec == std::errc::invalid_argument) {
- return InvalidArgument("Invalid format version '{}': not a valid
integer", val);
- } else if (ec == std::errc::result_out_of_range) {
- return InvalidArgument("Format version '{}' is out of range", val);
- }
+ ICEBERG_ASSIGN_OR_RAISE(auto parsed_version,
+ StringUtils::ParseInt<int32_t>(iter->second));
if (parsed_version > TableMetadata::kSupportedTableFormatVersion) {
return InvalidArgument(
@@ -105,7 +97,7 @@ Result<UpdateProperties::ApplyResult>
UpdateProperties::Apply() {
updates_.erase(TableProperties::kFormatVersion.key());
}
- if (auto schema = transaction_->current().Schema(); schema.has_value()) {
+ if (auto schema = base().Schema(); schema.has_value()) {
ICEBERG_RETURN_UNEXPECTED(
MetricsConfig::VerifyReferencedColumns(new_properties,
*schema.value()));
}
diff --git a/src/iceberg/update/update_schema.cc
b/src/iceberg/update/update_schema.cc
index 0e81c4ad..327ee0ac 100644
--- a/src/iceberg/update/update_schema.cc
+++ b/src/iceberg/update/update_schema.cc
@@ -250,10 +250,8 @@ Result<std::shared_ptr<UpdateSchema>> UpdateSchema::Make(
UpdateSchema::UpdateSchema(std::shared_ptr<Transaction> transaction)
: PendingUpdate(std::move(transaction)) {
- const TableMetadata& base_metadata = transaction_->current();
-
// Get the current schema
- auto schema_result = base_metadata.Schema();
+ auto schema_result = base().Schema();
if (!schema_result.has_value()) {
AddError(schema_result.error());
return;
@@ -261,7 +259,7 @@ UpdateSchema::UpdateSchema(std::shared_ptr<Transaction>
transaction)
schema_ = std::move(schema_result.value());
// Initialize last_column_id from base metadata
- last_column_id_ = base_metadata.last_column_id;
+ last_column_id_ = base().last_column_id;
// Initialize identifier field names from the current schema
auto identifier_names_result = schema_->IdentifierFieldNames();
diff --git a/src/iceberg/update/update_sort_order.cc
b/src/iceberg/update/update_sort_order.cc
index e3e651d5..c5c7be32 100644
--- a/src/iceberg/update/update_sort_order.cc
+++ b/src/iceberg/update/update_sort_order.cc
@@ -19,7 +19,6 @@
#include "iceberg/update/update_sort_order.h"
-#include <cstdint>
#include <memory>
#include <vector>
@@ -52,7 +51,7 @@ UpdateSortOrder& UpdateSortOrder::AddSortField(const
std::shared_ptr<Term>& term
ICEBERG_BUILDER_CHECK(term != nullptr, "Term cannot be null");
ICEBERG_BUILDER_CHECK(term->is_unbound(), "Term must be unbound");
- ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto schema,
transaction_->current().Schema());
+ ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto schema, base().Schema());
if (term->kind() == Term::Kind::kReference) {
// kReference is treated as identity transform
auto named_ref = internal::checked_pointer_cast<NamedReference>(term);
@@ -99,7 +98,7 @@ Result<std::shared_ptr<SortOrder>> UpdateSortOrder::Apply() {
// The actual sort order ID will be assigned by TableMetadataBuilder when
// the AddSortOrder update is applied.
ICEBERG_ASSIGN_OR_RAISE(order, SortOrder::Make(/*sort_id=*/-1,
sort_fields_));
- ICEBERG_ASSIGN_OR_RAISE(auto schema, transaction_->current().Schema());
+ ICEBERG_ASSIGN_OR_RAISE(auto schema, base().Schema());
ICEBERG_RETURN_UNEXPECTED(order->Validate(*schema));
}
return order;
diff --git a/src/iceberg/util/snapshot_util.cc
b/src/iceberg/util/snapshot_util.cc
index 2ec36478..4395dc27 100644
--- a/src/iceberg/util/snapshot_util.cc
+++ b/src/iceberg/util/snapshot_util.cc
@@ -26,6 +26,7 @@
#include "iceberg/util/macros.h"
#include "iceberg/util/snapshot_util_internal.h"
#include "iceberg/util/timepoint.h"
+#include "iceberg/util/uuid.h"
namespace iceberg {
@@ -335,4 +336,28 @@ Result<std::shared_ptr<Snapshot>>
SnapshotUtil::LatestSnapshot(
return metadata.SnapshotById(it->second->snapshot_id);
}
+Result<std::shared_ptr<Snapshot>> SnapshotUtil::OptionalLatestSnapshot(
+    const TableMetadata& metadata, const std::string& branch) {
+  auto latest = LatestSnapshot(metadata, branch);
+  if (!latest.has_value() && latest.error().kind == ErrorKind::kNotFound) {
+    // A missing snapshot is reported as nullptr rather than as an error;
+    // every other failure is propagated unchanged.
+    return nullptr;
+  }
+  return latest;
+}
+
+int64_t SnapshotUtil::GenerateSnapshotId() {
+  const auto uuid = Uuid::GenerateV7();
+  const int64_t folded = uuid.high_bits() ^ uuid.low_bits();
+  // Mask off the sign bit so the resulting id is never negative.
+  return folded & std::numeric_limits<int64_t>::max();
+}
+
+int64_t SnapshotUtil::GenerateSnapshotId(const TableMetadata& metadata) {
+  // Keep drawing ids until one is not already used by a snapshot in `metadata`.
+  int64_t candidate = 0;
+  do {
+    candidate = GenerateSnapshotId();
+  } while (metadata.SnapshotById(candidate).has_value());
+  return candidate;
+}
+
} // namespace iceberg
diff --git a/src/iceberg/util/snapshot_util_internal.h
b/src/iceberg/util/snapshot_util_internal.h
index 2b11168e..befa96fe 100644
--- a/src/iceberg/util/snapshot_util_internal.h
+++ b/src/iceberg/util/snapshot_util_internal.h
@@ -20,6 +20,7 @@
#pragma once
#include <functional>
+#include <memory>
#include <optional>
#include <string>
#include <vector>
@@ -229,13 +230,13 @@ class ICEBERG_EXPORT SnapshotUtil {
/// \brief Fetch the snapshot at the head of the given branch in the given
table.
///
- /// This method calls Table::current_snapshot() instead of using branch API
for the main
+ /// This method calls TableMetadata::Snapshot() instead of using branch API
for the main
/// branch so that existing code still goes through the old code path to
ensure
/// backwards compatibility.
///
/// \param table The table
/// \param branch Branch name of the table (empty string means main branch)
- /// \return The latest snapshot for the given branch
+ /// \return The latest snapshot for the given branch, or NotFoundError.
static Result<std::shared_ptr<Snapshot>> LatestSnapshot(const Table& table,
const std::string&
branch);
@@ -251,10 +252,31 @@ class ICEBERG_EXPORT SnapshotUtil {
/// \param metadata The table metadata
/// \param branch Branch name of the table metadata (empty string means main
/// branch)
- /// \return The latest snapshot for the given branch
+ /// \return The latest snapshot for the given branch, or NotFoundError.
static Result<std::shared_ptr<Snapshot>> LatestSnapshot(const TableMetadata&
metadata,
const std::string&
branch);
+ /// \brief Fetch the snapshot at the head of the given branch in the given
table.
+ ///
+ /// Like LatestSnapshot above except that nullptr is returned if snapshot
does not
+ /// exist.
+ ///
+ /// \param metadata The table metadata
+ /// \param branch Branch name of the table metadata (empty string means main
+ /// branch)
+ /// \return The latest snapshot for the given branch, or nullptr if not
found.
+ static Result<std::shared_ptr<Snapshot>> OptionalLatestSnapshot(
+ const TableMetadata& metadata, const std::string& branch);
+
+ /// \brief Generate a new snapshot ID.
+ static int64_t GenerateSnapshotId();
+
+ /// \brief Generate a new snapshot ID for the given metadata.
+ ///
+ /// \param metadata The table metadata
+ /// \return A new snapshot ID
+ static int64_t GenerateSnapshotId(const TableMetadata& metadata);
+
private:
/// \brief Helper function to traverse ancestors of a snapshot.
///
diff --git a/src/iceberg/util/string_util.h b/src/iceberg/util/string_util.h
index 0c9e89bc..dfedb4a7 100644
--- a/src/iceberg/util/string_util.h
+++ b/src/iceberg/util/string_util.h
@@ -20,10 +20,14 @@
#pragma once
#include <algorithm>
+#include <charconv>
#include <ranges>
#include <string>
+#include <string_view>
+#include <typeinfo>
#include "iceberg/iceberg_export.h"
+#include "iceberg/result.h"
namespace iceberg {
@@ -61,6 +65,20 @@ class ICEBERG_EXPORT StringUtils {
}
return count;
}
+
+  /// \brief Parse the entire string as an integer of type T.
+  ///
+  /// Parsing uses std::from_chars, so leading whitespace and a leading '+' are
+  /// rejected. Trailing characters (e.g. "2x") are rejected too instead of
+  /// silently returning the numeric prefix.
+  ///
+  /// \param str The text to parse
+  /// \return The parsed value, or InvalidArgument if `str` is not a complete
+  /// integer or does not fit in T
+  template <typename T>
+  static Result<T> ParseInt(std::string_view str) {
+    T value = 0;
+    const char* const end = str.data() + str.size();
+    auto [ptr, ec] = std::from_chars(str.data(), end, value);
+    // from_chars succeeds on a numeric prefix; require the whole input be used.
+    const bool has_trailing_chars = ec == std::errc{} && ptr != end;
+    if (ec == std::errc::invalid_argument || has_trailing_chars) [[unlikely]] {
+      return InvalidArgument(
+          "Failed to parse integer from string '{}': invalid argument", str);
+    } else if (ec == std::errc::result_out_of_range) [[unlikely]] {
+      // typeid(T).name() is implementation-specific (often a mangled name such
+      // as "i"), so the message refers to the value generically.
+      return InvalidArgument(
+          "Failed to parse integer from string '{}': value out of range", str);
+    }
+    return value;
+  }
};
/// \brief Transparent hash function that supports std::string_view as lookup
key
diff --git a/src/iceberg/util/timepoint.cc b/src/iceberg/util/timepoint.cc
index 0381e90a..ed52dddf 100644
--- a/src/iceberg/util/timepoint.cc
+++ b/src/iceberg/util/timepoint.cc
@@ -60,4 +60,11 @@ std::string FormatTimePointMs(TimePointMs time_point_ms) {
return oss.str();
}
+TimePointMs CurrentTimePointMs() {
+  using std::chrono::milliseconds;
+  // Wall-clock time since the system_clock epoch, truncated to milliseconds.
+  const auto elapsed = std::chrono::system_clock::now().time_since_epoch();
+  return TimePointMs{std::chrono::duration_cast<milliseconds>(elapsed)};
+}
+
} // namespace iceberg
diff --git a/src/iceberg/util/timepoint.h b/src/iceberg/util/timepoint.h
index 6052c94a..ed303e1f 100644
--- a/src/iceberg/util/timepoint.h
+++ b/src/iceberg/util/timepoint.h
@@ -49,4 +49,7 @@ ICEBERG_EXPORT int64_t UnixNsFromTimePointNs(TimePointNs
time_point_ns);
/// \brief Returns a human-readable string representation of a TimePointMs
ICEBERG_EXPORT std::string FormatTimePointMs(TimePointMs time_point_ms);
+/// \brief Returns a time point in milliseconds that represents the current
system time
+ICEBERG_EXPORT TimePointMs CurrentTimePointMs();
+
} // namespace iceberg
diff --git a/src/iceberg/util/uuid.cc b/src/iceberg/util/uuid.cc
index 9322deb9..cc76095a 100644
--- a/src/iceberg/util/uuid.cc
+++ b/src/iceberg/util/uuid.cc
@@ -217,4 +217,16 @@ std::string Uuid::ToString() const {
data_[15]);
}
+/// Returns bytes 0..7 of the UUID interpreted as a big-endian integer (the
+/// numeric most-significant half, in the same byte order ToString() prints).
+int64_t Uuid::high_bits() const {
+  // Assemble explicitly instead of memcpy so the value does not depend on
+  // host endianness.
+  uint64_t result = 0;
+  for (std::size_t i = 0; i < 8; ++i) {
+    result = (result << 8) | data_[i];
+  }
+  return static_cast<int64_t>(result);
+}
+
+/// Returns bytes 8..15 of the UUID interpreted as a big-endian integer (the
+/// numeric least-significant half, in the same byte order ToString() prints).
+int64_t Uuid::low_bits() const {
+  // Assemble explicitly instead of memcpy so the value does not depend on
+  // host endianness.
+  uint64_t result = 0;
+  for (std::size_t i = 8; i < 16; ++i) {
+    result = (result << 8) | data_[i];
+  }
+  return static_cast<int64_t>(result);
+}
+
} // namespace iceberg
diff --git a/src/iceberg/util/uuid.h b/src/iceberg/util/uuid.h
index 64db7c5d..69ac10fc 100644
--- a/src/iceberg/util/uuid.h
+++ b/src/iceberg/util/uuid.h
@@ -78,6 +78,9 @@ class ICEBERG_EXPORT Uuid : public util::Formattable {
return lhs.data_ == rhs.data_;
}
+ int64_t high_bits() const;
+ int64_t low_bits() const;
+
private:
std::array<uint8_t, kLength> data_;
};