This is an automated email from the ASF dual-hosted git repository.

gangwu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-cpp.git


The following commit(s) were added to refs/heads/main by this push:
     new 8fdf3462 refactor: introduce TransactionContext to decouple Transaction and PendingUpdate (#591)
8fdf3462 is described below

commit 8fdf34628d8e5bdec53e56fbeaa18ff27bff6a4d
Author: Gang Wu <[email protected]>
AuthorDate: Mon Mar 16 22:16:39 2026 +0800

    refactor: introduce TransactionContext to decouple Transaction and PendingUpdate (#591)
    
    Add TransactionContext to own the shared state (table, metadata_builder,
    kind) between Transaction and PendingUpdate. Both now hold a
    shared_ptr<TransactionContext> instead of PendingUpdate holding a
    weak_ptr<Transaction>.
    
    This fixes two issues:
    - pending_updates_ stored weak_ptr<PendingUpdate>, so once a caller dropped
      its PendingUpdate, Commit() silently skipped that update's Finalize/retry
      handling; Transaction now holds shared_ptr<PendingUpdate>
    - Table::New*() no longer creates a temporary Transaction; it creates a
      TransactionContext directly and passes it to the PendingUpdate, removing
      the circular dependency
    
    Also clean up related redundancy:
    - Hoist Transaction::Kind to a standalone enum class TransactionKind
    - Remove Transaction::kind_ (duplicate of ctx_->kind)
    - Remove auto_commit machinery; PendingUpdate::Commit() now calls
    txn->Commit() explicitly on the table-created path
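    - TransactionContext::Make returns Result to propagate null table errors
    
    Note: this also corrects MetadataFileLocation, whose branches were swapped.
    It now returns <location>/metadata/<filename> when
    TableProperties::kWriteMetadataLocation is empty, and
    <metadata_location>/<filename> (trailing slash stripped) otherwise.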
---
 src/iceberg/catalog/memory/in_memory_catalog.cc   |   3 +-
 src/iceberg/catalog/rest/rest_catalog.cc          |   3 +-
 src/iceberg/table.cc                              |  53 +++---
 src/iceberg/test/update_partition_spec_test.cc    |  10 +-
 src/iceberg/transaction.cc                        | 195 +++++++++++++---------
 src/iceberg/transaction.h                         |  57 +++++--
 src/iceberg/type_fwd.h                            |   1 +
 src/iceberg/update/expire_snapshots.cc            |  13 +-
 src/iceberg/update/expire_snapshots.h             |   4 +-
 src/iceberg/update/fast_append.cc                 |  18 +-
 src/iceberg/update/fast_append.h                  |   4 +-
 src/iceberg/update/pending_update.cc              |  29 +++-
 src/iceberg/update/pending_update.h               |   4 +-
 src/iceberg/update/set_snapshot.cc                |  13 +-
 src/iceberg/update/set_snapshot.h                 |   4 +-
 src/iceberg/update/snapshot_manager.cc            |   3 +-
 src/iceberg/update/snapshot_update.cc             |  36 ++--
 src/iceberg/update/snapshot_update.h              |   2 +-
 src/iceberg/update/update_location.cc             |  11 +-
 src/iceberg/update/update_location.h              |   4 +-
 src/iceberg/update/update_partition_spec.cc       |  12 +-
 src/iceberg/update/update_partition_spec.h        |   4 +-
 src/iceberg/update/update_partition_statistics.cc |  12 +-
 src/iceberg/update/update_partition_statistics.h  |   4 +-
 src/iceberg/update/update_properties.cc           |  11 +-
 src/iceberg/update/update_properties.h            |   4 +-
 src/iceberg/update/update_schema.cc               |  13 +-
 src/iceberg/update/update_schema.h                |   4 +-
 src/iceberg/update/update_snapshot_reference.cc   |  14 +-
 src/iceberg/update/update_snapshot_reference.h    |   4 +-
 src/iceberg/update/update_sort_order.cc           |  11 +-
 src/iceberg/update/update_sort_order.h            |   4 +-
 src/iceberg/update/update_statistics.cc           |  11 +-
 src/iceberg/update/update_statistics.h            |   4 +-
 34 files changed, 314 insertions(+), 265 deletions(-)

diff --git a/src/iceberg/catalog/memory/in_memory_catalog.cc 
b/src/iceberg/catalog/memory/in_memory_catalog.cc
index 8a082aad..7cd02ac7 100644
--- a/src/iceberg/catalog/memory/in_memory_catalog.cc
+++ b/src/iceberg/catalog/memory/in_memory_catalog.cc
@@ -500,8 +500,7 @@ Result<std::shared_ptr<Transaction>> 
InMemoryCatalog::StageCreateTable(
   ICEBERG_ASSIGN_OR_RAISE(
       auto table, StagedTable::Make(identifier, std::move(table_metadata), "", 
file_io_,
                                     shared_from_this()));
-  return Transaction::Make(std::move(table), Transaction::Kind::kCreate,
-                           /* auto_commit */ false);
+  return Transaction::Make(std::move(table), TransactionKind::kCreate);
 }
 
 Result<bool> InMemoryCatalog::TableExists(const TableIdentifier& identifier) 
const {
diff --git a/src/iceberg/catalog/rest/rest_catalog.cc 
b/src/iceberg/catalog/rest/rest_catalog.cc
index 94c6b1e4..42dc8659 100644
--- a/src/iceberg/catalog/rest/rest_catalog.cc
+++ b/src/iceberg/catalog/rest/rest_catalog.cc
@@ -384,8 +384,7 @@ Result<std::shared_ptr<Transaction>> 
RestCatalog::StageCreateTable(
                           StagedTable::Make(identifier, 
std::move(result.metadata),
                                             
std::move(result.metadata_location), file_io_,
                                             shared_from_this()));
-  return Transaction::Make(std::move(staged_table), Transaction::Kind::kCreate,
-                           /*auto_commit=*/false);
+  return Transaction::Make(std::move(staged_table), TransactionKind::kCreate);
 }
 
 Status RestCatalog::DropTable(const TableIdentifier& identifier, bool purge) {
diff --git a/src/iceberg/table.cc b/src/iceberg/table.cc
index 72190855..2f2753f3 100644
--- a/src/iceberg/table.cc
+++ b/src/iceberg/table.cc
@@ -32,11 +32,16 @@
 #include "iceberg/table_scan.h"
 #include "iceberg/transaction.h"
 #include "iceberg/update/expire_snapshots.h"
+#include "iceberg/update/fast_append.h"
+#include "iceberg/update/set_snapshot.h"
 #include "iceberg/update/snapshot_manager.h"
+#include "iceberg/update/update_location.h"
 #include "iceberg/update/update_partition_spec.h"
 #include "iceberg/update/update_partition_statistics.h"
 #include "iceberg/update/update_properties.h"
 #include "iceberg/update/update_schema.h"
+#include "iceberg/update/update_snapshot_reference.h"
+#include "iceberg/update/update_sort_order.h"
 #include "iceberg/update/update_statistics.h"
 #include "iceberg/util/macros.h"
 
@@ -166,71 +171,61 @@ Table::NewIncrementalChangelogScan() const {
 Result<std::shared_ptr<Transaction>> Table::NewTransaction() {
   // Create a brand new transaction object for the table. Users are expected 
to commit the
   // transaction manually.
-  return Transaction::Make(shared_from_this(), Transaction::Kind::kUpdate,
-                           /*auto_commit=*/false);
+  return Transaction::Make(shared_from_this(), TransactionKind::kUpdate);
 }
 
 Result<std::shared_ptr<UpdatePartitionSpec>> Table::NewUpdatePartitionSpec() {
   ICEBERG_ASSIGN_OR_RAISE(
-      auto transaction, Transaction::Make(shared_from_this(), 
Transaction::Kind::kUpdate,
-                                          /*auto_commit=*/true));
-  return transaction->NewUpdatePartitionSpec();
+      auto ctx, TransactionContext::Make(shared_from_this(), 
TransactionKind::kUpdate));
+  return UpdatePartitionSpec::Make(std::move(ctx));
 }
 
 Result<std::shared_ptr<UpdateProperties>> Table::NewUpdateProperties() {
   ICEBERG_ASSIGN_OR_RAISE(
-      auto transaction, Transaction::Make(shared_from_this(), 
Transaction::Kind::kUpdate,
-                                          /*auto_commit=*/true));
-  return transaction->NewUpdateProperties();
+      auto ctx, TransactionContext::Make(shared_from_this(), 
TransactionKind::kUpdate));
+  return UpdateProperties::Make(std::move(ctx));
 }
 
 Result<std::shared_ptr<UpdateSortOrder>> Table::NewUpdateSortOrder() {
   ICEBERG_ASSIGN_OR_RAISE(
-      auto transaction, Transaction::Make(shared_from_this(), 
Transaction::Kind::kUpdate,
-                                          /*auto_commit=*/true));
-  return transaction->NewUpdateSortOrder();
+      auto ctx, TransactionContext::Make(shared_from_this(), 
TransactionKind::kUpdate));
+  return UpdateSortOrder::Make(std::move(ctx));
 }
 
 Result<std::shared_ptr<UpdateSchema>> Table::NewUpdateSchema() {
   ICEBERG_ASSIGN_OR_RAISE(
-      auto transaction, Transaction::Make(shared_from_this(), 
Transaction::Kind::kUpdate,
-                                          /*auto_commit=*/true));
-  return transaction->NewUpdateSchema();
+      auto ctx, TransactionContext::Make(shared_from_this(), 
TransactionKind::kUpdate));
+  return UpdateSchema::Make(std::move(ctx));
 }
 
 Result<std::shared_ptr<ExpireSnapshots>> Table::NewExpireSnapshots() {
   ICEBERG_ASSIGN_OR_RAISE(
-      auto transaction, Transaction::Make(shared_from_this(), 
Transaction::Kind::kUpdate,
-                                          /*auto_commit=*/true));
-  return transaction->NewExpireSnapshots();
+      auto ctx, TransactionContext::Make(shared_from_this(), 
TransactionKind::kUpdate));
+  return ExpireSnapshots::Make(std::move(ctx));
 }
 
 Result<std::shared_ptr<UpdateLocation>> Table::NewUpdateLocation() {
   ICEBERG_ASSIGN_OR_RAISE(
-      auto transaction, Transaction::Make(shared_from_this(), 
Transaction::Kind::kUpdate,
-                                          /*auto_commit=*/true));
-  return transaction->NewUpdateLocation();
+      auto ctx, TransactionContext::Make(shared_from_this(), 
TransactionKind::kUpdate));
+  return UpdateLocation::Make(std::move(ctx));
 }
 
 Result<std::shared_ptr<FastAppend>> Table::NewFastAppend() {
   ICEBERG_ASSIGN_OR_RAISE(
-      auto transaction, Transaction::Make(shared_from_this(), 
Transaction::Kind::kUpdate,
-                                          /*auto_commit=*/true));
-  return transaction->NewFastAppend();
+      auto ctx, TransactionContext::Make(shared_from_this(), 
TransactionKind::kUpdate));
+  return FastAppend::Make(name().name, std::move(ctx));
 }
 
 Result<std::shared_ptr<UpdateStatistics>> Table::NewUpdateStatistics() {
   ICEBERG_ASSIGN_OR_RAISE(
-      auto transaction, Transaction::Make(shared_from_this(), 
Transaction::Kind::kUpdate,
-                                          /*auto_commit=*/true));
-  return transaction->NewUpdateStatistics();
+      auto ctx, TransactionContext::Make(shared_from_this(), 
TransactionKind::kUpdate));
+  return UpdateStatistics::Make(std::move(ctx));
 }
 
 Result<std::shared_ptr<UpdatePartitionStatistics>> 
Table::NewUpdatePartitionStatistics() {
   ICEBERG_ASSIGN_OR_RAISE(
-      auto transaction, Transaction::Make(shared_from_this(), 
Transaction::Kind::kUpdate,
-                                          /*auto_commit=*/true));
-  return transaction->NewUpdatePartitionStatistics();
+      auto ctx, TransactionContext::Make(shared_from_this(), 
TransactionKind::kUpdate));
+  return UpdatePartitionStatistics::Make(std::move(ctx));
 }
 
 Result<std::shared_ptr<SnapshotManager>> Table::NewSnapshotManager() {
diff --git a/src/iceberg/test/update_partition_spec_test.cc 
b/src/iceberg/test/update_partition_spec_test.cc
index fc316aae..632c4a55 100644
--- a/src/iceberg/test/update_partition_spec_test.cc
+++ b/src/iceberg/test/update_partition_spec_test.cc
@@ -232,14 +232,12 @@ class UpdatePartitionSpecTest : public 
::testing::TestWithParam<int8_t> {
   // Helper to create UpdatePartitionSpec from a table
   std::shared_ptr<UpdatePartitionSpec> CreateUpdateFromTable(
       std::shared_ptr<Table> table) {
-    auto transaction_result =
-        Transaction::Make(table, Transaction::Kind::kUpdate, 
/*auto_commit=*/false);
-    if (!transaction_result.has_value()) {
-      ADD_FAILURE() << "Failed to create transaction: "
-                    << transaction_result.error().message;
+    auto ctx_result = TransactionContext::Make(table, 
TransactionKind::kUpdate);
+    if (!ctx_result.has_value()) {
+      ADD_FAILURE() << "Failed to create context: " << 
ctx_result.error().message;
       return nullptr;
     }
-    auto update_result = UpdatePartitionSpec::Make(transaction_result.value());
+    auto update_result = 
UpdatePartitionSpec::Make(std::move(ctx_result.value()));
     if (!update_result.has_value()) {
       ADD_FAILURE() << "Failed to create UpdatePartitionSpec: "
                     << update_result.error().message;
diff --git a/src/iceberg/transaction.cc b/src/iceberg/transaction.cc
index 58b0daf9..bf4ac426 100644
--- a/src/iceberg/transaction.cc
+++ b/src/iceberg/transaction.cc
@@ -19,6 +19,7 @@
  */
 #include "iceberg/transaction.h"
 
+#include <format>
 #include <memory>
 #include <optional>
 
@@ -52,50 +53,85 @@
 
 namespace iceberg {
 
-Transaction::Transaction(std::shared_ptr<Table> table, Kind kind, bool 
auto_commit,
-                         std::unique_ptr<TableMetadataBuilder> 
metadata_builder)
-    : table_(std::move(table)),
-      kind_(kind),
-      auto_commit_(auto_commit),
-      metadata_builder_(std::move(metadata_builder)) {}
+// ---------------------------------------------------------------------------
+// TransactionContext
+// ---------------------------------------------------------------------------
+
+TransactionContext::TransactionContext() = default;
+TransactionContext::~TransactionContext() = default;
+
+Result<std::shared_ptr<TransactionContext>> TransactionContext::Make(
+    std::shared_ptr<Table> table, TransactionKind kind) {
+  ICEBERG_PRECHECK(table != nullptr, "Table cannot be null");
+  auto ctx = std::make_shared<TransactionContext>();
+  ctx->kind = kind;
+  ctx->table = std::move(table);
+  if (kind == TransactionKind::kCreate) {
+    ctx->metadata_builder = TableMetadataBuilder::BuildFromEmpty();
+    std::ignore = 
ctx->metadata_builder->ApplyChangesForCreate(*ctx->table->metadata());
+  } else {
+    ctx->metadata_builder = 
TableMetadataBuilder::BuildFrom(ctx->table->metadata().get());
+  }
+  return ctx;
+}
+
+const TableMetadata* TransactionContext::base() const { return 
metadata_builder->base(); }
+
+const TableMetadata& TransactionContext::current() const {
+  return metadata_builder->current();
+}
+
+std::string TransactionContext::MetadataFileLocation(std::string_view 
filename) const {
+  const auto metadata_location =
+      current().properties.Get(TableProperties::kWriteMetadataLocation);
+  if (metadata_location.empty()) {
+    return std::format("{}/metadata/{}", current().location, filename);
+  }
+  return std::format("{}/{}", 
LocationUtil::StripTrailingSlash(metadata_location),
+                     filename);
+}
+
+// ---------------------------------------------------------------------------
+// Transaction
+// ---------------------------------------------------------------------------
+
+Transaction::Transaction(std::shared_ptr<TransactionContext> ctx)
+    : ctx_(std::move(ctx)) {}
 
 Transaction::~Transaction() = default;
 
 Result<std::shared_ptr<Transaction>> Transaction::Make(std::shared_ptr<Table> 
table,
-                                                       Kind kind, bool 
auto_commit) {
+                                                       TransactionKind kind) {
   ICEBERG_PRECHECK(table && table->catalog(), "Table and catalog cannot be 
null");
+  ICEBERG_ASSIGN_OR_RAISE(auto ctx, TransactionContext::Make(std::move(table), 
kind));
+  auto txn = std::shared_ptr<Transaction>(new Transaction(ctx));
+  ctx->transaction = std::weak_ptr<Transaction>(txn);
+  return txn;
+}
 
-  std::unique_ptr<TableMetadataBuilder> metadata_builder;
-  if (kind == Kind::kCreate) {
-    metadata_builder = TableMetadataBuilder::BuildFromEmpty();
-    std::ignore = metadata_builder->ApplyChangesForCreate(*table->metadata());
-  } else {
-    metadata_builder = 
TableMetadataBuilder::BuildFrom(table->metadata().get());
-  }
-
-  return std::shared_ptr<Transaction>(
-      new Transaction(std::move(table), kind, auto_commit, 
std::move(metadata_builder)));
+Result<std::shared_ptr<Transaction>> Transaction::Make(
+    std::shared_ptr<TransactionContext> ctx) {
+  ICEBERG_PRECHECK(ctx != nullptr, "TransactionContext cannot be null");
+  auto txn = std::shared_ptr<Transaction>(new Transaction(ctx));
+  ctx->transaction = std::weak_ptr<Transaction>(txn);
+  return txn;
 }
 
-const TableMetadata* Transaction::base() const { return 
metadata_builder_->base(); }
+const std::shared_ptr<Table>& Transaction::table() const { return ctx_->table; 
}
+
+const TableMetadata* Transaction::base() const { return ctx_->base(); }
 
-const TableMetadata& Transaction::current() const { return 
metadata_builder_->current(); }
+const TableMetadata& Transaction::current() const { return ctx_->current(); }
 
 std::string Transaction::MetadataFileLocation(std::string_view filename) const 
{
-  const auto metadata_location =
-      current().properties.Get(TableProperties::kWriteMetadataLocation);
-  if (metadata_location.empty()) {
-    return std::format("{}/{}", 
LocationUtil::StripTrailingSlash(metadata_location),
-                       filename);
-  }
-  return std::format("{}/metadata/{}", current().location, filename);
+  return ctx_->MetadataFileLocation(filename);
 }
 
 Status Transaction::AddUpdate(const std::shared_ptr<PendingUpdate>& update) {
   ICEBERG_CHECK(last_update_committed_,
                 "Cannot add update when previous update is not committed");
 
-  pending_updates_.emplace_back(std::weak_ptr<PendingUpdate>(update));
+  pending_updates_.push_back(update);
   last_update_committed_ = false;
   return {};
 }
@@ -153,52 +189,48 @@ Status Transaction::Apply(PendingUpdate& update) {
 
   last_update_committed_ = true;
 
-  if (auto_commit_) {
-    ICEBERG_RETURN_UNEXPECTED(Commit());
-  }
-
   return {};
 }
 
 Status Transaction::ApplyExpireSnapshots(ExpireSnapshots& update) {
   ICEBERG_ASSIGN_OR_RAISE(auto result, update.Apply());
   if (!result.snapshot_ids_to_remove.empty()) {
-    
metadata_builder_->RemoveSnapshots(std::move(result.snapshot_ids_to_remove));
+    
ctx_->metadata_builder->RemoveSnapshots(std::move(result.snapshot_ids_to_remove));
   }
   if (!result.refs_to_remove.empty()) {
     for (const auto& ref_name : result.refs_to_remove) {
-      metadata_builder_->RemoveRef(ref_name);
+      ctx_->metadata_builder->RemoveRef(ref_name);
     }
   }
   if (!result.partition_spec_ids_to_remove.empty()) {
-    metadata_builder_->RemovePartitionSpecs(
+    ctx_->metadata_builder->RemovePartitionSpecs(
         std::move(result.partition_spec_ids_to_remove));
   }
   if (!result.schema_ids_to_remove.empty()) {
-    metadata_builder_->RemoveSchemas(std::move(result.schema_ids_to_remove));
+    
ctx_->metadata_builder->RemoveSchemas(std::move(result.schema_ids_to_remove));
   }
   return {};
 }
 
 Status Transaction::ApplySetSnapshot(SetSnapshot& update) {
   ICEBERG_ASSIGN_OR_RAISE(auto snapshot_id, update.Apply());
-  metadata_builder_->SetBranchSnapshot(snapshot_id,
-                                       std::string(SnapshotRef::kMainBranch));
+  ctx_->metadata_builder->SetBranchSnapshot(snapshot_id,
+                                            
std::string(SnapshotRef::kMainBranch));
   return {};
 }
 
 Status Transaction::ApplyUpdateLocation(UpdateLocation& update) {
   ICEBERG_ASSIGN_OR_RAISE(auto location, update.Apply());
-  metadata_builder_->SetLocation(location);
+  ctx_->metadata_builder->SetLocation(location);
   return {};
 }
 
 Status Transaction::ApplyUpdatePartitionSpec(UpdatePartitionSpec& update) {
   ICEBERG_ASSIGN_OR_RAISE(auto result, update.Apply());
   if (result.set_as_default) {
-    metadata_builder_->SetDefaultPartitionSpec(std::move(result.spec));
+    ctx_->metadata_builder->SetDefaultPartitionSpec(std::move(result.spec));
   } else {
-    metadata_builder_->AddPartitionSpec(std::move(result.spec));
+    ctx_->metadata_builder->AddPartitionSpec(std::move(result.spec));
   }
   return {};
 }
@@ -206,30 +238,30 @@ Status 
Transaction::ApplyUpdatePartitionSpec(UpdatePartitionSpec& update) {
 Status Transaction::ApplyUpdateProperties(UpdateProperties& update) {
   ICEBERG_ASSIGN_OR_RAISE(auto result, update.Apply());
   if (!result.updates.empty()) {
-    metadata_builder_->SetProperties(std::move(result.updates));
+    ctx_->metadata_builder->SetProperties(std::move(result.updates));
   }
   if (!result.removals.empty()) {
-    metadata_builder_->RemoveProperties(std::move(result.removals));
+    ctx_->metadata_builder->RemoveProperties(std::move(result.removals));
   }
   if (result.format_version.has_value()) {
-    metadata_builder_->UpgradeFormatVersion(result.format_version.value());
+    
ctx_->metadata_builder->UpgradeFormatVersion(result.format_version.value());
   }
   return {};
 }
 
 Status Transaction::ApplyUpdateSchema(UpdateSchema& update) {
   ICEBERG_ASSIGN_OR_RAISE(auto result, update.Apply());
-  metadata_builder_->SetCurrentSchema(std::move(result.schema),
-                                      result.new_last_column_id);
+  ctx_->metadata_builder->SetCurrentSchema(std::move(result.schema),
+                                           result.new_last_column_id);
   if (!result.updated_props.empty()) {
-    metadata_builder_->SetProperties(result.updated_props);
+    ctx_->metadata_builder->SetProperties(result.updated_props);
   }
 
   return {};
 }
 
 Status Transaction::ApplyUpdateSnapshot(SnapshotUpdate& update) {
-  const auto& base = metadata_builder_->current();
+  const auto& base = ctx_->metadata_builder->current();
 
   ICEBERG_ASSIGN_OR_RAISE(auto result, update.Apply());
 
@@ -252,14 +284,14 @@ Status Transaction::ApplyUpdateSnapshot(SnapshotUpdate& 
update) {
   }
 
   for (const auto& change : temp_update->changes()) {
-    change->ApplyTo(*metadata_builder_);
+    change->ApplyTo(*ctx_->metadata_builder);
   }
 
   // If the table UUID is missing, add it here. the UUID will be re-created 
each time
   // this operation retries to ensure that if a concurrent operation assigns 
the UUID,
   // this operation will not fail.
   if (base.table_uuid.empty()) {
-    metadata_builder_->AssignUUID();
+    ctx_->metadata_builder->AssignUUID();
   }
   return {};
 }
@@ -267,27 +299,27 @@ Status Transaction::ApplyUpdateSnapshot(SnapshotUpdate& 
update) {
 Status Transaction::ApplyUpdateSnapshotReference(UpdateSnapshotReference& 
update) {
   ICEBERG_ASSIGN_OR_RAISE(auto result, update.Apply());
   for (const auto& name : result.to_remove) {
-    metadata_builder_->RemoveRef(name);
+    ctx_->metadata_builder->RemoveRef(name);
   }
   for (auto&& [name, ref] : result.to_set) {
-    metadata_builder_->SetRef(std::move(name), std::move(ref));
+    ctx_->metadata_builder->SetRef(std::move(name), std::move(ref));
   }
   return {};
 }
 
 Status Transaction::ApplyUpdateSortOrder(UpdateSortOrder& update) {
   ICEBERG_ASSIGN_OR_RAISE(auto sort_order, update.Apply());
-  metadata_builder_->SetDefaultSortOrder(std::move(sort_order));
+  ctx_->metadata_builder->SetDefaultSortOrder(std::move(sort_order));
   return {};
 }
 
 Status Transaction::ApplyUpdateStatistics(UpdateStatistics& update) {
   ICEBERG_ASSIGN_OR_RAISE(auto result, update.Apply());
   for (auto&& [_, stat_file] : result.to_set) {
-    metadata_builder_->SetStatistics(std::move(stat_file));
+    ctx_->metadata_builder->SetStatistics(std::move(stat_file));
   }
   for (const auto& snapshot_id : result.to_remove) {
-    metadata_builder_->RemoveStatistics(snapshot_id);
+    ctx_->metadata_builder->RemoveStatistics(snapshot_id);
   }
   return {};
 }
@@ -295,10 +327,10 @@ Status 
Transaction::ApplyUpdateStatistics(UpdateStatistics& update) {
 Status Transaction::ApplyUpdatePartitionStatistics(UpdatePartitionStatistics& 
update) {
   ICEBERG_ASSIGN_OR_RAISE(auto result, update.Apply());
   for (auto&& [_, partition_stat_file] : result.to_set) {
-    metadata_builder_->SetPartitionStatistics(std::move(partition_stat_file));
+    
ctx_->metadata_builder->SetPartitionStatistics(std::move(partition_stat_file));
   }
   for (const auto& snapshot_id : result.to_remove) {
-    metadata_builder_->RemovePartitionStatistics(snapshot_id);
+    ctx_->metadata_builder->RemovePartitionStatistics(snapshot_id);
   }
   return {};
 }
@@ -308,104 +340,103 @@ Result<std::shared_ptr<Table>> Transaction::Commit() {
   ICEBERG_CHECK(last_update_committed_,
                 "Cannot commit transaction when previous update is not 
committed");
 
-  const auto& updates = metadata_builder_->changes();
+  const auto& updates = ctx_->metadata_builder->changes();
   if (updates.empty()) {
     committed_ = true;
-    return table_;
+    return ctx_->table;
   }
 
   std::vector<std::unique_ptr<TableRequirement>> requirements;
-  switch (kind_) {
-    case Kind::kCreate: {
+  switch (ctx_->kind) {
+    case TransactionKind::kCreate: {
       ICEBERG_ASSIGN_OR_RAISE(requirements, 
TableRequirements::ForCreateTable(updates));
     } break;
-    case Kind::kUpdate: {
-      ICEBERG_ASSIGN_OR_RAISE(requirements, TableRequirements::ForUpdateTable(
-                                                *metadata_builder_->base(), 
updates));
+    case TransactionKind::kUpdate: {
+      ICEBERG_ASSIGN_OR_RAISE(
+          requirements,
+          TableRequirements::ForUpdateTable(*ctx_->metadata_builder->base(), 
updates));
 
     } break;
   }
 
   // XXX: we should handle commit failure and retry here.
   auto commit_result =
-      table_->catalog()->UpdateTable(table_->name(), requirements, updates);
+      ctx_->table->catalog()->UpdateTable(ctx_->table->name(), requirements, 
updates);
 
   for (const auto& update : pending_updates_) {
-    if (auto update_ptr = update.lock()) {
-      std::ignore = update_ptr->Finalize(commit_result.has_value()
-                                             ? std::nullopt
-                                             : 
std::make_optional(commit_result.error()));
-    }
+    std::ignore = update->Finalize(commit_result.has_value()
+                                       ? std::nullopt
+                                       : 
std::make_optional(commit_result.error()));
   }
 
   ICEBERG_RETURN_UNEXPECTED(commit_result);
 
   // Mark as committed and update table reference
   committed_ = true;
-  table_ = std::move(commit_result.value());
+  ctx_->table = std::move(commit_result.value());
 
-  return table_;
+  return ctx_->table;
 }
 
 Result<std::shared_ptr<UpdatePartitionSpec>> 
Transaction::NewUpdatePartitionSpec() {
   ICEBERG_ASSIGN_OR_RAISE(std::shared_ptr<UpdatePartitionSpec> update_spec,
-                          UpdatePartitionSpec::Make(shared_from_this()));
+                          UpdatePartitionSpec::Make(ctx_));
   ICEBERG_RETURN_UNEXPECTED(AddUpdate(update_spec));
   return update_spec;
 }
 
 Result<std::shared_ptr<UpdateProperties>> Transaction::NewUpdateProperties() {
   ICEBERG_ASSIGN_OR_RAISE(std::shared_ptr<UpdateProperties> update_properties,
-                          UpdateProperties::Make(shared_from_this()));
+                          UpdateProperties::Make(ctx_));
   ICEBERG_RETURN_UNEXPECTED(AddUpdate(update_properties));
   return update_properties;
 }
 
 Result<std::shared_ptr<UpdateSortOrder>> Transaction::NewUpdateSortOrder() {
   ICEBERG_ASSIGN_OR_RAISE(std::shared_ptr<UpdateSortOrder> update_sort_order,
-                          UpdateSortOrder::Make(shared_from_this()));
+                          UpdateSortOrder::Make(ctx_));
   ICEBERG_RETURN_UNEXPECTED(AddUpdate(update_sort_order));
   return update_sort_order;
 }
 
 Result<std::shared_ptr<UpdateSchema>> Transaction::NewUpdateSchema() {
   ICEBERG_ASSIGN_OR_RAISE(std::shared_ptr<UpdateSchema> update_schema,
-                          UpdateSchema::Make(shared_from_this()));
+                          UpdateSchema::Make(ctx_));
   ICEBERG_RETURN_UNEXPECTED(AddUpdate(update_schema));
   return update_schema;
 }
 
 Result<std::shared_ptr<ExpireSnapshots>> Transaction::NewExpireSnapshots() {
   ICEBERG_ASSIGN_OR_RAISE(std::shared_ptr<ExpireSnapshots> expire_snapshots,
-                          ExpireSnapshots::Make(shared_from_this()));
+                          ExpireSnapshots::Make(ctx_));
   ICEBERG_RETURN_UNEXPECTED(AddUpdate(expire_snapshots));
   return expire_snapshots;
 }
 
 Result<std::shared_ptr<UpdateLocation>> Transaction::NewUpdateLocation() {
   ICEBERG_ASSIGN_OR_RAISE(std::shared_ptr<UpdateLocation> update_location,
-                          UpdateLocation::Make(shared_from_this()));
+                          UpdateLocation::Make(ctx_));
   ICEBERG_RETURN_UNEXPECTED(AddUpdate(update_location));
   return update_location;
 }
 
 Result<std::shared_ptr<SetSnapshot>> Transaction::NewSetSnapshot() {
   ICEBERG_ASSIGN_OR_RAISE(std::shared_ptr<SetSnapshot> set_snapshot,
-                          SetSnapshot::Make(shared_from_this()));
+                          SetSnapshot::Make(ctx_));
   ICEBERG_RETURN_UNEXPECTED(AddUpdate(set_snapshot));
   return set_snapshot;
 }
 
 Result<std::shared_ptr<FastAppend>> Transaction::NewFastAppend() {
   ICEBERG_ASSIGN_OR_RAISE(std::shared_ptr<FastAppend> fast_append,
-                          FastAppend::Make(table_->name().name, 
shared_from_this()));
+                          FastAppend::Make(ctx_->table->name().name, ctx_));
   ICEBERG_RETURN_UNEXPECTED(AddUpdate(fast_append));
   return fast_append;
 }
 
 Result<std::shared_ptr<UpdateStatistics>> Transaction::NewUpdateStatistics() {
   ICEBERG_ASSIGN_OR_RAISE(std::shared_ptr<UpdateStatistics> update_statistics,
-                          UpdateStatistics::Make(shared_from_this()));
+                          UpdateStatistics::Make(ctx_));
   ICEBERG_RETURN_UNEXPECTED(AddUpdate(update_statistics));
   return update_statistics;
 }
@@ -414,7 +445,7 @@ Result<std::shared_ptr<UpdatePartitionStatistics>>
 Transaction::NewUpdatePartitionStatistics() {
   ICEBERG_ASSIGN_OR_RAISE(
       std::shared_ptr<UpdatePartitionStatistics> update_partition_statistics,
-      UpdatePartitionStatistics::Make(shared_from_this()));
+      UpdatePartitionStatistics::Make(ctx_));
   ICEBERG_RETURN_UNEXPECTED(AddUpdate(update_partition_statistics));
   return update_partition_statistics;
 }
@@ -422,7 +453,7 @@ Transaction::NewUpdatePartitionStatistics() {
 Result<std::shared_ptr<UpdateSnapshotReference>>
 Transaction::NewUpdateSnapshotReference() {
   ICEBERG_ASSIGN_OR_RAISE(std::shared_ptr<UpdateSnapshotReference> update_ref,
-                          UpdateSnapshotReference::Make(shared_from_this()));
+                          UpdateSnapshotReference::Make(ctx_));
   ICEBERG_RETURN_UNEXPECTED(AddUpdate(update_ref));
   return update_ref;
 }
diff --git a/src/iceberg/transaction.h b/src/iceberg/transaction.h
index 438054b5..ec8c4db0 100644
--- a/src/iceberg/transaction.h
+++ b/src/iceberg/transaction.h
@@ -20,7 +20,11 @@
 
 #pragma once
 
+#include <cstdint>
 #include <memory>
+#include <optional>
+#include <string>
+#include <string_view>
 #include <vector>
 
 #include "iceberg/iceberg_export.h"
@@ -29,19 +33,24 @@
 
 namespace iceberg {
 
+/// \brief Whether a transaction creates a new table or updates an existing 
one.
+enum class TransactionKind : uint8_t { kCreate, kUpdate };
+
 /// \brief A transaction for performing multiple updates to a table
 class ICEBERG_EXPORT Transaction : public 
std::enable_shared_from_this<Transaction> {
  public:
-  enum class Kind : uint8_t { kCreate, kUpdate };
-
   ~Transaction();
 
   /// \brief Create a new transaction
   static Result<std::shared_ptr<Transaction>> Make(std::shared_ptr<Table> 
table,
-                                                   Kind kind, bool 
auto_commit);
+                                                   TransactionKind kind);
+
+  /// \brief Create a transaction from an existing context (used by 
PendingUpdate::Commit)
+  static Result<std::shared_ptr<Transaction>> Make(
+      std::shared_ptr<TransactionContext> ctx);
 
   /// \brief Return the Table that this transaction will update
-  const std::shared_ptr<Table>& table() const { return table_; }
+  const std::shared_ptr<Table>& table() const;
 
   /// \brief Returns the base metadata without any changes
   const TableMetadata* base() const;
@@ -109,8 +118,7 @@ class ICEBERG_EXPORT Transaction : public std::enable_shared_from_this<Transacti
   Result<std::shared_ptr<UpdateSnapshotReference>> 
NewUpdateSnapshotReference();
 
  private:
-  Transaction(std::shared_ptr<Table> table, Kind kind, bool auto_commit,
-              std::unique_ptr<TableMetadataBuilder> metadata_builder);
+  explicit Transaction(std::shared_ptr<TransactionContext> ctx);
 
   Status AddUpdate(const std::shared_ptr<PendingUpdate>& update);
 
@@ -133,22 +141,35 @@ class ICEBERG_EXPORT Transaction : public std::enable_shared_from_this<Transacti
  private:
   friend class PendingUpdate;
 
-  // The table that this transaction will update.
-  std::shared_ptr<Table> table_;
-  // The kind of this transaction.
-  const Kind kind_;
-  // Whether to auto-commit the transaction when updates are applied.
-  // This is useful when a temporary transaction is created for a single operation.
-  bool auto_commit_;
+  // Shared context owning the table, metadata builder, and kind.
+  std::shared_ptr<TransactionContext> ctx_;
+  // Keep track of all created pending updates.
+  std::vector<std::shared_ptr<PendingUpdate>> pending_updates_;
   // To make the state simple, we require updates are added and committed in order.
   bool last_update_committed_ = true;
   // Tracks if transaction has been committed to prevent double-commit
   bool committed_ = false;
-  // Keep track of all created pending updates. Use weak_ptr to avoid circular references.
-  // This is useful to retry failed updates.
-  std::vector<std::weak_ptr<PendingUpdate>> pending_updates_;
-  // Accumulated updates from all pending updates.
-  std::unique_ptr<TableMetadataBuilder> metadata_builder_;
+};
+
+/// \brief Shared context between Transaction and PendingUpdate instances.
+class ICEBERG_EXPORT TransactionContext {
+ public:
+  TransactionContext();
+  ~TransactionContext();
+
+  static Result<std::shared_ptr<TransactionContext>> 
Make(std::shared_ptr<Table> table,
+                                                          TransactionKind 
kind);
+
+  const TableMetadata* base() const;
+  const TableMetadata& current() const;
+  std::string MetadataFileLocation(std::string_view filename) const;
+
+  std::shared_ptr<Table> table;
+  std::unique_ptr<TableMetadataBuilder> metadata_builder;
+  TransactionKind kind;
+  // If PendingUpdate is created directly from Table, this is nullopt;
+  // otherwise, it holds a weak pointer to the Transaction that created it.
+  std::optional<std::weak_ptr<Transaction>> transaction;
 };
 
 }  // namespace iceberg
diff --git a/src/iceberg/type_fwd.h b/src/iceberg/type_fwd.h
index cad3e969..0b0cccbf 100644
--- a/src/iceberg/type_fwd.h
+++ b/src/iceberg/type_fwd.h
@@ -203,6 +203,7 @@ class TableUpdate;
 class TableRequirement;
 class TableUpdateContext;
 class Transaction;
+class TransactionContext;
 
 /// \brief Update family.
 class ExpireSnapshots;
diff --git a/src/iceberg/update/expire_snapshots.cc b/src/iceberg/update/expire_snapshots.cc
index 68cf08ca..722ae7a4 100644
--- a/src/iceberg/update/expire_snapshots.cc
+++ b/src/iceberg/update/expire_snapshots.cc
@@ -38,14 +38,13 @@
 namespace iceberg {
 
 Result<std::shared_ptr<ExpireSnapshots>> ExpireSnapshots::Make(
-    std::shared_ptr<Transaction> transaction) {
-  ICEBERG_PRECHECK(transaction != nullptr,
-                   "Cannot create ExpireSnapshots without a transaction");
-  return std::shared_ptr<ExpireSnapshots>(new 
ExpireSnapshots(std::move(transaction)));
+    std::shared_ptr<TransactionContext> ctx) {
+  ICEBERG_PRECHECK(ctx != nullptr, "Cannot create ExpireSnapshots without a 
context");
+  return std::shared_ptr<ExpireSnapshots>(new ExpireSnapshots(std::move(ctx)));
 }
 
-ExpireSnapshots::ExpireSnapshots(std::shared_ptr<Transaction> transaction)
-    : PendingUpdate(std::move(transaction)),
+ExpireSnapshots::ExpireSnapshots(std::shared_ptr<TransactionContext> ctx)
+    : PendingUpdate(std::move(ctx)),
       current_time_ms_(CurrentTimePointMs()),
       
default_max_ref_age_ms_(base().properties.Get(TableProperties::kMaxRefAgeMs)),
       default_min_num_snapshots_(
@@ -263,7 +262,7 @@ Result<ExpireSnapshots::ApplyResult> ExpireSnapshots::Apply() {
       ICEBERG_ASSIGN_OR_RAISE(auto snapshot, base.SnapshotById(snapshot_id));
       SnapshotCache snapshot_cache(snapshot.get());
       ICEBERG_ASSIGN_OR_RAISE(auto manifests,
-                              
snapshot_cache.Manifests(transaction_->table()->io()));
+                              snapshot_cache.Manifests(ctx_->table->io()));
       for (const auto& manifest : manifests) {
         reachable_specs.insert(manifest.partition_spec_id);
       }
diff --git a/src/iceberg/update/expire_snapshots.h b/src/iceberg/update/expire_snapshots.h
index 17a4d8b0..bc05d810 100644
--- a/src/iceberg/update/expire_snapshots.h
+++ b/src/iceberg/update/expire_snapshots.h
@@ -64,7 +64,7 @@ enum class CleanupLevel : uint8_t {
 class ICEBERG_EXPORT ExpireSnapshots : public PendingUpdate {
  public:
   static Result<std::shared_ptr<ExpireSnapshots>> Make(
-      std::shared_ptr<Transaction> transaction);
+      std::shared_ptr<TransactionContext> ctx);
 
   ~ExpireSnapshots() override;
 
@@ -143,7 +143,7 @@ class ICEBERG_EXPORT ExpireSnapshots : public PendingUpdate {
   Result<ApplyResult> Apply();
 
  private:
-  explicit ExpireSnapshots(std::shared_ptr<Transaction> transaction);
+  explicit ExpireSnapshots(std::shared_ptr<TransactionContext> ctx);
 
   using SnapshotToRef = std::unordered_map<std::string, 
std::shared_ptr<SnapshotRef>>;
 
diff --git a/src/iceberg/update/fast_append.cc b/src/iceberg/update/fast_append.cc
index 3c132a40..d08f497c 100644
--- a/src/iceberg/update/fast_append.cc
+++ b/src/iceberg/update/fast_append.cc
@@ -36,16 +36,15 @@
 namespace iceberg {
 
 Result<std::unique_ptr<FastAppend>> FastAppend::Make(
-    std::string table_name, std::shared_ptr<Transaction> transaction) {
+    std::string table_name, std::shared_ptr<TransactionContext> ctx) {
   ICEBERG_PRECHECK(!table_name.empty(), "Table name cannot be empty");
-  ICEBERG_PRECHECK(transaction != nullptr,
-                   "Cannot create FastAppend without a transaction");
+  ICEBERG_PRECHECK(ctx != nullptr, "Cannot create FastAppend without a 
context");
   return std::unique_ptr<FastAppend>(
-      new FastAppend(std::move(table_name), std::move(transaction)));
+      new FastAppend(std::move(table_name), std::move(ctx)));
 }
 
-FastAppend::FastAppend(std::string table_name, std::shared_ptr<Transaction> 
transaction)
-    : SnapshotUpdate(std::move(transaction)), 
table_name_(std::move(table_name)) {}
+FastAppend::FastAppend(std::string table_name, 
std::shared_ptr<TransactionContext> ctx)
+    : SnapshotUpdate(std::move(ctx)), table_name_(std::move(table_name)) {}
 
 FastAppend& FastAppend::AppendFile(const std::shared_ptr<DataFile>& file) {
   ICEBERG_BUILDER_CHECK(file != nullptr, "Invalid data file: null");
@@ -118,7 +117,7 @@ Result<std::vector<ManifestFile>> FastAppend::Apply(
   if (snapshot != nullptr) {
     auto cached_snapshot = SnapshotCache(snapshot.get());
     ICEBERG_ASSIGN_OR_RAISE(auto snapshot_manifests,
-                            
cached_snapshot.Manifests(transaction_->table()->io()));
+                            cached_snapshot.Manifests(ctx_->table->io()));
     manifests.insert(manifests.end(), snapshot_manifests.begin(),
                      snapshot_manifests.end());
   }
@@ -179,9 +178,8 @@ Result<ManifestFile> FastAppend::CopyManifest(const ManifestFile& manifest) {
   int64_t snapshot_id = SnapshotId();
 
   // Copy the manifest with the new snapshot ID.
-  return CopyAppendManifest(manifest, transaction_->table()->io(), schema, 
spec,
-                            snapshot_id, new_manifest_path, 
current.format_version,
-                            &summary_);
+  return CopyAppendManifest(manifest, ctx_->table->io(), schema, spec, 
snapshot_id,
+                            new_manifest_path, current.format_version, 
&summary_);
 }
 
 Result<std::vector<ManifestFile>> FastAppend::WriteNewManifests() {
diff --git a/src/iceberg/update/fast_append.h b/src/iceberg/update/fast_append.h
index 7f5cbb09..580fa472 100644
--- a/src/iceberg/update/fast_append.h
+++ b/src/iceberg/update/fast_append.h
@@ -47,7 +47,7 @@ class ICEBERG_EXPORT FastAppend : public SnapshotUpdate {
   /// \param transaction The transaction to use for this update
   /// \return A Result containing the FastAppend instance or an error
   static Result<std::unique_ptr<FastAppend>> Make(
-      std::string table_name, std::shared_ptr<Transaction> transaction);
+      std::string table_name, std::shared_ptr<TransactionContext> ctx);
 
   /// \brief Append a data file to this update.
   ///
@@ -76,7 +76,7 @@ class ICEBERG_EXPORT FastAppend : public SnapshotUpdate {
   bool CleanupAfterCommit() const override;
 
  private:
-  explicit FastAppend(std::string table_name, std::shared_ptr<Transaction> 
transaction);
+  explicit FastAppend(std::string table_name, 
std::shared_ptr<TransactionContext> ctx);
 
   /// \brief Get the partition spec by spec ID.
   Result<std::shared_ptr<PartitionSpec>> Spec(int32_t spec_id);
diff --git a/src/iceberg/update/pending_update.cc b/src/iceberg/update/pending_update.cc
index e55a93df..2d5247d2 100644
--- a/src/iceberg/update/pending_update.cc
+++ b/src/iceberg/update/pending_update.cc
@@ -20,20 +20,41 @@
 #include "iceberg/update/pending_update.h"
 
 #include "iceberg/transaction.h"
+#include "iceberg/util/macros.h"
 
 namespace iceberg {
 
-PendingUpdate::PendingUpdate(std::shared_ptr<Transaction> transaction)
-    : transaction_(std::move(transaction)) {}
+PendingUpdate::PendingUpdate(std::shared_ptr<TransactionContext> ctx)
+    : ctx_(std::move(ctx)) {}
 
 PendingUpdate::~PendingUpdate() = default;
 
-Status PendingUpdate::Commit() { return transaction_->Apply(*this); }
+Status PendingUpdate::Commit() {
+  if (!ctx_->transaction) {
+    // Table-created path: no transaction exists yet, create a temporary one.
+    ICEBERG_ASSIGN_OR_RAISE(auto txn, Transaction::Make(ctx_));
+    Status status = txn->Apply(*this);
+    if (status.has_value()) {
+      auto commit_result = txn->Commit();
+      if (!commit_result.has_value()) {
+        status = std::unexpected(commit_result.error());
+      }
+    }
+    std::ignore =
+        Finalize(status.has_value() ? std::nullopt : 
std::make_optional(status.error()));
+    return status;
+  }
+  auto txn = ctx_->transaction->lock();
+  if (!txn) {
+    return CommitFailed("Transaction has been destroyed");
+  }
+  return txn->Apply(*this);
+}
 
 Status PendingUpdate::Finalize([[maybe_unused]] std::optional<Error> 
commit_error) {
   return {};
 }
 
-const TableMetadata& PendingUpdate::base() const { return 
transaction_->current(); }
+const TableMetadata& PendingUpdate::base() const { return ctx_->current(); }
 
 }  // namespace iceberg
diff --git a/src/iceberg/update/pending_update.h b/src/iceberg/update/pending_update.h
index f44812a8..f67be18c 100644
--- a/src/iceberg/update/pending_update.h
+++ b/src/iceberg/update/pending_update.h
@@ -84,11 +84,11 @@ class ICEBERG_EXPORT PendingUpdate : public ErrorCollector {
   ~PendingUpdate() override;
 
  protected:
-  explicit PendingUpdate(std::shared_ptr<Transaction> transaction);
+  explicit PendingUpdate(std::shared_ptr<TransactionContext> ctx);
 
   const TableMetadata& base() const;
 
-  std::shared_ptr<Transaction> transaction_;
+  std::shared_ptr<TransactionContext> ctx_;
 };
 
 }  // namespace iceberg
diff --git a/src/iceberg/update/set_snapshot.cc b/src/iceberg/update/set_snapshot.cc
index 7258bc4d..79662890 100644
--- a/src/iceberg/update/set_snapshot.cc
+++ b/src/iceberg/update/set_snapshot.cc
@@ -34,14 +34,13 @@
 namespace iceberg {
 
 Result<std::shared_ptr<SetSnapshot>> SetSnapshot::Make(
-    std::shared_ptr<Transaction> transaction) {
-  ICEBERG_PRECHECK(transaction != nullptr,
-                   "Cannot create SetSnapshot without a transaction");
-  return std::shared_ptr<SetSnapshot>(new SetSnapshot(std::move(transaction)));
+    std::shared_ptr<TransactionContext> ctx) {
+  ICEBERG_PRECHECK(ctx != nullptr, "Cannot create SetSnapshot without a 
context");
+  return std::shared_ptr<SetSnapshot>(new SetSnapshot(std::move(ctx)));
 }
 
-SetSnapshot::SetSnapshot(std::shared_ptr<Transaction> transaction)
-    : PendingUpdate(std::move(transaction)) {}
+SetSnapshot::SetSnapshot(std::shared_ptr<TransactionContext> ctx)
+    : PendingUpdate(std::move(ctx)) {}
 
 SetSnapshot::~SetSnapshot() = default;
 
@@ -89,7 +88,7 @@ SetSnapshot& SetSnapshot::RollbackTo(int64_t snapshot_id) {
 Result<int64_t> SetSnapshot::Apply() {
   ICEBERG_RETURN_UNEXPECTED(CheckErrors());
 
-  const TableMetadata& base_metadata = transaction_->current();
+  const TableMetadata& base_metadata = ctx_->current();
 
   // If no target snapshot was configured, return current state (NOOP)
   if (!target_snapshot_id_.has_value()) {
diff --git a/src/iceberg/update/set_snapshot.h b/src/iceberg/update/set_snapshot.h
index 1ad39960..6aeb9265 100644
--- a/src/iceberg/update/set_snapshot.h
+++ b/src/iceberg/update/set_snapshot.h
@@ -37,7 +37,7 @@ namespace iceberg {
 class ICEBERG_EXPORT SetSnapshot : public PendingUpdate {
  public:
   static Result<std::shared_ptr<SetSnapshot>> Make(
-      std::shared_ptr<Transaction> transaction);
+      std::shared_ptr<TransactionContext> ctx);
 
   ~SetSnapshot() override;
 
@@ -56,7 +56,7 @@ class ICEBERG_EXPORT SetSnapshot : public PendingUpdate {
   Result<int64_t> Apply();
 
  private:
-  explicit SetSnapshot(std::shared_ptr<Transaction> transaction);
+  explicit SetSnapshot(std::shared_ptr<TransactionContext> ctx);
 
   /// \brief Find the latest snapshot whose timestamp is before the provided timestamp.
   ///
diff --git a/src/iceberg/update/snapshot_manager.cc b/src/iceberg/update/snapshot_manager.cc
index d882dd32..55f73072 100644
--- a/src/iceberg/update/snapshot_manager.cc
+++ b/src/iceberg/update/snapshot_manager.cc
@@ -34,8 +34,7 @@ Result<std::shared_ptr<SnapshotManager>> SnapshotManager::Make(
     std::shared_ptr<Table> table) {
   ICEBERG_PRECHECK(table != nullptr, "Invalid input table: null");
   ICEBERG_ASSIGN_OR_RAISE(auto transaction,
-                          Transaction::Make(std::move(table), 
Transaction::Kind::kUpdate,
-                                            /*auto_commit=*/false));
+                          Transaction::Make(std::move(table), 
TransactionKind::kUpdate));
   return std::shared_ptr<SnapshotManager>(
       new SnapshotManager(std::move(transaction), 
/*is_external_transaction=*/false));
 }
diff --git a/src/iceberg/update/snapshot_update.cc b/src/iceberg/update/snapshot_update.cc
index b4468256..3e579266 100644
--- a/src/iceberg/update/snapshot_update.cc
+++ b/src/iceberg/update/snapshot_update.cc
@@ -154,8 +154,8 @@ Result<ManifestFile> AddMetadata(const ManifestFile& manifest, std::shared_ptr<F
 
 SnapshotUpdate::~SnapshotUpdate() = default;
 
-SnapshotUpdate::SnapshotUpdate(std::shared_ptr<Transaction> transaction)
-    : PendingUpdate(std::move(transaction)),
+SnapshotUpdate::SnapshotUpdate(std::shared_ptr<TransactionContext> ctx)
+    : PendingUpdate(std::move(ctx)),
       can_inherit_snapshot_id_(
           base().format_version > 1 ||
           
base().properties.Get(TableProperties::kSnapshotIdInheritanceEnabled)),
@@ -176,11 +176,10 @@ Result<std::vector<ManifestFile>> SnapshotUpdate::WriteDataManifests(
   RollingManifestWriter rolling_writer(
       [this, spec, schema = std::move(current_schema),
        snapshot_id = SnapshotId()]() -> 
Result<std::unique_ptr<ManifestWriter>> {
-        return ManifestWriter::MakeWriter(base().format_version, snapshot_id,
-                                          ManifestPath(), 
transaction_->table()->io(),
-                                          std::move(spec), std::move(schema),
-                                          ManifestContent::kData,
-                                          /*first_row_id=*/base().next_row_id);
+        return ManifestWriter::MakeWriter(
+            base().format_version, snapshot_id, ManifestPath(), 
ctx_->table->io(),
+            std::move(spec), std::move(schema), ManifestContent::kData,
+            /*first_row_id=*/base().next_row_id);
       },
       target_manifest_size_bytes_);
 
@@ -203,10 +202,9 @@ Result<std::vector<ManifestFile>> SnapshotUpdate::WriteDeleteManifests(
   RollingManifestWriter rolling_writer(
       [this, spec, schema = std::move(current_schema),
        snapshot_id = SnapshotId()]() -> 
Result<std::unique_ptr<ManifestWriter>> {
-        return ManifestWriter::MakeWriter(base().format_version, snapshot_id,
-                                          ManifestPath(), 
transaction_->table()->io(),
-                                          std::move(spec), std::move(schema),
-                                          ManifestContent::kDeletes);
+        return ManifestWriter::MakeWriter(
+            base().format_version, snapshot_id, ManifestPath(), 
ctx_->table->io(),
+            std::move(spec), std::move(schema), ManifestContent::kDeletes);
       },
       target_manifest_size_bytes_);
 
@@ -245,8 +243,7 @@ Result<SnapshotUpdate::ApplyResult> SnapshotUpdate::Apply() {
       continue;
     }
     // TODO(xxx): read in parallel and cache enriched manifests for retries
-    ICEBERG_ASSIGN_OR_RAISE(manifest,
-                            AddMetadata(manifest, transaction_->table()->io(), 
base()));
+    ICEBERG_ASSIGN_OR_RAISE(manifest, AddMetadata(manifest, ctx_->table->io(), 
base()));
   }
 
   std::string manifest_list_path = ManifestListPath();
@@ -254,8 +251,8 @@ Result<SnapshotUpdate::ApplyResult> SnapshotUpdate::Apply() {
   ICEBERG_ASSIGN_OR_RAISE(
       auto writer, ManifestListWriter::MakeWriter(base().format_version, 
SnapshotId(),
                                                   parent_snapshot_id, 
manifest_list_path,
-                                                  transaction_->table()->io(),
-                                                  sequence_number, 
base().next_row_id));
+                                                  ctx_->table->io(), 
sequence_number,
+                                                  base().next_row_id));
   ICEBERG_RETURN_UNEXPECTED(writer->AddAll(manifests));
   ICEBERG_RETURN_UNEXPECTED(writer->Close());
 
@@ -313,8 +310,7 @@ Status SnapshotUpdate::Finalize(std::optional<Error> commit_error) {
     ICEBERG_CHECK(staged_snapshot_ != nullptr,
                   "Staged snapshot is null during finalize after commit");
     auto cached_snapshot = SnapshotCache(staged_snapshot_.get());
-    ICEBERG_ASSIGN_OR_RAISE(auto manifests,
-                            
cached_snapshot.Manifests(transaction_->table()->io()));
+    ICEBERG_ASSIGN_OR_RAISE(auto manifests, 
cached_snapshot.Manifests(ctx_->table->io()));
     CleanUncommitted(manifests | std::views::transform([](const auto& 
manifest) {
                        return manifest.manifest_path;
                      }) |
@@ -391,7 +387,7 @@ void SnapshotUpdate::CleanAll() {
 
 Status SnapshotUpdate::DeleteFile(const std::string& path) {
   static const auto kDefaultDeleteFunc = [this](const std::string& path) {
-    return this->transaction_->table()->io()->DeleteFile(path);
+    return this->ctx_->table->io()->DeleteFile(path);
   };
   if (delete_func_) {
     return delete_func_(path);
@@ -406,14 +402,14 @@ std::string SnapshotUpdate::ManifestListPath() {
   int64_t snapshot_id = SnapshotId();
   std::string filename =
       std::format("snap-{}-{}-{}.avro", snapshot_id, ++attempt_, commit_uuid_);
-  return transaction_->MetadataFileLocation(filename);
+  return ctx_->MetadataFileLocation(filename);
 }
 
 std::string SnapshotUpdate::ManifestPath() {
   // Generate manifest path
   // Format: {metadata_location}/{uuid}-m{manifest_count}.avro
   std::string filename = std::format("{}-m{}.avro", commit_uuid_, 
manifest_count_++);
-  return transaction_->MetadataFileLocation(filename);
+  return ctx_->MetadataFileLocation(filename);
 }
 
 }  // namespace iceberg
diff --git a/src/iceberg/update/snapshot_update.h b/src/iceberg/update/snapshot_update.h
index fdbb2660..284d1d2d 100644
--- a/src/iceberg/update/snapshot_update.h
+++ b/src/iceberg/update/snapshot_update.h
@@ -121,7 +121,7 @@ class ICEBERG_EXPORT SnapshotUpdate : public PendingUpdate {
   Status Finalize(std::optional<Error> commit_error) override;
 
  protected:
-  explicit SnapshotUpdate(std::shared_ptr<Transaction> transaction);
+  explicit SnapshotUpdate(std::shared_ptr<TransactionContext> ctx);
 
   /// \brief Write data manifests for the given data files
   ///
diff --git a/src/iceberg/update/update_location.cc b/src/iceberg/update/update_location.cc
index c82a138f..064bebc8 100644
--- a/src/iceberg/update/update_location.cc
+++ b/src/iceberg/update/update_location.cc
@@ -30,14 +30,13 @@
 namespace iceberg {
 
 Result<std::shared_ptr<UpdateLocation>> UpdateLocation::Make(
-    std::shared_ptr<Transaction> transaction) {
-  ICEBERG_PRECHECK(transaction != nullptr,
-                   "Cannot create UpdateLocation without a transaction");
-  return std::shared_ptr<UpdateLocation>(new 
UpdateLocation(std::move(transaction)));
+    std::shared_ptr<TransactionContext> ctx) {
+  ICEBERG_PRECHECK(ctx != nullptr, "Cannot create UpdateLocation without a 
context");
+  return std::shared_ptr<UpdateLocation>(new UpdateLocation(std::move(ctx)));
 }
 
-UpdateLocation::UpdateLocation(std::shared_ptr<Transaction> transaction)
-    : PendingUpdate(std::move(transaction)) {}
+UpdateLocation::UpdateLocation(std::shared_ptr<TransactionContext> ctx)
+    : PendingUpdate(std::move(ctx)) {}
 
 UpdateLocation::~UpdateLocation() = default;
 
diff --git a/src/iceberg/update/update_location.h b/src/iceberg/update/update_location.h
index 891853e9..33864380 100644
--- a/src/iceberg/update/update_location.h
+++ b/src/iceberg/update/update_location.h
@@ -34,7 +34,7 @@ namespace iceberg {
 class ICEBERG_EXPORT UpdateLocation : public PendingUpdate {
  public:
   static Result<std::shared_ptr<UpdateLocation>> Make(
-      std::shared_ptr<Transaction> transaction);
+      std::shared_ptr<TransactionContext> ctx);
 
   ~UpdateLocation() override;
 
@@ -50,7 +50,7 @@ class ICEBERG_EXPORT UpdateLocation : public PendingUpdate {
   Result<std::string> Apply();
 
  private:
-  explicit UpdateLocation(std::shared_ptr<Transaction> transaction);
+  explicit UpdateLocation(std::shared_ptr<TransactionContext> ctx);
 
   std::string location_;
 };
diff --git a/src/iceberg/update/update_partition_spec.cc b/src/iceberg/update/update_partition_spec.cc
index 812540f2..2be6a59a 100644
--- a/src/iceberg/update/update_partition_spec.cc
+++ b/src/iceberg/update/update_partition_spec.cc
@@ -37,15 +37,13 @@
 namespace iceberg {
 
 Result<std::shared_ptr<UpdatePartitionSpec>> UpdatePartitionSpec::Make(
-    std::shared_ptr<Transaction> transaction) {
-  ICEBERG_PRECHECK(transaction != nullptr,
-                   "Cannot create UpdatePartitionSpec without transaction");
-  return std::shared_ptr<UpdatePartitionSpec>(
-      new UpdatePartitionSpec(std::move(transaction)));
+    std::shared_ptr<TransactionContext> ctx) {
+  ICEBERG_PRECHECK(ctx != nullptr, "Cannot create UpdatePartitionSpec without 
context");
+  return std::shared_ptr<UpdatePartitionSpec>(new 
UpdatePartitionSpec(std::move(ctx)));
 }
 
-UpdatePartitionSpec::UpdatePartitionSpec(std::shared_ptr<Transaction> 
transaction)
-    : PendingUpdate(std::move(transaction)) {
+UpdatePartitionSpec::UpdatePartitionSpec(std::shared_ptr<TransactionContext> 
ctx)
+    : PendingUpdate(std::move(ctx)) {
   format_version_ = base().format_version;
 
   // Get the current/default partition spec
diff --git a/src/iceberg/update/update_partition_spec.h b/src/iceberg/update/update_partition_spec.h
index 1eab425d..6b3dd40e 100644
--- a/src/iceberg/update/update_partition_spec.h
+++ b/src/iceberg/update/update_partition_spec.h
@@ -44,7 +44,7 @@ namespace iceberg {
 class ICEBERG_EXPORT UpdatePartitionSpec : public PendingUpdate {
  public:
   static Result<std::shared_ptr<UpdatePartitionSpec>> Make(
-      std::shared_ptr<Transaction> transaction);
+      std::shared_ptr<TransactionContext> ctx);
 
   ~UpdatePartitionSpec() override;
 
@@ -107,7 +107,7 @@ class ICEBERG_EXPORT UpdatePartitionSpec : public PendingUpdate {
   Result<ApplyResult> Apply();
 
  private:
-  explicit UpdatePartitionSpec(std::shared_ptr<Transaction> transaction);
+  explicit UpdatePartitionSpec(std::shared_ptr<TransactionContext> ctx);
 
   /// \brief Pair of source ID and transform string for indexing.
   using TransformKey = std::pair<int32_t, std::string>;
diff --git a/src/iceberg/update/update_partition_statistics.cc b/src/iceberg/update/update_partition_statistics.cc
index 2c06c0ce..3a5ab4f8 100644
--- a/src/iceberg/update/update_partition_statistics.cc
+++ b/src/iceberg/update/update_partition_statistics.cc
@@ -32,16 +32,16 @@
 namespace iceberg {
 
 Result<std::shared_ptr<UpdatePartitionStatistics>> 
UpdatePartitionStatistics::Make(
-    std::shared_ptr<Transaction> transaction) {
-  ICEBERG_PRECHECK(transaction != nullptr,
-                   "Cannot create UpdatePartitionStatistics without a 
transaction");
+    std::shared_ptr<TransactionContext> ctx) {
+  ICEBERG_PRECHECK(ctx != nullptr,
+                   "Cannot create UpdatePartitionStatistics without a 
context");
   return std::shared_ptr<UpdatePartitionStatistics>(
-      new UpdatePartitionStatistics(std::move(transaction)));
+      new UpdatePartitionStatistics(std::move(ctx)));
 }
 
 UpdatePartitionStatistics::UpdatePartitionStatistics(
-    std::shared_ptr<Transaction> transaction)
-    : PendingUpdate(std::move(transaction)) {}
+    std::shared_ptr<TransactionContext> ctx)
+    : PendingUpdate(std::move(ctx)) {}
 
 UpdatePartitionStatistics::~UpdatePartitionStatistics() = default;
 
diff --git a/src/iceberg/update/update_partition_statistics.h b/src/iceberg/update/update_partition_statistics.h
index 740fe214..bdaf5a70 100644
--- a/src/iceberg/update/update_partition_statistics.h
+++ b/src/iceberg/update/update_partition_statistics.h
@@ -39,7 +39,7 @@ namespace iceberg {
 class ICEBERG_EXPORT UpdatePartitionStatistics : public PendingUpdate {
  public:
   static Result<std::shared_ptr<UpdatePartitionStatistics>> Make(
-      std::shared_ptr<Transaction> transaction);
+      std::shared_ptr<TransactionContext> ctx);
 
   ~UpdatePartitionStatistics() override;
 
@@ -71,7 +71,7 @@ class ICEBERG_EXPORT UpdatePartitionStatistics : public PendingUpdate {
   Result<ApplyResult> Apply();
 
  private:
-  explicit UpdatePartitionStatistics(std::shared_ptr<Transaction> transaction);
+  explicit UpdatePartitionStatistics(std::shared_ptr<TransactionContext> ctx);
 
   std::unordered_map<int64_t, std::shared_ptr<PartitionStatisticsFile>>
       partition_statistics_to_set_;
diff --git a/src/iceberg/update/update_properties.cc b/src/iceberg/update/update_properties.cc
index e6fe7a60..1837ab00 100644
--- a/src/iceberg/update/update_properties.cc
+++ b/src/iceberg/update/update_properties.cc
@@ -34,14 +34,13 @@
 namespace iceberg {
 
 Result<std::shared_ptr<UpdateProperties>> UpdateProperties::Make(
-    std::shared_ptr<Transaction> transaction) {
-  ICEBERG_PRECHECK(transaction != nullptr,
-                   "Cannot create UpdateProperties without a transaction");
-  return std::shared_ptr<UpdateProperties>(new 
UpdateProperties(std::move(transaction)));
+    std::shared_ptr<TransactionContext> ctx) {
+  ICEBERG_PRECHECK(ctx != nullptr, "Cannot create UpdateProperties without a 
context");
+  return std::shared_ptr<UpdateProperties>(new 
UpdateProperties(std::move(ctx)));
 }
 
-UpdateProperties::UpdateProperties(std::shared_ptr<Transaction> transaction)
-    : PendingUpdate(std::move(transaction)) {}
+UpdateProperties::UpdateProperties(std::shared_ptr<TransactionContext> ctx)
+    : PendingUpdate(std::move(ctx)) {}
 
 UpdateProperties::~UpdateProperties() = default;
 
diff --git a/src/iceberg/update/update_properties.h b/src/iceberg/update/update_properties.h
index ec9ab796..491a5567 100644
--- a/src/iceberg/update/update_properties.h
+++ b/src/iceberg/update/update_properties.h
@@ -39,7 +39,7 @@ namespace iceberg {
 class ICEBERG_EXPORT UpdateProperties : public PendingUpdate {
  public:
   static Result<std::shared_ptr<UpdateProperties>> Make(
-      std::shared_ptr<Transaction> transaction);
+      std::shared_ptr<TransactionContext> ctx);
 
   ~UpdateProperties() override;
 
@@ -71,7 +71,7 @@ class ICEBERG_EXPORT UpdateProperties : public PendingUpdate {
   Result<ApplyResult> Apply();
 
  private:
-  explicit UpdateProperties(std::shared_ptr<Transaction> transaction);
+  explicit UpdateProperties(std::shared_ptr<TransactionContext> ctx);
 
   std::unordered_map<std::string, std::string> updates_;
   std::unordered_set<std::string> removals_;
diff --git a/src/iceberg/update/update_schema.cc b/src/iceberg/update/update_schema.cc
index 3fdce409..1f35781f 100644
--- a/src/iceberg/update/update_schema.cc
+++ b/src/iceberg/update/update_schema.cc
@@ -279,15 +279,14 @@ std::vector<SchemaField> ApplyChangesVisitor::MoveFields(
 }  // namespace
 
 Result<std::shared_ptr<UpdateSchema>> UpdateSchema::Make(
-    std::shared_ptr<Transaction> transaction) {
-  ICEBERG_PRECHECK(transaction != nullptr,
-                   "Cannot create UpdateSchema without transaction");
-  return std::shared_ptr<UpdateSchema>(new 
UpdateSchema(std::move(transaction)));
+    std::shared_ptr<TransactionContext> ctx) {
+  ICEBERG_PRECHECK(ctx != nullptr, "Cannot create UpdateSchema without 
context");
+  return std::shared_ptr<UpdateSchema>(new UpdateSchema(std::move(ctx)));
 }
 
-UpdateSchema::UpdateSchema(std::shared_ptr<Transaction> transaction)
-    : PendingUpdate(std::move(transaction)) {
-  const TableMetadata& base_metadata = transaction_->current();
+UpdateSchema::UpdateSchema(std::shared_ptr<TransactionContext> ctx)
+    : PendingUpdate(std::move(ctx)) {
+  const TableMetadata& base_metadata = ctx_->current();
 
   auto schema_result = base_metadata.Schema();
   if (!schema_result.has_value()) {
diff --git a/src/iceberg/update/update_schema.h b/src/iceberg/update/update_schema.h
index 2223c0b8..564a03df 100644
--- a/src/iceberg/update/update_schema.h
+++ b/src/iceberg/update/update_schema.h
@@ -48,7 +48,7 @@ namespace iceberg {
 class ICEBERG_EXPORT UpdateSchema : public PendingUpdate {
  public:
   static Result<std::shared_ptr<UpdateSchema>> Make(
-      std::shared_ptr<Transaction> transaction);
+      std::shared_ptr<TransactionContext> ctx);
 
   ~UpdateSchema() override;
 
@@ -348,7 +348,7 @@ class ICEBERG_EXPORT UpdateSchema : public PendingUpdate {
   Result<ApplyResult> Apply();
 
  private:
-  explicit UpdateSchema(std::shared_ptr<Transaction> transaction);
+  explicit UpdateSchema(std::shared_ptr<TransactionContext> ctx);
 
   /// \brief Internal implementation for adding a column with full control.
   ///
diff --git a/src/iceberg/update/update_snapshot_reference.cc b/src/iceberg/update/update_snapshot_reference.cc
index 923f0c8d..908962ec 100644
--- a/src/iceberg/update/update_snapshot_reference.cc
+++ b/src/iceberg/update/update_snapshot_reference.cc
@@ -33,15 +33,15 @@
 namespace iceberg {
 
 Result<std::shared_ptr<UpdateSnapshotReference>> UpdateSnapshotReference::Make(
-    std::shared_ptr<Transaction> transaction) {
-  ICEBERG_PRECHECK(transaction != nullptr,
-                   "Cannot create UpdateSnapshotReference without a 
transaction");
+    std::shared_ptr<TransactionContext> ctx) {
+  ICEBERG_PRECHECK(ctx != nullptr,
+                   "Cannot create UpdateSnapshotReference without a context");
   return std::shared_ptr<UpdateSnapshotReference>(
-      new UpdateSnapshotReference(std::move(transaction)));
+      new UpdateSnapshotReference(std::move(ctx)));
 }
 
-UpdateSnapshotReference::UpdateSnapshotReference(std::shared_ptr<Transaction> 
transaction)
-    : PendingUpdate(std::move(transaction)), updated_refs_(base().refs) {}
+UpdateSnapshotReference::UpdateSnapshotReference(std::shared_ptr<TransactionContext>
 ctx)
+    : PendingUpdate(std::move(ctx)), updated_refs_(base().refs) {}
 
 UpdateSnapshotReference::~UpdateSnapshotReference() = default;
 
@@ -143,7 +143,7 @@ UpdateSnapshotReference& UpdateSnapshotReference::ReplaceBranchInternal(
   if (fast_forward) {
     // Fast-forward is valid only when the current branch (from) is an ancestor of the
     // target (to), i.e. we are moving forward in history.
-    const auto& base_metadata = transaction_->current();
+    const auto& base_metadata = ctx_->current();
     ICEBERG_BUILDER_ASSIGN_OR_RETURN(
         auto from_is_ancestor_of_to,
         SnapshotUtil::IsAncestorOf(
diff --git a/src/iceberg/update/update_snapshot_reference.h b/src/iceberg/update/update_snapshot_reference.h
index e13f5bfa..7d061ea3 100644
--- a/src/iceberg/update/update_snapshot_reference.h
+++ b/src/iceberg/update/update_snapshot_reference.h
@@ -39,7 +39,7 @@ namespace iceberg {
 class ICEBERG_EXPORT UpdateSnapshotReference : public PendingUpdate {
  public:
   static Result<std::shared_ptr<UpdateSnapshotReference>> Make(
-      std::shared_ptr<Transaction> transaction);
+      std::shared_ptr<TransactionContext> ctx);
 
   ~UpdateSnapshotReference() override;
 
@@ -145,7 +145,7 @@ class ICEBERG_EXPORT UpdateSnapshotReference : public PendingUpdate {
   Result<ApplyResult> Apply();
 
  private:
-  explicit UpdateSnapshotReference(std::shared_ptr<Transaction> transaction);
+  explicit UpdateSnapshotReference(std::shared_ptr<TransactionContext> ctx);
 
   UpdateSnapshotReference& ReplaceBranchInternal(const std::string& from,
                                                  const std::string& to,
diff --git a/src/iceberg/update/update_sort_order.cc b/src/iceberg/update/update_sort_order.cc
index c5c7be32..8086b903 100644
--- a/src/iceberg/update/update_sort_order.cc
+++ b/src/iceberg/update/update_sort_order.cc
@@ -34,14 +34,13 @@
 namespace iceberg {
 
 Result<std::shared_ptr<UpdateSortOrder>> UpdateSortOrder::Make(
-    std::shared_ptr<Transaction> transaction) {
-  ICEBERG_PRECHECK(transaction != nullptr,
-                   "Cannot create UpdateSortOrder without a transaction");
-  return std::shared_ptr<UpdateSortOrder>(new UpdateSortOrder(std::move(transaction)));
+    std::shared_ptr<TransactionContext> ctx) {
+  ICEBERG_PRECHECK(ctx != nullptr, "Cannot create UpdateSortOrder without a context");
+  return std::shared_ptr<UpdateSortOrder>(new UpdateSortOrder(std::move(ctx)));
 }
 
-UpdateSortOrder::UpdateSortOrder(std::shared_ptr<Transaction> transaction)
-    : PendingUpdate(std::move(transaction)) {}
+UpdateSortOrder::UpdateSortOrder(std::shared_ptr<TransactionContext> ctx)
+    : PendingUpdate(std::move(ctx)) {}
 
 UpdateSortOrder::~UpdateSortOrder() = default;
 
diff --git a/src/iceberg/update/update_sort_order.h b/src/iceberg/update/update_sort_order.h
index 364a70f6..53fe927e 100644
--- a/src/iceberg/update/update_sort_order.h
+++ b/src/iceberg/update/update_sort_order.h
@@ -37,7 +37,7 @@ namespace iceberg {
 class ICEBERG_EXPORT UpdateSortOrder : public PendingUpdate {
  public:
   static Result<std::shared_ptr<UpdateSortOrder>> Make(
-      std::shared_ptr<Transaction> transaction);
+      std::shared_ptr<TransactionContext> ctx);
 
   ~UpdateSortOrder() override;
 
@@ -71,7 +71,7 @@ class ICEBERG_EXPORT UpdateSortOrder : public PendingUpdate {
   Result<std::shared_ptr<SortOrder>> Apply();
 
  private:
-  explicit UpdateSortOrder(std::shared_ptr<Transaction> transaction);
+  explicit UpdateSortOrder(std::shared_ptr<TransactionContext> ctx);
 
   std::vector<SortField> sort_fields_;
   bool case_sensitive_ = true;
diff --git a/src/iceberg/update/update_statistics.cc b/src/iceberg/update/update_statistics.cc
index 46145336..afa6ea0c 100644
--- a/src/iceberg/update/update_statistics.cc
+++ b/src/iceberg/update/update_statistics.cc
@@ -32,14 +32,13 @@
 namespace iceberg {
 
 Result<std::shared_ptr<UpdateStatistics>> UpdateStatistics::Make(
-    std::shared_ptr<Transaction> transaction) {
-  ICEBERG_PRECHECK(transaction != nullptr,
-                   "Cannot create UpdateStatistics without a transaction");
-  return std::shared_ptr<UpdateStatistics>(new UpdateStatistics(std::move(transaction)));
+    std::shared_ptr<TransactionContext> ctx) {
+  ICEBERG_PRECHECK(ctx != nullptr, "Cannot create UpdateStatistics without a context");
+  return std::shared_ptr<UpdateStatistics>(new UpdateStatistics(std::move(ctx)));
 }
 
-UpdateStatistics::UpdateStatistics(std::shared_ptr<Transaction> transaction)
-    : PendingUpdate(std::move(transaction)) {}
+UpdateStatistics::UpdateStatistics(std::shared_ptr<TransactionContext> ctx)
+    : PendingUpdate(std::move(ctx)) {}
 
 UpdateStatistics::~UpdateStatistics() = default;
 
diff --git a/src/iceberg/update/update_statistics.h b/src/iceberg/update/update_statistics.h
index 55e50fb1..6441c02a 100644
--- a/src/iceberg/update/update_statistics.h
+++ b/src/iceberg/update/update_statistics.h
@@ -39,7 +39,7 @@ namespace iceberg {
 class ICEBERG_EXPORT UpdateStatistics : public PendingUpdate {
  public:
   static Result<std::shared_ptr<UpdateStatistics>> Make(
-      std::shared_ptr<Transaction> transaction);
+      std::shared_ptr<TransactionContext> ctx);
 
   ~UpdateStatistics() override;
 
@@ -70,7 +70,7 @@ class ICEBERG_EXPORT UpdateStatistics : public PendingUpdate {
   Result<ApplyResult> Apply();
 
  private:
-  explicit UpdateStatistics(std::shared_ptr<Transaction> transaction);
+  explicit UpdateStatistics(std::shared_ptr<TransactionContext> ctx);
 
   std::unordered_map<int64_t, std::shared_ptr<StatisticsFile>> statistics_to_set_;
 };

Reply via email to