This is an automated email from the ASF dual-hosted git repository.
gangwu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-cpp.git
The following commit(s) were added to refs/heads/main by this push:
new 8fdf3462 refactor: introduce TransactionContext to decouple
Transaction and PendingUpdate (#591)
8fdf3462 is described below
commit 8fdf34628d8e5bdec53e56fbeaa18ff27bff6a4d
Author: Gang Wu <[email protected]>
AuthorDate: Mon Mar 16 22:16:39 2026 +0800
refactor: introduce TransactionContext to decouple Transaction and
PendingUpdate (#591)
Add TransactionContext to own the shared state (table, metadata_builder,
kind) between Transaction and PendingUpdate. Both now hold a
shared_ptr<TransactionContext> instead of PendingUpdate holding a
weak_ptr<Transaction>.
This fixes two issues:
- pending_updates_ was weak_ptr, so dropping a PendingUpdate would
silently break Finalize/retry; now Transaction holds shared_ptr
- Table::New*() no longer creates a temporary Transaction; it creates a
TransactionContext directly and passes it to the PendingUpdate, removing
the circular dependency
Also clean up related redundancy:
- Hoist Transaction::Kind to a standalone enum class TransactionKind
- Remove Transaction::kind_ (duplicate of ctx_->kind)
- Remove auto_commit machinery; PendingUpdate::Commit() now calls
txn->Commit() explicitly on the table-created path
- TransactionContext::Make returns Result to propagate null table errors
---
src/iceberg/catalog/memory/in_memory_catalog.cc | 3 +-
src/iceberg/catalog/rest/rest_catalog.cc | 3 +-
src/iceberg/table.cc | 53 +++---
src/iceberg/test/update_partition_spec_test.cc | 10 +-
src/iceberg/transaction.cc | 195 +++++++++++++---------
src/iceberg/transaction.h | 57 +++++--
src/iceberg/type_fwd.h | 1 +
src/iceberg/update/expire_snapshots.cc | 13 +-
src/iceberg/update/expire_snapshots.h | 4 +-
src/iceberg/update/fast_append.cc | 18 +-
src/iceberg/update/fast_append.h | 4 +-
src/iceberg/update/pending_update.cc | 29 +++-
src/iceberg/update/pending_update.h | 4 +-
src/iceberg/update/set_snapshot.cc | 13 +-
src/iceberg/update/set_snapshot.h | 4 +-
src/iceberg/update/snapshot_manager.cc | 3 +-
src/iceberg/update/snapshot_update.cc | 36 ++--
src/iceberg/update/snapshot_update.h | 2 +-
src/iceberg/update/update_location.cc | 11 +-
src/iceberg/update/update_location.h | 4 +-
src/iceberg/update/update_partition_spec.cc | 12 +-
src/iceberg/update/update_partition_spec.h | 4 +-
src/iceberg/update/update_partition_statistics.cc | 12 +-
src/iceberg/update/update_partition_statistics.h | 4 +-
src/iceberg/update/update_properties.cc | 11 +-
src/iceberg/update/update_properties.h | 4 +-
src/iceberg/update/update_schema.cc | 13 +-
src/iceberg/update/update_schema.h | 4 +-
src/iceberg/update/update_snapshot_reference.cc | 14 +-
src/iceberg/update/update_snapshot_reference.h | 4 +-
src/iceberg/update/update_sort_order.cc | 11 +-
src/iceberg/update/update_sort_order.h | 4 +-
src/iceberg/update/update_statistics.cc | 11 +-
src/iceberg/update/update_statistics.h | 4 +-
34 files changed, 314 insertions(+), 265 deletions(-)
diff --git a/src/iceberg/catalog/memory/in_memory_catalog.cc
b/src/iceberg/catalog/memory/in_memory_catalog.cc
index 8a082aad..7cd02ac7 100644
--- a/src/iceberg/catalog/memory/in_memory_catalog.cc
+++ b/src/iceberg/catalog/memory/in_memory_catalog.cc
@@ -500,8 +500,7 @@ Result<std::shared_ptr<Transaction>>
InMemoryCatalog::StageCreateTable(
ICEBERG_ASSIGN_OR_RAISE(
auto table, StagedTable::Make(identifier, std::move(table_metadata), "",
file_io_,
shared_from_this()));
- return Transaction::Make(std::move(table), Transaction::Kind::kCreate,
- /* auto_commit */ false);
+ return Transaction::Make(std::move(table), TransactionKind::kCreate);
}
Result<bool> InMemoryCatalog::TableExists(const TableIdentifier& identifier)
const {
diff --git a/src/iceberg/catalog/rest/rest_catalog.cc
b/src/iceberg/catalog/rest/rest_catalog.cc
index 94c6b1e4..42dc8659 100644
--- a/src/iceberg/catalog/rest/rest_catalog.cc
+++ b/src/iceberg/catalog/rest/rest_catalog.cc
@@ -384,8 +384,7 @@ Result<std::shared_ptr<Transaction>>
RestCatalog::StageCreateTable(
StagedTable::Make(identifier,
std::move(result.metadata),
std::move(result.metadata_location), file_io_,
shared_from_this()));
- return Transaction::Make(std::move(staged_table), Transaction::Kind::kCreate,
- /*auto_commit=*/false);
+ return Transaction::Make(std::move(staged_table), TransactionKind::kCreate);
}
Status RestCatalog::DropTable(const TableIdentifier& identifier, bool purge) {
diff --git a/src/iceberg/table.cc b/src/iceberg/table.cc
index 72190855..2f2753f3 100644
--- a/src/iceberg/table.cc
+++ b/src/iceberg/table.cc
@@ -32,11 +32,16 @@
#include "iceberg/table_scan.h"
#include "iceberg/transaction.h"
#include "iceberg/update/expire_snapshots.h"
+#include "iceberg/update/fast_append.h"
+#include "iceberg/update/set_snapshot.h"
#include "iceberg/update/snapshot_manager.h"
+#include "iceberg/update/update_location.h"
#include "iceberg/update/update_partition_spec.h"
#include "iceberg/update/update_partition_statistics.h"
#include "iceberg/update/update_properties.h"
#include "iceberg/update/update_schema.h"
+#include "iceberg/update/update_snapshot_reference.h"
+#include "iceberg/update/update_sort_order.h"
#include "iceberg/update/update_statistics.h"
#include "iceberg/util/macros.h"
@@ -166,71 +171,61 @@ Table::NewIncrementalChangelogScan() const {
Result<std::shared_ptr<Transaction>> Table::NewTransaction() {
// Create a brand new transaction object for the table. Users are expected
to commit the
// transaction manually.
- return Transaction::Make(shared_from_this(), Transaction::Kind::kUpdate,
- /*auto_commit=*/false);
+ return Transaction::Make(shared_from_this(), TransactionKind::kUpdate);
}
Result<std::shared_ptr<UpdatePartitionSpec>> Table::NewUpdatePartitionSpec() {
ICEBERG_ASSIGN_OR_RAISE(
- auto transaction, Transaction::Make(shared_from_this(),
Transaction::Kind::kUpdate,
- /*auto_commit=*/true));
- return transaction->NewUpdatePartitionSpec();
+ auto ctx, TransactionContext::Make(shared_from_this(),
TransactionKind::kUpdate));
+ return UpdatePartitionSpec::Make(std::move(ctx));
}
Result<std::shared_ptr<UpdateProperties>> Table::NewUpdateProperties() {
ICEBERG_ASSIGN_OR_RAISE(
- auto transaction, Transaction::Make(shared_from_this(),
Transaction::Kind::kUpdate,
- /*auto_commit=*/true));
- return transaction->NewUpdateProperties();
+ auto ctx, TransactionContext::Make(shared_from_this(),
TransactionKind::kUpdate));
+ return UpdateProperties::Make(std::move(ctx));
}
Result<std::shared_ptr<UpdateSortOrder>> Table::NewUpdateSortOrder() {
ICEBERG_ASSIGN_OR_RAISE(
- auto transaction, Transaction::Make(shared_from_this(),
Transaction::Kind::kUpdate,
- /*auto_commit=*/true));
- return transaction->NewUpdateSortOrder();
+ auto ctx, TransactionContext::Make(shared_from_this(),
TransactionKind::kUpdate));
+ return UpdateSortOrder::Make(std::move(ctx));
}
Result<std::shared_ptr<UpdateSchema>> Table::NewUpdateSchema() {
ICEBERG_ASSIGN_OR_RAISE(
- auto transaction, Transaction::Make(shared_from_this(),
Transaction::Kind::kUpdate,
- /*auto_commit=*/true));
- return transaction->NewUpdateSchema();
+ auto ctx, TransactionContext::Make(shared_from_this(),
TransactionKind::kUpdate));
+ return UpdateSchema::Make(std::move(ctx));
}
Result<std::shared_ptr<ExpireSnapshots>> Table::NewExpireSnapshots() {
ICEBERG_ASSIGN_OR_RAISE(
- auto transaction, Transaction::Make(shared_from_this(),
Transaction::Kind::kUpdate,
- /*auto_commit=*/true));
- return transaction->NewExpireSnapshots();
+ auto ctx, TransactionContext::Make(shared_from_this(),
TransactionKind::kUpdate));
+ return ExpireSnapshots::Make(std::move(ctx));
}
Result<std::shared_ptr<UpdateLocation>> Table::NewUpdateLocation() {
ICEBERG_ASSIGN_OR_RAISE(
- auto transaction, Transaction::Make(shared_from_this(),
Transaction::Kind::kUpdate,
- /*auto_commit=*/true));
- return transaction->NewUpdateLocation();
+ auto ctx, TransactionContext::Make(shared_from_this(),
TransactionKind::kUpdate));
+ return UpdateLocation::Make(std::move(ctx));
}
Result<std::shared_ptr<FastAppend>> Table::NewFastAppend() {
ICEBERG_ASSIGN_OR_RAISE(
- auto transaction, Transaction::Make(shared_from_this(),
Transaction::Kind::kUpdate,
- /*auto_commit=*/true));
- return transaction->NewFastAppend();
+ auto ctx, TransactionContext::Make(shared_from_this(),
TransactionKind::kUpdate));
+ return FastAppend::Make(name().name, std::move(ctx));
}
Result<std::shared_ptr<UpdateStatistics>> Table::NewUpdateStatistics() {
ICEBERG_ASSIGN_OR_RAISE(
- auto transaction, Transaction::Make(shared_from_this(),
Transaction::Kind::kUpdate,
- /*auto_commit=*/true));
- return transaction->NewUpdateStatistics();
+ auto ctx, TransactionContext::Make(shared_from_this(),
TransactionKind::kUpdate));
+ return UpdateStatistics::Make(std::move(ctx));
}
Result<std::shared_ptr<UpdatePartitionStatistics>>
Table::NewUpdatePartitionStatistics() {
ICEBERG_ASSIGN_OR_RAISE(
- auto transaction, Transaction::Make(shared_from_this(),
Transaction::Kind::kUpdate,
- /*auto_commit=*/true));
- return transaction->NewUpdatePartitionStatistics();
+ auto ctx, TransactionContext::Make(shared_from_this(),
TransactionKind::kUpdate));
+ return UpdatePartitionStatistics::Make(std::move(ctx));
}
Result<std::shared_ptr<SnapshotManager>> Table::NewSnapshotManager() {
diff --git a/src/iceberg/test/update_partition_spec_test.cc
b/src/iceberg/test/update_partition_spec_test.cc
index fc316aae..632c4a55 100644
--- a/src/iceberg/test/update_partition_spec_test.cc
+++ b/src/iceberg/test/update_partition_spec_test.cc
@@ -232,14 +232,12 @@ class UpdatePartitionSpecTest : public
::testing::TestWithParam<int8_t> {
// Helper to create UpdatePartitionSpec from a table
std::shared_ptr<UpdatePartitionSpec> CreateUpdateFromTable(
std::shared_ptr<Table> table) {
- auto transaction_result =
- Transaction::Make(table, Transaction::Kind::kUpdate,
/*auto_commit=*/false);
- if (!transaction_result.has_value()) {
- ADD_FAILURE() << "Failed to create transaction: "
- << transaction_result.error().message;
+ auto ctx_result = TransactionContext::Make(table,
TransactionKind::kUpdate);
+ if (!ctx_result.has_value()) {
+ ADD_FAILURE() << "Failed to create context: " <<
ctx_result.error().message;
return nullptr;
}
- auto update_result = UpdatePartitionSpec::Make(transaction_result.value());
+ auto update_result =
UpdatePartitionSpec::Make(std::move(ctx_result.value()));
if (!update_result.has_value()) {
ADD_FAILURE() << "Failed to create UpdatePartitionSpec: "
<< update_result.error().message;
diff --git a/src/iceberg/transaction.cc b/src/iceberg/transaction.cc
index 58b0daf9..bf4ac426 100644
--- a/src/iceberg/transaction.cc
+++ b/src/iceberg/transaction.cc
@@ -19,6 +19,7 @@
*/
#include "iceberg/transaction.h"
+#include <format>
#include <memory>
#include <optional>
@@ -52,50 +53,85 @@
namespace iceberg {
-Transaction::Transaction(std::shared_ptr<Table> table, Kind kind, bool
auto_commit,
- std::unique_ptr<TableMetadataBuilder>
metadata_builder)
- : table_(std::move(table)),
- kind_(kind),
- auto_commit_(auto_commit),
- metadata_builder_(std::move(metadata_builder)) {}
+// ---------------------------------------------------------------------------
+// TransactionContext
+// ---------------------------------------------------------------------------
+
+TransactionContext::TransactionContext() = default;
+TransactionContext::~TransactionContext() = default;
+
+Result<std::shared_ptr<TransactionContext>> TransactionContext::Make(
+ std::shared_ptr<Table> table, TransactionKind kind) {
+ ICEBERG_PRECHECK(table != nullptr, "Table cannot be null");
+ auto ctx = std::make_shared<TransactionContext>();
+ ctx->kind = kind;
+ ctx->table = std::move(table);
+ if (kind == TransactionKind::kCreate) {
+ ctx->metadata_builder = TableMetadataBuilder::BuildFromEmpty();
+ std::ignore =
ctx->metadata_builder->ApplyChangesForCreate(*ctx->table->metadata());
+ } else {
+ ctx->metadata_builder =
TableMetadataBuilder::BuildFrom(ctx->table->metadata().get());
+ }
+ return ctx;
+}
+
+const TableMetadata* TransactionContext::base() const { return
metadata_builder->base(); }
+
+const TableMetadata& TransactionContext::current() const {
+ return metadata_builder->current();
+}
+
+std::string TransactionContext::MetadataFileLocation(std::string_view
filename) const {
+ const auto metadata_location =
+ current().properties.Get(TableProperties::kWriteMetadataLocation);
+ if (metadata_location.empty()) {
+ return std::format("{}/metadata/{}", current().location, filename);
+ }
+ return std::format("{}/{}",
LocationUtil::StripTrailingSlash(metadata_location),
+ filename);
+}
+
+// ---------------------------------------------------------------------------
+// Transaction
+// ---------------------------------------------------------------------------
+
+Transaction::Transaction(std::shared_ptr<TransactionContext> ctx)
+ : ctx_(std::move(ctx)) {}
Transaction::~Transaction() = default;
Result<std::shared_ptr<Transaction>> Transaction::Make(std::shared_ptr<Table>
table,
- Kind kind, bool
auto_commit) {
+ TransactionKind kind) {
ICEBERG_PRECHECK(table && table->catalog(), "Table and catalog cannot be
null");
+ ICEBERG_ASSIGN_OR_RAISE(auto ctx, TransactionContext::Make(std::move(table),
kind));
+ auto txn = std::shared_ptr<Transaction>(new Transaction(ctx));
+ ctx->transaction = std::weak_ptr<Transaction>(txn);
+ return txn;
+}
- std::unique_ptr<TableMetadataBuilder> metadata_builder;
- if (kind == Kind::kCreate) {
- metadata_builder = TableMetadataBuilder::BuildFromEmpty();
- std::ignore = metadata_builder->ApplyChangesForCreate(*table->metadata());
- } else {
- metadata_builder =
TableMetadataBuilder::BuildFrom(table->metadata().get());
- }
-
- return std::shared_ptr<Transaction>(
- new Transaction(std::move(table), kind, auto_commit,
std::move(metadata_builder)));
+Result<std::shared_ptr<Transaction>> Transaction::Make(
+ std::shared_ptr<TransactionContext> ctx) {
+ ICEBERG_PRECHECK(ctx != nullptr, "TransactionContext cannot be null");
+ auto txn = std::shared_ptr<Transaction>(new Transaction(ctx));
+ ctx->transaction = std::weak_ptr<Transaction>(txn);
+ return txn;
}
-const TableMetadata* Transaction::base() const { return
metadata_builder_->base(); }
+const std::shared_ptr<Table>& Transaction::table() const { return ctx_->table;
}
+
+const TableMetadata* Transaction::base() const { return ctx_->base(); }
-const TableMetadata& Transaction::current() const { return
metadata_builder_->current(); }
+const TableMetadata& Transaction::current() const { return ctx_->current(); }
std::string Transaction::MetadataFileLocation(std::string_view filename) const
{
- const auto metadata_location =
- current().properties.Get(TableProperties::kWriteMetadataLocation);
- if (metadata_location.empty()) {
- return std::format("{}/{}",
LocationUtil::StripTrailingSlash(metadata_location),
- filename);
- }
- return std::format("{}/metadata/{}", current().location, filename);
+ return ctx_->MetadataFileLocation(filename);
}
Status Transaction::AddUpdate(const std::shared_ptr<PendingUpdate>& update) {
ICEBERG_CHECK(last_update_committed_,
"Cannot add update when previous update is not committed");
- pending_updates_.emplace_back(std::weak_ptr<PendingUpdate>(update));
+ pending_updates_.push_back(update);
last_update_committed_ = false;
return {};
}
@@ -153,52 +189,48 @@ Status Transaction::Apply(PendingUpdate& update) {
last_update_committed_ = true;
- if (auto_commit_) {
- ICEBERG_RETURN_UNEXPECTED(Commit());
- }
-
return {};
}
Status Transaction::ApplyExpireSnapshots(ExpireSnapshots& update) {
ICEBERG_ASSIGN_OR_RAISE(auto result, update.Apply());
if (!result.snapshot_ids_to_remove.empty()) {
-
metadata_builder_->RemoveSnapshots(std::move(result.snapshot_ids_to_remove));
+
ctx_->metadata_builder->RemoveSnapshots(std::move(result.snapshot_ids_to_remove));
}
if (!result.refs_to_remove.empty()) {
for (const auto& ref_name : result.refs_to_remove) {
- metadata_builder_->RemoveRef(ref_name);
+ ctx_->metadata_builder->RemoveRef(ref_name);
}
}
if (!result.partition_spec_ids_to_remove.empty()) {
- metadata_builder_->RemovePartitionSpecs(
+ ctx_->metadata_builder->RemovePartitionSpecs(
std::move(result.partition_spec_ids_to_remove));
}
if (!result.schema_ids_to_remove.empty()) {
- metadata_builder_->RemoveSchemas(std::move(result.schema_ids_to_remove));
+
ctx_->metadata_builder->RemoveSchemas(std::move(result.schema_ids_to_remove));
}
return {};
}
Status Transaction::ApplySetSnapshot(SetSnapshot& update) {
ICEBERG_ASSIGN_OR_RAISE(auto snapshot_id, update.Apply());
- metadata_builder_->SetBranchSnapshot(snapshot_id,
- std::string(SnapshotRef::kMainBranch));
+ ctx_->metadata_builder->SetBranchSnapshot(snapshot_id,
+
std::string(SnapshotRef::kMainBranch));
return {};
}
Status Transaction::ApplyUpdateLocation(UpdateLocation& update) {
ICEBERG_ASSIGN_OR_RAISE(auto location, update.Apply());
- metadata_builder_->SetLocation(location);
+ ctx_->metadata_builder->SetLocation(location);
return {};
}
Status Transaction::ApplyUpdatePartitionSpec(UpdatePartitionSpec& update) {
ICEBERG_ASSIGN_OR_RAISE(auto result, update.Apply());
if (result.set_as_default) {
- metadata_builder_->SetDefaultPartitionSpec(std::move(result.spec));
+ ctx_->metadata_builder->SetDefaultPartitionSpec(std::move(result.spec));
} else {
- metadata_builder_->AddPartitionSpec(std::move(result.spec));
+ ctx_->metadata_builder->AddPartitionSpec(std::move(result.spec));
}
return {};
}
@@ -206,30 +238,30 @@ Status
Transaction::ApplyUpdatePartitionSpec(UpdatePartitionSpec& update) {
Status Transaction::ApplyUpdateProperties(UpdateProperties& update) {
ICEBERG_ASSIGN_OR_RAISE(auto result, update.Apply());
if (!result.updates.empty()) {
- metadata_builder_->SetProperties(std::move(result.updates));
+ ctx_->metadata_builder->SetProperties(std::move(result.updates));
}
if (!result.removals.empty()) {
- metadata_builder_->RemoveProperties(std::move(result.removals));
+ ctx_->metadata_builder->RemoveProperties(std::move(result.removals));
}
if (result.format_version.has_value()) {
- metadata_builder_->UpgradeFormatVersion(result.format_version.value());
+
ctx_->metadata_builder->UpgradeFormatVersion(result.format_version.value());
}
return {};
}
Status Transaction::ApplyUpdateSchema(UpdateSchema& update) {
ICEBERG_ASSIGN_OR_RAISE(auto result, update.Apply());
- metadata_builder_->SetCurrentSchema(std::move(result.schema),
- result.new_last_column_id);
+ ctx_->metadata_builder->SetCurrentSchema(std::move(result.schema),
+ result.new_last_column_id);
if (!result.updated_props.empty()) {
- metadata_builder_->SetProperties(result.updated_props);
+ ctx_->metadata_builder->SetProperties(result.updated_props);
}
return {};
}
Status Transaction::ApplyUpdateSnapshot(SnapshotUpdate& update) {
- const auto& base = metadata_builder_->current();
+ const auto& base = ctx_->metadata_builder->current();
ICEBERG_ASSIGN_OR_RAISE(auto result, update.Apply());
@@ -252,14 +284,14 @@ Status Transaction::ApplyUpdateSnapshot(SnapshotUpdate&
update) {
}
for (const auto& change : temp_update->changes()) {
- change->ApplyTo(*metadata_builder_);
+ change->ApplyTo(*ctx_->metadata_builder);
}
// If the table UUID is missing, add it here. the UUID will be re-created
each time
// this operation retries to ensure that if a concurrent operation assigns
the UUID,
// this operation will not fail.
if (base.table_uuid.empty()) {
- metadata_builder_->AssignUUID();
+ ctx_->metadata_builder->AssignUUID();
}
return {};
}
@@ -267,27 +299,27 @@ Status Transaction::ApplyUpdateSnapshot(SnapshotUpdate&
update) {
Status Transaction::ApplyUpdateSnapshotReference(UpdateSnapshotReference&
update) {
ICEBERG_ASSIGN_OR_RAISE(auto result, update.Apply());
for (const auto& name : result.to_remove) {
- metadata_builder_->RemoveRef(name);
+ ctx_->metadata_builder->RemoveRef(name);
}
for (auto&& [name, ref] : result.to_set) {
- metadata_builder_->SetRef(std::move(name), std::move(ref));
+ ctx_->metadata_builder->SetRef(std::move(name), std::move(ref));
}
return {};
}
Status Transaction::ApplyUpdateSortOrder(UpdateSortOrder& update) {
ICEBERG_ASSIGN_OR_RAISE(auto sort_order, update.Apply());
- metadata_builder_->SetDefaultSortOrder(std::move(sort_order));
+ ctx_->metadata_builder->SetDefaultSortOrder(std::move(sort_order));
return {};
}
Status Transaction::ApplyUpdateStatistics(UpdateStatistics& update) {
ICEBERG_ASSIGN_OR_RAISE(auto result, update.Apply());
for (auto&& [_, stat_file] : result.to_set) {
- metadata_builder_->SetStatistics(std::move(stat_file));
+ ctx_->metadata_builder->SetStatistics(std::move(stat_file));
}
for (const auto& snapshot_id : result.to_remove) {
- metadata_builder_->RemoveStatistics(snapshot_id);
+ ctx_->metadata_builder->RemoveStatistics(snapshot_id);
}
return {};
}
@@ -295,10 +327,10 @@ Status
Transaction::ApplyUpdateStatistics(UpdateStatistics& update) {
Status Transaction::ApplyUpdatePartitionStatistics(UpdatePartitionStatistics&
update) {
ICEBERG_ASSIGN_OR_RAISE(auto result, update.Apply());
for (auto&& [_, partition_stat_file] : result.to_set) {
- metadata_builder_->SetPartitionStatistics(std::move(partition_stat_file));
+
ctx_->metadata_builder->SetPartitionStatistics(std::move(partition_stat_file));
}
for (const auto& snapshot_id : result.to_remove) {
- metadata_builder_->RemovePartitionStatistics(snapshot_id);
+ ctx_->metadata_builder->RemovePartitionStatistics(snapshot_id);
}
return {};
}
@@ -308,104 +340,103 @@ Result<std::shared_ptr<Table>> Transaction::Commit() {
ICEBERG_CHECK(last_update_committed_,
"Cannot commit transaction when previous update is not
committed");
- const auto& updates = metadata_builder_->changes();
+ const auto& updates = ctx_->metadata_builder->changes();
if (updates.empty()) {
committed_ = true;
- return table_;
+ return ctx_->table;
}
std::vector<std::unique_ptr<TableRequirement>> requirements;
- switch (kind_) {
- case Kind::kCreate: {
+ switch (ctx_->kind) {
+ case TransactionKind::kCreate: {
ICEBERG_ASSIGN_OR_RAISE(requirements,
TableRequirements::ForCreateTable(updates));
} break;
- case Kind::kUpdate: {
- ICEBERG_ASSIGN_OR_RAISE(requirements, TableRequirements::ForUpdateTable(
- *metadata_builder_->base(),
updates));
+ case TransactionKind::kUpdate: {
+ ICEBERG_ASSIGN_OR_RAISE(
+ requirements,
+ TableRequirements::ForUpdateTable(*ctx_->metadata_builder->base(),
updates));
} break;
}
// XXX: we should handle commit failure and retry here.
auto commit_result =
- table_->catalog()->UpdateTable(table_->name(), requirements, updates);
+ ctx_->table->catalog()->UpdateTable(ctx_->table->name(), requirements,
updates);
for (const auto& update : pending_updates_) {
- if (auto update_ptr = update.lock()) {
- std::ignore = update_ptr->Finalize(commit_result.has_value()
- ? std::nullopt
- :
std::make_optional(commit_result.error()));
- }
+ std::ignore = update->Finalize(commit_result.has_value()
+ ? std::nullopt
+ :
std::make_optional(commit_result.error()));
}
ICEBERG_RETURN_UNEXPECTED(commit_result);
// Mark as committed and update table reference
committed_ = true;
- table_ = std::move(commit_result.value());
+ ctx_->table = std::move(commit_result.value());
- return table_;
+ return ctx_->table;
}
Result<std::shared_ptr<UpdatePartitionSpec>>
Transaction::NewUpdatePartitionSpec() {
ICEBERG_ASSIGN_OR_RAISE(std::shared_ptr<UpdatePartitionSpec> update_spec,
- UpdatePartitionSpec::Make(shared_from_this()));
+ UpdatePartitionSpec::Make(ctx_));
ICEBERG_RETURN_UNEXPECTED(AddUpdate(update_spec));
return update_spec;
}
Result<std::shared_ptr<UpdateProperties>> Transaction::NewUpdateProperties() {
ICEBERG_ASSIGN_OR_RAISE(std::shared_ptr<UpdateProperties> update_properties,
- UpdateProperties::Make(shared_from_this()));
+ UpdateProperties::Make(ctx_));
ICEBERG_RETURN_UNEXPECTED(AddUpdate(update_properties));
return update_properties;
}
Result<std::shared_ptr<UpdateSortOrder>> Transaction::NewUpdateSortOrder() {
ICEBERG_ASSIGN_OR_RAISE(std::shared_ptr<UpdateSortOrder> update_sort_order,
- UpdateSortOrder::Make(shared_from_this()));
+ UpdateSortOrder::Make(ctx_));
ICEBERG_RETURN_UNEXPECTED(AddUpdate(update_sort_order));
return update_sort_order;
}
Result<std::shared_ptr<UpdateSchema>> Transaction::NewUpdateSchema() {
ICEBERG_ASSIGN_OR_RAISE(std::shared_ptr<UpdateSchema> update_schema,
- UpdateSchema::Make(shared_from_this()));
+ UpdateSchema::Make(ctx_));
ICEBERG_RETURN_UNEXPECTED(AddUpdate(update_schema));
return update_schema;
}
Result<std::shared_ptr<ExpireSnapshots>> Transaction::NewExpireSnapshots() {
ICEBERG_ASSIGN_OR_RAISE(std::shared_ptr<ExpireSnapshots> expire_snapshots,
- ExpireSnapshots::Make(shared_from_this()));
+ ExpireSnapshots::Make(ctx_));
ICEBERG_RETURN_UNEXPECTED(AddUpdate(expire_snapshots));
return expire_snapshots;
}
Result<std::shared_ptr<UpdateLocation>> Transaction::NewUpdateLocation() {
ICEBERG_ASSIGN_OR_RAISE(std::shared_ptr<UpdateLocation> update_location,
- UpdateLocation::Make(shared_from_this()));
+ UpdateLocation::Make(ctx_));
ICEBERG_RETURN_UNEXPECTED(AddUpdate(update_location));
return update_location;
}
Result<std::shared_ptr<SetSnapshot>> Transaction::NewSetSnapshot() {
ICEBERG_ASSIGN_OR_RAISE(std::shared_ptr<SetSnapshot> set_snapshot,
- SetSnapshot::Make(shared_from_this()));
+ SetSnapshot::Make(ctx_));
ICEBERG_RETURN_UNEXPECTED(AddUpdate(set_snapshot));
return set_snapshot;
}
Result<std::shared_ptr<FastAppend>> Transaction::NewFastAppend() {
ICEBERG_ASSIGN_OR_RAISE(std::shared_ptr<FastAppend> fast_append,
- FastAppend::Make(table_->name().name,
shared_from_this()));
+ FastAppend::Make(ctx_->table->name().name, ctx_));
ICEBERG_RETURN_UNEXPECTED(AddUpdate(fast_append));
return fast_append;
}
Result<std::shared_ptr<UpdateStatistics>> Transaction::NewUpdateStatistics() {
ICEBERG_ASSIGN_OR_RAISE(std::shared_ptr<UpdateStatistics> update_statistics,
- UpdateStatistics::Make(shared_from_this()));
+ UpdateStatistics::Make(ctx_));
ICEBERG_RETURN_UNEXPECTED(AddUpdate(update_statistics));
return update_statistics;
}
@@ -414,7 +445,7 @@ Result<std::shared_ptr<UpdatePartitionStatistics>>
Transaction::NewUpdatePartitionStatistics() {
ICEBERG_ASSIGN_OR_RAISE(
std::shared_ptr<UpdatePartitionStatistics> update_partition_statistics,
- UpdatePartitionStatistics::Make(shared_from_this()));
+ UpdatePartitionStatistics::Make(ctx_));
ICEBERG_RETURN_UNEXPECTED(AddUpdate(update_partition_statistics));
return update_partition_statistics;
}
@@ -422,7 +453,7 @@ Transaction::NewUpdatePartitionStatistics() {
Result<std::shared_ptr<UpdateSnapshotReference>>
Transaction::NewUpdateSnapshotReference() {
ICEBERG_ASSIGN_OR_RAISE(std::shared_ptr<UpdateSnapshotReference> update_ref,
- UpdateSnapshotReference::Make(shared_from_this()));
+ UpdateSnapshotReference::Make(ctx_));
ICEBERG_RETURN_UNEXPECTED(AddUpdate(update_ref));
return update_ref;
}
diff --git a/src/iceberg/transaction.h b/src/iceberg/transaction.h
index 438054b5..ec8c4db0 100644
--- a/src/iceberg/transaction.h
+++ b/src/iceberg/transaction.h
@@ -20,7 +20,11 @@
#pragma once
+#include <cstdint>
#include <memory>
+#include <optional>
+#include <string>
+#include <string_view>
#include <vector>
#include "iceberg/iceberg_export.h"
@@ -29,19 +33,24 @@
namespace iceberg {
+/// \brief Whether a transaction creates a new table or updates an existing
one.
+enum class TransactionKind : uint8_t { kCreate, kUpdate };
+
/// \brief A transaction for performing multiple updates to a table
class ICEBERG_EXPORT Transaction : public
std::enable_shared_from_this<Transaction> {
public:
- enum class Kind : uint8_t { kCreate, kUpdate };
-
~Transaction();
/// \brief Create a new transaction
static Result<std::shared_ptr<Transaction>> Make(std::shared_ptr<Table>
table,
- Kind kind, bool
auto_commit);
+ TransactionKind kind);
+
+ /// \brief Create a transaction from an existing context (used by
PendingUpdate::Commit)
+ static Result<std::shared_ptr<Transaction>> Make(
+ std::shared_ptr<TransactionContext> ctx);
/// \brief Return the Table that this transaction will update
- const std::shared_ptr<Table>& table() const { return table_; }
+ const std::shared_ptr<Table>& table() const;
/// \brief Returns the base metadata without any changes
const TableMetadata* base() const;
@@ -109,8 +118,7 @@ class ICEBERG_EXPORT Transaction : public
std::enable_shared_from_this<Transacti
Result<std::shared_ptr<UpdateSnapshotReference>>
NewUpdateSnapshotReference();
private:
- Transaction(std::shared_ptr<Table> table, Kind kind, bool auto_commit,
- std::unique_ptr<TableMetadataBuilder> metadata_builder);
+ explicit Transaction(std::shared_ptr<TransactionContext> ctx);
Status AddUpdate(const std::shared_ptr<PendingUpdate>& update);
@@ -133,22 +141,35 @@ class ICEBERG_EXPORT Transaction : public
std::enable_shared_from_this<Transacti
private:
friend class PendingUpdate;
- // The table that this transaction will update.
- std::shared_ptr<Table> table_;
- // The kind of this transaction.
- const Kind kind_;
- // Whether to auto-commit the transaction when updates are applied.
- // This is useful when a temporary transaction is created for a single
operation.
- bool auto_commit_;
+ // Shared context owning the table, metadata builder, and kind.
+ std::shared_ptr<TransactionContext> ctx_;
+ // Keep track of all created pending updates.
+ std::vector<std::shared_ptr<PendingUpdate>> pending_updates_;
// To make the state simple, we require updates are added and committed in
order.
bool last_update_committed_ = true;
// Tracks if transaction has been committed to prevent double-commit
bool committed_ = false;
- // Keep track of all created pending updates. Use weak_ptr to avoid circular
references.
- // This is useful to retry failed updates.
- std::vector<std::weak_ptr<PendingUpdate>> pending_updates_;
- // Accumulated updates from all pending updates.
- std::unique_ptr<TableMetadataBuilder> metadata_builder_;
+};
+
+/// \brief Shared context between Transaction and PendingUpdate instances.
+class ICEBERG_EXPORT TransactionContext {
+ public:
+ TransactionContext();
+ ~TransactionContext();
+
+ static Result<std::shared_ptr<TransactionContext>>
Make(std::shared_ptr<Table> table,
+ TransactionKind
kind);
+
+ const TableMetadata* base() const;
+ const TableMetadata& current() const;
+ std::string MetadataFileLocation(std::string_view filename) const;
+
+ std::shared_ptr<Table> table;
+ std::unique_ptr<TableMetadataBuilder> metadata_builder;
+ TransactionKind kind;
+ // If PendingUpdate is created directly from Table, this is nullopt;
+ // otherwise, it holds a weak pointer to the Transaction that created it.
+ std::optional<std::weak_ptr<Transaction>> transaction;
};
} // namespace iceberg
diff --git a/src/iceberg/type_fwd.h b/src/iceberg/type_fwd.h
index cad3e969..0b0cccbf 100644
--- a/src/iceberg/type_fwd.h
+++ b/src/iceberg/type_fwd.h
@@ -203,6 +203,7 @@ class TableUpdate;
class TableRequirement;
class TableUpdateContext;
class Transaction;
+class TransactionContext;
/// \brief Update family.
class ExpireSnapshots;
diff --git a/src/iceberg/update/expire_snapshots.cc
b/src/iceberg/update/expire_snapshots.cc
index 68cf08ca..722ae7a4 100644
--- a/src/iceberg/update/expire_snapshots.cc
+++ b/src/iceberg/update/expire_snapshots.cc
@@ -38,14 +38,13 @@
namespace iceberg {
Result<std::shared_ptr<ExpireSnapshots>> ExpireSnapshots::Make(
- std::shared_ptr<Transaction> transaction) {
- ICEBERG_PRECHECK(transaction != nullptr,
- "Cannot create ExpireSnapshots without a transaction");
- return std::shared_ptr<ExpireSnapshots>(new
ExpireSnapshots(std::move(transaction)));
+ std::shared_ptr<TransactionContext> ctx) {
+ ICEBERG_PRECHECK(ctx != nullptr, "Cannot create ExpireSnapshots without a
context");
+ return std::shared_ptr<ExpireSnapshots>(new ExpireSnapshots(std::move(ctx)));
}
-ExpireSnapshots::ExpireSnapshots(std::shared_ptr<Transaction> transaction)
- : PendingUpdate(std::move(transaction)),
+ExpireSnapshots::ExpireSnapshots(std::shared_ptr<TransactionContext> ctx)
+ : PendingUpdate(std::move(ctx)),
current_time_ms_(CurrentTimePointMs()),
default_max_ref_age_ms_(base().properties.Get(TableProperties::kMaxRefAgeMs)),
default_min_num_snapshots_(
@@ -263,7 +262,7 @@ Result<ExpireSnapshots::ApplyResult>
ExpireSnapshots::Apply() {
ICEBERG_ASSIGN_OR_RAISE(auto snapshot, base.SnapshotById(snapshot_id));
SnapshotCache snapshot_cache(snapshot.get());
ICEBERG_ASSIGN_OR_RAISE(auto manifests,
-
snapshot_cache.Manifests(transaction_->table()->io()));
+ snapshot_cache.Manifests(ctx_->table->io()));
for (const auto& manifest : manifests) {
reachable_specs.insert(manifest.partition_spec_id);
}
diff --git a/src/iceberg/update/expire_snapshots.h
b/src/iceberg/update/expire_snapshots.h
index 17a4d8b0..bc05d810 100644
--- a/src/iceberg/update/expire_snapshots.h
+++ b/src/iceberg/update/expire_snapshots.h
@@ -64,7 +64,7 @@ enum class CleanupLevel : uint8_t {
class ICEBERG_EXPORT ExpireSnapshots : public PendingUpdate {
public:
static Result<std::shared_ptr<ExpireSnapshots>> Make(
- std::shared_ptr<Transaction> transaction);
+ std::shared_ptr<TransactionContext> ctx);
~ExpireSnapshots() override;
@@ -143,7 +143,7 @@ class ICEBERG_EXPORT ExpireSnapshots : public PendingUpdate
{
Result<ApplyResult> Apply();
private:
- explicit ExpireSnapshots(std::shared_ptr<Transaction> transaction);
+ explicit ExpireSnapshots(std::shared_ptr<TransactionContext> ctx);
using SnapshotToRef = std::unordered_map<std::string,
std::shared_ptr<SnapshotRef>>;
diff --git a/src/iceberg/update/fast_append.cc
b/src/iceberg/update/fast_append.cc
index 3c132a40..d08f497c 100644
--- a/src/iceberg/update/fast_append.cc
+++ b/src/iceberg/update/fast_append.cc
@@ -36,16 +36,15 @@
namespace iceberg {
Result<std::unique_ptr<FastAppend>> FastAppend::Make(
- std::string table_name, std::shared_ptr<Transaction> transaction) {
+ std::string table_name, std::shared_ptr<TransactionContext> ctx) {
ICEBERG_PRECHECK(!table_name.empty(), "Table name cannot be empty");
- ICEBERG_PRECHECK(transaction != nullptr,
- "Cannot create FastAppend without a transaction");
+ ICEBERG_PRECHECK(ctx != nullptr, "Cannot create FastAppend without a
context");
return std::unique_ptr<FastAppend>(
- new FastAppend(std::move(table_name), std::move(transaction)));
+ new FastAppend(std::move(table_name), std::move(ctx)));
}
-FastAppend::FastAppend(std::string table_name, std::shared_ptr<Transaction>
transaction)
- : SnapshotUpdate(std::move(transaction)),
table_name_(std::move(table_name)) {}
+FastAppend::FastAppend(std::string table_name,
std::shared_ptr<TransactionContext> ctx)
+ : SnapshotUpdate(std::move(ctx)), table_name_(std::move(table_name)) {}
FastAppend& FastAppend::AppendFile(const std::shared_ptr<DataFile>& file) {
ICEBERG_BUILDER_CHECK(file != nullptr, "Invalid data file: null");
@@ -118,7 +117,7 @@ Result<std::vector<ManifestFile>> FastAppend::Apply(
if (snapshot != nullptr) {
auto cached_snapshot = SnapshotCache(snapshot.get());
ICEBERG_ASSIGN_OR_RAISE(auto snapshot_manifests,
-
cached_snapshot.Manifests(transaction_->table()->io()));
+ cached_snapshot.Manifests(ctx_->table->io()));
manifests.insert(manifests.end(), snapshot_manifests.begin(),
snapshot_manifests.end());
}
@@ -179,9 +178,8 @@ Result<ManifestFile> FastAppend::CopyManifest(const
ManifestFile& manifest) {
int64_t snapshot_id = SnapshotId();
// Copy the manifest with the new snapshot ID.
- return CopyAppendManifest(manifest, transaction_->table()->io(), schema,
spec,
- snapshot_id, new_manifest_path,
current.format_version,
- &summary_);
+ return CopyAppendManifest(manifest, ctx_->table->io(), schema, spec,
snapshot_id,
+ new_manifest_path, current.format_version,
&summary_);
}
Result<std::vector<ManifestFile>> FastAppend::WriteNewManifests() {
diff --git a/src/iceberg/update/fast_append.h b/src/iceberg/update/fast_append.h
index 7f5cbb09..580fa472 100644
--- a/src/iceberg/update/fast_append.h
+++ b/src/iceberg/update/fast_append.h
@@ -47,7 +47,7 @@ class ICEBERG_EXPORT FastAppend : public SnapshotUpdate {
/// \param transaction The transaction to use for this update
/// \return A Result containing the FastAppend instance or an error
static Result<std::unique_ptr<FastAppend>> Make(
- std::string table_name, std::shared_ptr<Transaction> transaction);
+ std::string table_name, std::shared_ptr<TransactionContext> ctx);
/// \brief Append a data file to this update.
///
@@ -76,7 +76,7 @@ class ICEBERG_EXPORT FastAppend : public SnapshotUpdate {
bool CleanupAfterCommit() const override;
private:
- explicit FastAppend(std::string table_name, std::shared_ptr<Transaction>
transaction);
+ explicit FastAppend(std::string table_name,
std::shared_ptr<TransactionContext> ctx);
/// \brief Get the partition spec by spec ID.
Result<std::shared_ptr<PartitionSpec>> Spec(int32_t spec_id);
diff --git a/src/iceberg/update/pending_update.cc
b/src/iceberg/update/pending_update.cc
index e55a93df..2d5247d2 100644
--- a/src/iceberg/update/pending_update.cc
+++ b/src/iceberg/update/pending_update.cc
@@ -20,20 +20,41 @@
#include "iceberg/update/pending_update.h"
#include "iceberg/transaction.h"
+#include "iceberg/util/macros.h"
namespace iceberg {
-PendingUpdate::PendingUpdate(std::shared_ptr<Transaction> transaction)
- : transaction_(std::move(transaction)) {}
+PendingUpdate::PendingUpdate(std::shared_ptr<TransactionContext> ctx)
+ : ctx_(std::move(ctx)) {}
PendingUpdate::~PendingUpdate() = default;
-Status PendingUpdate::Commit() { return transaction_->Apply(*this); }
+Status PendingUpdate::Commit() {
+ if (!ctx_->transaction) {
+ // Table-created path: no transaction exists yet, create a temporary one.
+ ICEBERG_ASSIGN_OR_RAISE(auto txn, Transaction::Make(ctx_));
+ Status status = txn->Apply(*this);
+ if (status.has_value()) {
+ auto commit_result = txn->Commit();
+ if (!commit_result.has_value()) {
+ status = std::unexpected(commit_result.error());
+ }
+ }
+ std::ignore =
+ Finalize(status.has_value() ? std::nullopt :
std::make_optional(status.error()));
+ return status;
+ }
+ auto txn = ctx_->transaction->lock();
+ if (!txn) {
+ return CommitFailed("Transaction has been destroyed");
+ }
+ return txn->Apply(*this);
+}
Status PendingUpdate::Finalize([[maybe_unused]] std::optional<Error>
commit_error) {
return {};
}
-const TableMetadata& PendingUpdate::base() const { return
transaction_->current(); }
+const TableMetadata& PendingUpdate::base() const { return ctx_->current(); }
} // namespace iceberg
diff --git a/src/iceberg/update/pending_update.h
b/src/iceberg/update/pending_update.h
index f44812a8..f67be18c 100644
--- a/src/iceberg/update/pending_update.h
+++ b/src/iceberg/update/pending_update.h
@@ -84,11 +84,11 @@ class ICEBERG_EXPORT PendingUpdate : public ErrorCollector {
~PendingUpdate() override;
protected:
- explicit PendingUpdate(std::shared_ptr<Transaction> transaction);
+ explicit PendingUpdate(std::shared_ptr<TransactionContext> ctx);
const TableMetadata& base() const;
- std::shared_ptr<Transaction> transaction_;
+ std::shared_ptr<TransactionContext> ctx_;
};
} // namespace iceberg
diff --git a/src/iceberg/update/set_snapshot.cc
b/src/iceberg/update/set_snapshot.cc
index 7258bc4d..79662890 100644
--- a/src/iceberg/update/set_snapshot.cc
+++ b/src/iceberg/update/set_snapshot.cc
@@ -34,14 +34,13 @@
namespace iceberg {
Result<std::shared_ptr<SetSnapshot>> SetSnapshot::Make(
- std::shared_ptr<Transaction> transaction) {
- ICEBERG_PRECHECK(transaction != nullptr,
- "Cannot create SetSnapshot without a transaction");
- return std::shared_ptr<SetSnapshot>(new SetSnapshot(std::move(transaction)));
+ std::shared_ptr<TransactionContext> ctx) {
+ ICEBERG_PRECHECK(ctx != nullptr, "Cannot create SetSnapshot without a
context");
+ return std::shared_ptr<SetSnapshot>(new SetSnapshot(std::move(ctx)));
}
-SetSnapshot::SetSnapshot(std::shared_ptr<Transaction> transaction)
- : PendingUpdate(std::move(transaction)) {}
+SetSnapshot::SetSnapshot(std::shared_ptr<TransactionContext> ctx)
+ : PendingUpdate(std::move(ctx)) {}
SetSnapshot::~SetSnapshot() = default;
@@ -89,7 +88,7 @@ SetSnapshot& SetSnapshot::RollbackTo(int64_t snapshot_id) {
Result<int64_t> SetSnapshot::Apply() {
ICEBERG_RETURN_UNEXPECTED(CheckErrors());
- const TableMetadata& base_metadata = transaction_->current();
+ const TableMetadata& base_metadata = ctx_->current();
// If no target snapshot was configured, return current state (NOOP)
if (!target_snapshot_id_.has_value()) {
diff --git a/src/iceberg/update/set_snapshot.h
b/src/iceberg/update/set_snapshot.h
index 1ad39960..6aeb9265 100644
--- a/src/iceberg/update/set_snapshot.h
+++ b/src/iceberg/update/set_snapshot.h
@@ -37,7 +37,7 @@ namespace iceberg {
class ICEBERG_EXPORT SetSnapshot : public PendingUpdate {
public:
static Result<std::shared_ptr<SetSnapshot>> Make(
- std::shared_ptr<Transaction> transaction);
+ std::shared_ptr<TransactionContext> ctx);
~SetSnapshot() override;
@@ -56,7 +56,7 @@ class ICEBERG_EXPORT SetSnapshot : public PendingUpdate {
Result<int64_t> Apply();
private:
- explicit SetSnapshot(std::shared_ptr<Transaction> transaction);
+ explicit SetSnapshot(std::shared_ptr<TransactionContext> ctx);
/// \brief Find the latest snapshot whose timestamp is before the provided
timestamp.
///
diff --git a/src/iceberg/update/snapshot_manager.cc
b/src/iceberg/update/snapshot_manager.cc
index d882dd32..55f73072 100644
--- a/src/iceberg/update/snapshot_manager.cc
+++ b/src/iceberg/update/snapshot_manager.cc
@@ -34,8 +34,7 @@ Result<std::shared_ptr<SnapshotManager>>
SnapshotManager::Make(
std::shared_ptr<Table> table) {
ICEBERG_PRECHECK(table != nullptr, "Invalid input table: null");
ICEBERG_ASSIGN_OR_RAISE(auto transaction,
- Transaction::Make(std::move(table),
Transaction::Kind::kUpdate,
- /*auto_commit=*/false));
+ Transaction::Make(std::move(table),
TransactionKind::kUpdate));
return std::shared_ptr<SnapshotManager>(
new SnapshotManager(std::move(transaction),
/*is_external_transaction=*/false));
}
diff --git a/src/iceberg/update/snapshot_update.cc
b/src/iceberg/update/snapshot_update.cc
index b4468256..3e579266 100644
--- a/src/iceberg/update/snapshot_update.cc
+++ b/src/iceberg/update/snapshot_update.cc
@@ -154,8 +154,8 @@ Result<ManifestFile> AddMetadata(const ManifestFile&
manifest, std::shared_ptr<F
SnapshotUpdate::~SnapshotUpdate() = default;
-SnapshotUpdate::SnapshotUpdate(std::shared_ptr<Transaction> transaction)
- : PendingUpdate(std::move(transaction)),
+SnapshotUpdate::SnapshotUpdate(std::shared_ptr<TransactionContext> ctx)
+ : PendingUpdate(std::move(ctx)),
can_inherit_snapshot_id_(
base().format_version > 1 ||
base().properties.Get(TableProperties::kSnapshotIdInheritanceEnabled)),
@@ -176,11 +176,10 @@ Result<std::vector<ManifestFile>>
SnapshotUpdate::WriteDataManifests(
RollingManifestWriter rolling_writer(
[this, spec, schema = std::move(current_schema),
snapshot_id = SnapshotId()]() ->
Result<std::unique_ptr<ManifestWriter>> {
- return ManifestWriter::MakeWriter(base().format_version, snapshot_id,
- ManifestPath(),
transaction_->table()->io(),
- std::move(spec), std::move(schema),
- ManifestContent::kData,
- /*first_row_id=*/base().next_row_id);
+ return ManifestWriter::MakeWriter(
+ base().format_version, snapshot_id, ManifestPath(),
ctx_->table->io(),
+ std::move(spec), std::move(schema), ManifestContent::kData,
+ /*first_row_id=*/base().next_row_id);
},
target_manifest_size_bytes_);
@@ -203,10 +202,9 @@ Result<std::vector<ManifestFile>>
SnapshotUpdate::WriteDeleteManifests(
RollingManifestWriter rolling_writer(
[this, spec, schema = std::move(current_schema),
snapshot_id = SnapshotId()]() ->
Result<std::unique_ptr<ManifestWriter>> {
- return ManifestWriter::MakeWriter(base().format_version, snapshot_id,
- ManifestPath(),
transaction_->table()->io(),
- std::move(spec), std::move(schema),
- ManifestContent::kDeletes);
+ return ManifestWriter::MakeWriter(
+ base().format_version, snapshot_id, ManifestPath(),
ctx_->table->io(),
+ std::move(spec), std::move(schema), ManifestContent::kDeletes);
},
target_manifest_size_bytes_);
@@ -245,8 +243,7 @@ Result<SnapshotUpdate::ApplyResult> SnapshotUpdate::Apply()
{
continue;
}
// TODO(xxx): read in parallel and cache enriched manifests for retries
- ICEBERG_ASSIGN_OR_RAISE(manifest,
- AddMetadata(manifest, transaction_->table()->io(),
base()));
+ ICEBERG_ASSIGN_OR_RAISE(manifest, AddMetadata(manifest, ctx_->table->io(),
base()));
}
std::string manifest_list_path = ManifestListPath();
@@ -254,8 +251,8 @@ Result<SnapshotUpdate::ApplyResult> SnapshotUpdate::Apply()
{
ICEBERG_ASSIGN_OR_RAISE(
auto writer, ManifestListWriter::MakeWriter(base().format_version,
SnapshotId(),
parent_snapshot_id,
manifest_list_path,
- transaction_->table()->io(),
- sequence_number,
base().next_row_id));
+ ctx_->table->io(),
sequence_number,
+ base().next_row_id));
ICEBERG_RETURN_UNEXPECTED(writer->AddAll(manifests));
ICEBERG_RETURN_UNEXPECTED(writer->Close());
@@ -313,8 +310,7 @@ Status SnapshotUpdate::Finalize(std::optional<Error>
commit_error) {
ICEBERG_CHECK(staged_snapshot_ != nullptr,
"Staged snapshot is null during finalize after commit");
auto cached_snapshot = SnapshotCache(staged_snapshot_.get());
- ICEBERG_ASSIGN_OR_RAISE(auto manifests,
-
cached_snapshot.Manifests(transaction_->table()->io()));
+ ICEBERG_ASSIGN_OR_RAISE(auto manifests,
cached_snapshot.Manifests(ctx_->table->io()));
CleanUncommitted(manifests | std::views::transform([](const auto&
manifest) {
return manifest.manifest_path;
}) |
@@ -391,7 +387,7 @@ void SnapshotUpdate::CleanAll() {
Status SnapshotUpdate::DeleteFile(const std::string& path) {
static const auto kDefaultDeleteFunc = [this](const std::string& path) {
- return this->transaction_->table()->io()->DeleteFile(path);
+ return this->ctx_->table->io()->DeleteFile(path);
};
if (delete_func_) {
return delete_func_(path);
@@ -406,14 +402,14 @@ std::string SnapshotUpdate::ManifestListPath() {
int64_t snapshot_id = SnapshotId();
std::string filename =
std::format("snap-{}-{}-{}.avro", snapshot_id, ++attempt_, commit_uuid_);
- return transaction_->MetadataFileLocation(filename);
+ return ctx_->MetadataFileLocation(filename);
}
std::string SnapshotUpdate::ManifestPath() {
// Generate manifest path
// Format: {metadata_location}/{uuid}-m{manifest_count}.avro
std::string filename = std::format("{}-m{}.avro", commit_uuid_,
manifest_count_++);
- return transaction_->MetadataFileLocation(filename);
+ return ctx_->MetadataFileLocation(filename);
}
} // namespace iceberg
diff --git a/src/iceberg/update/snapshot_update.h
b/src/iceberg/update/snapshot_update.h
index fdbb2660..284d1d2d 100644
--- a/src/iceberg/update/snapshot_update.h
+++ b/src/iceberg/update/snapshot_update.h
@@ -121,7 +121,7 @@ class ICEBERG_EXPORT SnapshotUpdate : public PendingUpdate {
Status Finalize(std::optional<Error> commit_error) override;
protected:
- explicit SnapshotUpdate(std::shared_ptr<Transaction> transaction);
+ explicit SnapshotUpdate(std::shared_ptr<TransactionContext> ctx);
/// \brief Write data manifests for the given data files
///
diff --git a/src/iceberg/update/update_location.cc
b/src/iceberg/update/update_location.cc
index c82a138f..064bebc8 100644
--- a/src/iceberg/update/update_location.cc
+++ b/src/iceberg/update/update_location.cc
@@ -30,14 +30,13 @@
namespace iceberg {
Result<std::shared_ptr<UpdateLocation>> UpdateLocation::Make(
- std::shared_ptr<Transaction> transaction) {
- ICEBERG_PRECHECK(transaction != nullptr,
- "Cannot create UpdateLocation without a transaction");
- return std::shared_ptr<UpdateLocation>(new
UpdateLocation(std::move(transaction)));
+ std::shared_ptr<TransactionContext> ctx) {
+ ICEBERG_PRECHECK(ctx != nullptr, "Cannot create UpdateLocation without a
context");
+ return std::shared_ptr<UpdateLocation>(new UpdateLocation(std::move(ctx)));
}
-UpdateLocation::UpdateLocation(std::shared_ptr<Transaction> transaction)
- : PendingUpdate(std::move(transaction)) {}
+UpdateLocation::UpdateLocation(std::shared_ptr<TransactionContext> ctx)
+ : PendingUpdate(std::move(ctx)) {}
UpdateLocation::~UpdateLocation() = default;
diff --git a/src/iceberg/update/update_location.h
b/src/iceberg/update/update_location.h
index 891853e9..33864380 100644
--- a/src/iceberg/update/update_location.h
+++ b/src/iceberg/update/update_location.h
@@ -34,7 +34,7 @@ namespace iceberg {
class ICEBERG_EXPORT UpdateLocation : public PendingUpdate {
public:
static Result<std::shared_ptr<UpdateLocation>> Make(
- std::shared_ptr<Transaction> transaction);
+ std::shared_ptr<TransactionContext> ctx);
~UpdateLocation() override;
@@ -50,7 +50,7 @@ class ICEBERG_EXPORT UpdateLocation : public PendingUpdate {
Result<std::string> Apply();
private:
- explicit UpdateLocation(std::shared_ptr<Transaction> transaction);
+ explicit UpdateLocation(std::shared_ptr<TransactionContext> ctx);
std::string location_;
};
diff --git a/src/iceberg/update/update_partition_spec.cc
b/src/iceberg/update/update_partition_spec.cc
index 812540f2..2be6a59a 100644
--- a/src/iceberg/update/update_partition_spec.cc
+++ b/src/iceberg/update/update_partition_spec.cc
@@ -37,15 +37,13 @@
namespace iceberg {
Result<std::shared_ptr<UpdatePartitionSpec>> UpdatePartitionSpec::Make(
- std::shared_ptr<Transaction> transaction) {
- ICEBERG_PRECHECK(transaction != nullptr,
- "Cannot create UpdatePartitionSpec without transaction");
- return std::shared_ptr<UpdatePartitionSpec>(
- new UpdatePartitionSpec(std::move(transaction)));
+ std::shared_ptr<TransactionContext> ctx) {
+ ICEBERG_PRECHECK(ctx != nullptr, "Cannot create UpdatePartitionSpec without
context");
+ return std::shared_ptr<UpdatePartitionSpec>(new
UpdatePartitionSpec(std::move(ctx)));
}
-UpdatePartitionSpec::UpdatePartitionSpec(std::shared_ptr<Transaction>
transaction)
- : PendingUpdate(std::move(transaction)) {
+UpdatePartitionSpec::UpdatePartitionSpec(std::shared_ptr<TransactionContext>
ctx)
+ : PendingUpdate(std::move(ctx)) {
format_version_ = base().format_version;
// Get the current/default partition spec
diff --git a/src/iceberg/update/update_partition_spec.h
b/src/iceberg/update/update_partition_spec.h
index 1eab425d..6b3dd40e 100644
--- a/src/iceberg/update/update_partition_spec.h
+++ b/src/iceberg/update/update_partition_spec.h
@@ -44,7 +44,7 @@ namespace iceberg {
class ICEBERG_EXPORT UpdatePartitionSpec : public PendingUpdate {
public:
static Result<std::shared_ptr<UpdatePartitionSpec>> Make(
- std::shared_ptr<Transaction> transaction);
+ std::shared_ptr<TransactionContext> ctx);
~UpdatePartitionSpec() override;
@@ -107,7 +107,7 @@ class ICEBERG_EXPORT UpdatePartitionSpec : public
PendingUpdate {
Result<ApplyResult> Apply();
private:
- explicit UpdatePartitionSpec(std::shared_ptr<Transaction> transaction);
+ explicit UpdatePartitionSpec(std::shared_ptr<TransactionContext> ctx);
/// \brief Pair of source ID and transform string for indexing.
using TransformKey = std::pair<int32_t, std::string>;
diff --git a/src/iceberg/update/update_partition_statistics.cc
b/src/iceberg/update/update_partition_statistics.cc
index 2c06c0ce..3a5ab4f8 100644
--- a/src/iceberg/update/update_partition_statistics.cc
+++ b/src/iceberg/update/update_partition_statistics.cc
@@ -32,16 +32,16 @@
namespace iceberg {
Result<std::shared_ptr<UpdatePartitionStatistics>>
UpdatePartitionStatistics::Make(
- std::shared_ptr<Transaction> transaction) {
- ICEBERG_PRECHECK(transaction != nullptr,
- "Cannot create UpdatePartitionStatistics without a
transaction");
+ std::shared_ptr<TransactionContext> ctx) {
+ ICEBERG_PRECHECK(ctx != nullptr,
+ "Cannot create UpdatePartitionStatistics without a
context");
return std::shared_ptr<UpdatePartitionStatistics>(
- new UpdatePartitionStatistics(std::move(transaction)));
+ new UpdatePartitionStatistics(std::move(ctx)));
}
UpdatePartitionStatistics::UpdatePartitionStatistics(
- std::shared_ptr<Transaction> transaction)
- : PendingUpdate(std::move(transaction)) {}
+ std::shared_ptr<TransactionContext> ctx)
+ : PendingUpdate(std::move(ctx)) {}
UpdatePartitionStatistics::~UpdatePartitionStatistics() = default;
diff --git a/src/iceberg/update/update_partition_statistics.h
b/src/iceberg/update/update_partition_statistics.h
index 740fe214..bdaf5a70 100644
--- a/src/iceberg/update/update_partition_statistics.h
+++ b/src/iceberg/update/update_partition_statistics.h
@@ -39,7 +39,7 @@ namespace iceberg {
class ICEBERG_EXPORT UpdatePartitionStatistics : public PendingUpdate {
public:
static Result<std::shared_ptr<UpdatePartitionStatistics>> Make(
- std::shared_ptr<Transaction> transaction);
+ std::shared_ptr<TransactionContext> ctx);
~UpdatePartitionStatistics() override;
@@ -71,7 +71,7 @@ class ICEBERG_EXPORT UpdatePartitionStatistics : public
PendingUpdate {
Result<ApplyResult> Apply();
private:
- explicit UpdatePartitionStatistics(std::shared_ptr<Transaction> transaction);
+ explicit UpdatePartitionStatistics(std::shared_ptr<TransactionContext> ctx);
std::unordered_map<int64_t, std::shared_ptr<PartitionStatisticsFile>>
partition_statistics_to_set_;
diff --git a/src/iceberg/update/update_properties.cc
b/src/iceberg/update/update_properties.cc
index e6fe7a60..1837ab00 100644
--- a/src/iceberg/update/update_properties.cc
+++ b/src/iceberg/update/update_properties.cc
@@ -34,14 +34,13 @@
namespace iceberg {
Result<std::shared_ptr<UpdateProperties>> UpdateProperties::Make(
- std::shared_ptr<Transaction> transaction) {
- ICEBERG_PRECHECK(transaction != nullptr,
- "Cannot create UpdateProperties without a transaction");
- return std::shared_ptr<UpdateProperties>(new
UpdateProperties(std::move(transaction)));
+ std::shared_ptr<TransactionContext> ctx) {
+ ICEBERG_PRECHECK(ctx != nullptr, "Cannot create UpdateProperties without a
context");
+ return std::shared_ptr<UpdateProperties>(new
UpdateProperties(std::move(ctx)));
}
-UpdateProperties::UpdateProperties(std::shared_ptr<Transaction> transaction)
- : PendingUpdate(std::move(transaction)) {}
+UpdateProperties::UpdateProperties(std::shared_ptr<TransactionContext> ctx)
+ : PendingUpdate(std::move(ctx)) {}
UpdateProperties::~UpdateProperties() = default;
diff --git a/src/iceberg/update/update_properties.h
b/src/iceberg/update/update_properties.h
index ec9ab796..491a5567 100644
--- a/src/iceberg/update/update_properties.h
+++ b/src/iceberg/update/update_properties.h
@@ -39,7 +39,7 @@ namespace iceberg {
class ICEBERG_EXPORT UpdateProperties : public PendingUpdate {
public:
static Result<std::shared_ptr<UpdateProperties>> Make(
- std::shared_ptr<Transaction> transaction);
+ std::shared_ptr<TransactionContext> ctx);
~UpdateProperties() override;
@@ -71,7 +71,7 @@ class ICEBERG_EXPORT UpdateProperties : public PendingUpdate {
Result<ApplyResult> Apply();
private:
- explicit UpdateProperties(std::shared_ptr<Transaction> transaction);
+ explicit UpdateProperties(std::shared_ptr<TransactionContext> ctx);
std::unordered_map<std::string, std::string> updates_;
std::unordered_set<std::string> removals_;
diff --git a/src/iceberg/update/update_schema.cc
b/src/iceberg/update/update_schema.cc
index 3fdce409..1f35781f 100644
--- a/src/iceberg/update/update_schema.cc
+++ b/src/iceberg/update/update_schema.cc
@@ -279,15 +279,14 @@ std::vector<SchemaField> ApplyChangesVisitor::MoveFields(
} // namespace
Result<std::shared_ptr<UpdateSchema>> UpdateSchema::Make(
- std::shared_ptr<Transaction> transaction) {
- ICEBERG_PRECHECK(transaction != nullptr,
- "Cannot create UpdateSchema without transaction");
- return std::shared_ptr<UpdateSchema>(new
UpdateSchema(std::move(transaction)));
+ std::shared_ptr<TransactionContext> ctx) {
+ ICEBERG_PRECHECK(ctx != nullptr, "Cannot create UpdateSchema without
context");
+ return std::shared_ptr<UpdateSchema>(new UpdateSchema(std::move(ctx)));
}
-UpdateSchema::UpdateSchema(std::shared_ptr<Transaction> transaction)
- : PendingUpdate(std::move(transaction)) {
- const TableMetadata& base_metadata = transaction_->current();
+UpdateSchema::UpdateSchema(std::shared_ptr<TransactionContext> ctx)
+ : PendingUpdate(std::move(ctx)) {
+ const TableMetadata& base_metadata = ctx_->current();
auto schema_result = base_metadata.Schema();
if (!schema_result.has_value()) {
diff --git a/src/iceberg/update/update_schema.h
b/src/iceberg/update/update_schema.h
index 2223c0b8..564a03df 100644
--- a/src/iceberg/update/update_schema.h
+++ b/src/iceberg/update/update_schema.h
@@ -48,7 +48,7 @@ namespace iceberg {
class ICEBERG_EXPORT UpdateSchema : public PendingUpdate {
public:
static Result<std::shared_ptr<UpdateSchema>> Make(
- std::shared_ptr<Transaction> transaction);
+ std::shared_ptr<TransactionContext> ctx);
~UpdateSchema() override;
@@ -348,7 +348,7 @@ class ICEBERG_EXPORT UpdateSchema : public PendingUpdate {
Result<ApplyResult> Apply();
private:
- explicit UpdateSchema(std::shared_ptr<Transaction> transaction);
+ explicit UpdateSchema(std::shared_ptr<TransactionContext> ctx);
/// \brief Internal implementation for adding a column with full control.
///
diff --git a/src/iceberg/update/update_snapshot_reference.cc
b/src/iceberg/update/update_snapshot_reference.cc
index 923f0c8d..908962ec 100644
--- a/src/iceberg/update/update_snapshot_reference.cc
+++ b/src/iceberg/update/update_snapshot_reference.cc
@@ -33,15 +33,15 @@
namespace iceberg {
Result<std::shared_ptr<UpdateSnapshotReference>> UpdateSnapshotReference::Make(
- std::shared_ptr<Transaction> transaction) {
- ICEBERG_PRECHECK(transaction != nullptr,
- "Cannot create UpdateSnapshotReference without a
transaction");
+ std::shared_ptr<TransactionContext> ctx) {
+ ICEBERG_PRECHECK(ctx != nullptr,
+ "Cannot create UpdateSnapshotReference without a context");
return std::shared_ptr<UpdateSnapshotReference>(
- new UpdateSnapshotReference(std::move(transaction)));
+ new UpdateSnapshotReference(std::move(ctx)));
}
-UpdateSnapshotReference::UpdateSnapshotReference(std::shared_ptr<Transaction>
transaction)
- : PendingUpdate(std::move(transaction)), updated_refs_(base().refs) {}
+UpdateSnapshotReference::UpdateSnapshotReference(std::shared_ptr<TransactionContext>
ctx)
+ : PendingUpdate(std::move(ctx)), updated_refs_(base().refs) {}
UpdateSnapshotReference::~UpdateSnapshotReference() = default;
@@ -143,7 +143,7 @@ UpdateSnapshotReference&
UpdateSnapshotReference::ReplaceBranchInternal(
if (fast_forward) {
// Fast-forward is valid only when the current branch (from) is an
ancestor of the
// target (to), i.e. we are moving forward in history.
- const auto& base_metadata = transaction_->current();
+ const auto& base_metadata = ctx_->current();
ICEBERG_BUILDER_ASSIGN_OR_RETURN(
auto from_is_ancestor_of_to,
SnapshotUtil::IsAncestorOf(
diff --git a/src/iceberg/update/update_snapshot_reference.h
b/src/iceberg/update/update_snapshot_reference.h
index e13f5bfa..7d061ea3 100644
--- a/src/iceberg/update/update_snapshot_reference.h
+++ b/src/iceberg/update/update_snapshot_reference.h
@@ -39,7 +39,7 @@ namespace iceberg {
class ICEBERG_EXPORT UpdateSnapshotReference : public PendingUpdate {
public:
static Result<std::shared_ptr<UpdateSnapshotReference>> Make(
- std::shared_ptr<Transaction> transaction);
+ std::shared_ptr<TransactionContext> ctx);
~UpdateSnapshotReference() override;
@@ -145,7 +145,7 @@ class ICEBERG_EXPORT UpdateSnapshotReference : public
PendingUpdate {
Result<ApplyResult> Apply();
private:
- explicit UpdateSnapshotReference(std::shared_ptr<Transaction> transaction);
+ explicit UpdateSnapshotReference(std::shared_ptr<TransactionContext> ctx);
UpdateSnapshotReference& ReplaceBranchInternal(const std::string& from,
const std::string& to,
diff --git a/src/iceberg/update/update_sort_order.cc
b/src/iceberg/update/update_sort_order.cc
index c5c7be32..8086b903 100644
--- a/src/iceberg/update/update_sort_order.cc
+++ b/src/iceberg/update/update_sort_order.cc
@@ -34,14 +34,13 @@
namespace iceberg {
Result<std::shared_ptr<UpdateSortOrder>> UpdateSortOrder::Make(
- std::shared_ptr<Transaction> transaction) {
- ICEBERG_PRECHECK(transaction != nullptr,
- "Cannot create UpdateSortOrder without a transaction");
- return std::shared_ptr<UpdateSortOrder>(new
UpdateSortOrder(std::move(transaction)));
+ std::shared_ptr<TransactionContext> ctx) {
+ ICEBERG_PRECHECK(ctx != nullptr, "Cannot create UpdateSortOrder without a
context");
+ return std::shared_ptr<UpdateSortOrder>(new UpdateSortOrder(std::move(ctx)));
}
-UpdateSortOrder::UpdateSortOrder(std::shared_ptr<Transaction> transaction)
- : PendingUpdate(std::move(transaction)) {}
+UpdateSortOrder::UpdateSortOrder(std::shared_ptr<TransactionContext> ctx)
+ : PendingUpdate(std::move(ctx)) {}
UpdateSortOrder::~UpdateSortOrder() = default;
diff --git a/src/iceberg/update/update_sort_order.h
b/src/iceberg/update/update_sort_order.h
index 364a70f6..53fe927e 100644
--- a/src/iceberg/update/update_sort_order.h
+++ b/src/iceberg/update/update_sort_order.h
@@ -37,7 +37,7 @@ namespace iceberg {
class ICEBERG_EXPORT UpdateSortOrder : public PendingUpdate {
public:
static Result<std::shared_ptr<UpdateSortOrder>> Make(
- std::shared_ptr<Transaction> transaction);
+ std::shared_ptr<TransactionContext> ctx);
~UpdateSortOrder() override;
@@ -71,7 +71,7 @@ class ICEBERG_EXPORT UpdateSortOrder : public PendingUpdate {
Result<std::shared_ptr<SortOrder>> Apply();
private:
- explicit UpdateSortOrder(std::shared_ptr<Transaction> transaction);
+ explicit UpdateSortOrder(std::shared_ptr<TransactionContext> ctx);
std::vector<SortField> sort_fields_;
bool case_sensitive_ = true;
diff --git a/src/iceberg/update/update_statistics.cc
b/src/iceberg/update/update_statistics.cc
index 46145336..afa6ea0c 100644
--- a/src/iceberg/update/update_statistics.cc
+++ b/src/iceberg/update/update_statistics.cc
@@ -32,14 +32,13 @@
namespace iceberg {
Result<std::shared_ptr<UpdateStatistics>> UpdateStatistics::Make(
- std::shared_ptr<Transaction> transaction) {
- ICEBERG_PRECHECK(transaction != nullptr,
- "Cannot create UpdateStatistics without a transaction");
- return std::shared_ptr<UpdateStatistics>(new
UpdateStatistics(std::move(transaction)));
+ std::shared_ptr<TransactionContext> ctx) {
+ ICEBERG_PRECHECK(ctx != nullptr, "Cannot create UpdateStatistics without a
context");
+ return std::shared_ptr<UpdateStatistics>(new
UpdateStatistics(std::move(ctx)));
}
-UpdateStatistics::UpdateStatistics(std::shared_ptr<Transaction> transaction)
- : PendingUpdate(std::move(transaction)) {}
+UpdateStatistics::UpdateStatistics(std::shared_ptr<TransactionContext> ctx)
+ : PendingUpdate(std::move(ctx)) {}
UpdateStatistics::~UpdateStatistics() = default;
diff --git a/src/iceberg/update/update_statistics.h
b/src/iceberg/update/update_statistics.h
index 55e50fb1..6441c02a 100644
--- a/src/iceberg/update/update_statistics.h
+++ b/src/iceberg/update/update_statistics.h
@@ -39,7 +39,7 @@ namespace iceberg {
class ICEBERG_EXPORT UpdateStatistics : public PendingUpdate {
public:
static Result<std::shared_ptr<UpdateStatistics>> Make(
- std::shared_ptr<Transaction> transaction);
+ std::shared_ptr<TransactionContext> ctx);
~UpdateStatistics() override;
@@ -70,7 +70,7 @@ class ICEBERG_EXPORT UpdateStatistics : public PendingUpdate {
Result<ApplyResult> Apply();
private:
- explicit UpdateStatistics(std::shared_ptr<Transaction> transaction);
+ explicit UpdateStatistics(std::shared_ptr<TransactionContext> ctx);
std::unordered_map<int64_t, std::shared_ptr<StatisticsFile>>
statistics_to_set_;
};