This is an automated email from the ASF dual-hosted git repository.
gangwu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-cpp.git
The following commit(s) were added to refs/heads/main by this push:
new da916268 feat: implement transaction api (#418)
da916268 is described below
commit da9162682245bb0114fe67f9d789919d37eccd19
Author: Gang Wu <[email protected]>
AuthorDate: Fri Dec 19 13:22:57 2025 +0800
feat: implement transaction api (#418)
- Implement the `Transaction` class to manage multi-operation table updates.
- Add `Table::NewTransaction()` and `Table::NewUpdateProperties()` to
initiate updates.
- Move `PendingUpdate` from `src/iceberg/` to `src/iceberg/update/`, and refactor
`PendingUpdate` and `UpdateProperties` to use the transaction mechanism.
- Add `StagedTable` to represent tables created by stage-create that are not yet
committed, and `StaticTable` to represent read-only tables.
- Update `InMemoryCatalog` and `RestCatalog` to align with the new
update flow.
- Refactor `Table` to support metadata refresh and location management
within transactions.
- Add comprehensive tests for transactions and property updates.
---
example/demo_example.cc | 8 +-
src/iceberg/CMakeLists.txt | 2 +
src/iceberg/catalog.h | 6 +-
src/iceberg/catalog/memory/in_memory_catalog.cc | 21 +--
src/iceberg/catalog/memory/in_memory_catalog.h | 6 +-
src/iceberg/catalog/rest/rest_catalog.cc | 6 +-
src/iceberg/catalog/rest/rest_catalog.h | 6 +-
src/iceberg/meson.build | 5 +-
src/iceberg/pending_update.h | 72 --------
src/iceberg/table.cc | 102 +++++++++--
src/iceberg/table.h | 96 +++++++---
src/iceberg/table_metadata.cc | 8 +
src/iceberg/table_metadata.h | 9 +
src/iceberg/table_update.cc | 6 +-
src/iceberg/table_update.h | 59 ++++++-
src/iceberg/test/CMakeLists.txt | 29 +--
src/iceberg/test/in_memory_catalog_test.cc | 52 +++---
src/iceberg/test/meson.build | 6 +-
src/iceberg/test/mock_catalog.h | 6 +-
src/iceberg/test/mock_io.h | 42 +++++
src/iceberg/test/table_test.cc | 196 +++++++++++---------
src/iceberg/test/transaction_test.cc | 90 ++++++++++
src/iceberg/test/update_properties_test.cc | 226 +++++++++---------------
src/iceberg/test/update_test_base.h | 75 ++++++++
src/iceberg/transaction.cc | 111 ++++++++++++
src/iceberg/transaction.h | 66 +++++--
src/iceberg/type_fwd.h | 87 +++++----
src/iceberg/update/meson.build | 21 +++
src/iceberg/update/pending_update.cc | 38 ++++
src/iceberg/update/pending_update.h | 87 +++++++++
src/iceberg/update/update_properties.cc | 84 ++++-----
src/iceberg/update/update_properties.h | 35 +---
32 files changed, 1123 insertions(+), 540 deletions(-)
diff --git a/example/demo_example.cc b/example/demo_example.cc
index c333c797..ab011fee 100644
--- a/example/demo_example.cc
+++ b/example/demo_example.cc
@@ -58,7 +58,13 @@ int main(int argc, char** argv) {
}
auto table = std::move(load_result.value());
- auto scan_result = table->NewScan()->Build();
+ auto scan_builder = table->NewScan();
+ if (!scan_builder.has_value()) {
+ std::cerr << "Failed to create scan builder: " <<
scan_builder.error().message
+ << std::endl;
+ return 1;
+ }
+ auto scan_result = scan_builder.value()->Build();
if (!scan_result.has_value()) {
std::cerr << "Failed to build scan: " << scan_result.error().message <<
std::endl;
return 1;
diff --git a/src/iceberg/CMakeLists.txt b/src/iceberg/CMakeLists.txt
index a0d93967..9c25015c 100644
--- a/src/iceberg/CMakeLists.txt
+++ b/src/iceberg/CMakeLists.txt
@@ -72,9 +72,11 @@ set(ICEBERG_SOURCES
table_requirements.cc
table_scan.cc
table_update.cc
+ transaction.cc
transform.cc
transform_function.cc
type.cc
+ update/pending_update.cc
update/update_properties.cc
util/bucket_util.cc
util/conversions.cc
diff --git a/src/iceberg/catalog.h b/src/iceberg/catalog.h
index 6c4957ad..08965df8 100644
--- a/src/iceberg/catalog.h
+++ b/src/iceberg/catalog.h
@@ -110,7 +110,7 @@ class ICEBERG_EXPORT Catalog {
/// \param location a location for the table; leave empty if unspecified
/// \param properties a string map of table properties
/// \return a Table instance or ErrorKind::kAlreadyExists if the table
already exists
- virtual Result<std::unique_ptr<Table>> CreateTable(
+ virtual Result<std::shared_ptr<Table>> CreateTable(
const TableIdentifier& identifier, const Schema& schema, const
PartitionSpec& spec,
const std::string& location,
const std::unordered_map<std::string, std::string>& properties) = 0;
@@ -121,7 +121,7 @@ class ICEBERG_EXPORT Catalog {
/// \param requirements a list of table requirements
/// \param updates a list of table updates
/// \return a Table instance or ErrorKind::kAlreadyExists if the table
already exists
- virtual Result<std::unique_ptr<Table>> UpdateTable(
+ virtual Result<std::shared_ptr<Table>> UpdateTable(
const TableIdentifier& identifier,
const std::vector<std::unique_ptr<TableRequirement>>& requirements,
const std::vector<std::unique_ptr<TableUpdate>>& updates) = 0;
@@ -175,7 +175,7 @@ class ICEBERG_EXPORT Catalog {
/// \param identifier a table identifier
/// \return instance of Table implementation referred to by identifier or
/// ErrorKind::kNoSuchTable if the table does not exist
- virtual Result<std::unique_ptr<Table>> LoadTable(const TableIdentifier&
identifier) = 0;
+ virtual Result<std::shared_ptr<Table>> LoadTable(const TableIdentifier&
identifier) = 0;
/// \brief Register a table with the catalog if it does not exist
///
diff --git a/src/iceberg/catalog/memory/in_memory_catalog.cc
b/src/iceberg/catalog/memory/in_memory_catalog.cc
index dc6d9d00..a0c143c5 100644
--- a/src/iceberg/catalog/memory/in_memory_catalog.cc
+++ b/src/iceberg/catalog/memory/in_memory_catalog.cc
@@ -399,7 +399,7 @@ Result<std::vector<TableIdentifier>>
InMemoryCatalog::ListTables(
return table_idents;
}
-Result<std::unique_ptr<Table>> InMemoryCatalog::CreateTable(
+Result<std::shared_ptr<Table>> InMemoryCatalog::CreateTable(
const TableIdentifier& identifier, const Schema& schema, const
PartitionSpec& spec,
const std::string& location,
const std::unordered_map<std::string, std::string>& properties) {
@@ -407,7 +407,7 @@ Result<std::unique_ptr<Table>> InMemoryCatalog::CreateTable(
return NotImplemented("create table");
}
-Result<std::unique_ptr<Table>> InMemoryCatalog::UpdateTable(
+Result<std::shared_ptr<Table>> InMemoryCatalog::UpdateTable(
const TableIdentifier& identifier,
const std::vector<std::unique_ptr<TableRequirement>>& requirements,
const std::vector<std::unique_ptr<TableUpdate>>& updates) {
@@ -434,9 +434,8 @@ Result<std::unique_ptr<Table>> InMemoryCatalog::UpdateTable(
root_namespace_->UpdateTableMetadataLocation(identifier,
new_metadata_location));
TableMetadataUtil::DeleteRemovedMetadataFiles(*file_io_, base.get(),
*updated);
- return std::make_unique<Table>(identifier, std::move(updated),
- std::move(new_metadata_location), file_io_,
-
std::static_pointer_cast<Catalog>(shared_from_this()));
+ return Table::Make(identifier, std::move(updated),
std::move(new_metadata_location),
+ file_io_, shared_from_this());
}
Result<std::shared_ptr<Transaction>> InMemoryCatalog::StageCreateTable(
@@ -464,7 +463,7 @@ Status InMemoryCatalog::RenameTable(const TableIdentifier&
from,
return NotImplemented("rename table");
}
-Result<std::unique_ptr<Table>> InMemoryCatalog::LoadTable(
+Result<std::shared_ptr<Table>> InMemoryCatalog::LoadTable(
const TableIdentifier& identifier) {
if (!file_io_) [[unlikely]] {
return InvalidArgument("file_io is not set for catalog {}", catalog_name_);
@@ -479,9 +478,8 @@ Result<std::unique_ptr<Table>> InMemoryCatalog::LoadTable(
ICEBERG_ASSIGN_OR_RAISE(auto metadata,
TableMetadataUtil::Read(*file_io_,
metadata_location));
- return std::make_unique<Table>(identifier, std::move(metadata),
- std::move(metadata_location), file_io_,
-
std::static_pointer_cast<Catalog>(shared_from_this()));
+ return Table::Make(identifier, std::move(metadata),
std::move(metadata_location),
+ file_io_, shared_from_this());
}
Result<std::shared_ptr<Table>> InMemoryCatalog::RegisterTable(
@@ -500,9 +498,8 @@ Result<std::shared_ptr<Table>>
InMemoryCatalog::RegisterTable(
if (!root_namespace_->RegisterTable(identifier, metadata_file_location)) {
return UnknownError("The registry failed.");
}
- return std::make_unique<Table>(identifier, std::move(metadata),
metadata_file_location,
- file_io_,
-
std::static_pointer_cast<Catalog>(shared_from_this()));
+ return Table::Make(identifier, std::move(metadata), metadata_file_location,
file_io_,
+ shared_from_this());
}
} // namespace iceberg
diff --git a/src/iceberg/catalog/memory/in_memory_catalog.h
b/src/iceberg/catalog/memory/in_memory_catalog.h
index e6a9acbc..dd72dd89 100644
--- a/src/iceberg/catalog/memory/in_memory_catalog.h
+++ b/src/iceberg/catalog/memory/in_memory_catalog.h
@@ -70,12 +70,12 @@ class ICEBERG_EXPORT InMemoryCatalog
Result<std::vector<TableIdentifier>> ListTables(const Namespace& ns) const
override;
- Result<std::unique_ptr<Table>> CreateTable(
+ Result<std::shared_ptr<Table>> CreateTable(
const TableIdentifier& identifier, const Schema& schema, const
PartitionSpec& spec,
const std::string& location,
const std::unordered_map<std::string, std::string>& properties) override;
- Result<std::unique_ptr<Table>> UpdateTable(
+ Result<std::shared_ptr<Table>> UpdateTable(
const TableIdentifier& identifier,
const std::vector<std::unique_ptr<TableRequirement>>& requirements,
const std::vector<std::unique_ptr<TableUpdate>>& updates) override;
@@ -91,7 +91,7 @@ class ICEBERG_EXPORT InMemoryCatalog
Status RenameTable(const TableIdentifier& from, const TableIdentifier& to)
override;
- Result<std::unique_ptr<Table>> LoadTable(const TableIdentifier& identifier)
override;
+ Result<std::shared_ptr<Table>> LoadTable(const TableIdentifier& identifier)
override;
Result<std::shared_ptr<Table>> RegisterTable(
const TableIdentifier& identifier,
diff --git a/src/iceberg/catalog/rest/rest_catalog.cc
b/src/iceberg/catalog/rest/rest_catalog.cc
index b9dfaafc..0d14ea38 100644
--- a/src/iceberg/catalog/rest/rest_catalog.cc
+++ b/src/iceberg/catalog/rest/rest_catalog.cc
@@ -240,7 +240,7 @@ Result<std::vector<TableIdentifier>>
RestCatalog::ListTables(
return NotImplemented("Not implemented");
}
-Result<std::unique_ptr<Table>> RestCatalog::CreateTable(
+Result<std::shared_ptr<Table>> RestCatalog::CreateTable(
[[maybe_unused]] const TableIdentifier& identifier,
[[maybe_unused]] const Schema& schema, [[maybe_unused]] const
PartitionSpec& spec,
[[maybe_unused]] const std::string& location,
@@ -248,7 +248,7 @@ Result<std::unique_ptr<Table>> RestCatalog::CreateTable(
return NotImplemented("Not implemented");
}
-Result<std::unique_ptr<Table>> RestCatalog::UpdateTable(
+Result<std::shared_ptr<Table>> RestCatalog::UpdateTable(
[[maybe_unused]] const TableIdentifier& identifier,
[[maybe_unused]] const std::vector<std::unique_ptr<TableRequirement>>&
requirements,
[[maybe_unused]] const std::vector<std::unique_ptr<TableUpdate>>& updates)
{
@@ -278,7 +278,7 @@ Status RestCatalog::RenameTable([[maybe_unused]] const
TableIdentifier& from,
return NotImplemented("Not implemented");
}
-Result<std::unique_ptr<Table>> RestCatalog::LoadTable(
+Result<std::shared_ptr<Table>> RestCatalog::LoadTable(
[[maybe_unused]] const TableIdentifier& identifier) {
return NotImplemented("Not implemented");
}
diff --git a/src/iceberg/catalog/rest/rest_catalog.h
b/src/iceberg/catalog/rest/rest_catalog.h
index c8ddca58..26616827 100644
--- a/src/iceberg/catalog/rest/rest_catalog.h
+++ b/src/iceberg/catalog/rest/rest_catalog.h
@@ -71,12 +71,12 @@ class ICEBERG_REST_EXPORT RestCatalog : public Catalog {
Result<std::vector<TableIdentifier>> ListTables(const Namespace& ns) const
override;
- Result<std::unique_ptr<Table>> CreateTable(
+ Result<std::shared_ptr<Table>> CreateTable(
const TableIdentifier& identifier, const Schema& schema, const
PartitionSpec& spec,
const std::string& location,
const std::unordered_map<std::string, std::string>& properties) override;
- Result<std::unique_ptr<Table>> UpdateTable(
+ Result<std::shared_ptr<Table>> UpdateTable(
const TableIdentifier& identifier,
const std::vector<std::unique_ptr<TableRequirement>>& requirements,
const std::vector<std::unique_ptr<TableUpdate>>& updates) override;
@@ -92,7 +92,7 @@ class ICEBERG_REST_EXPORT RestCatalog : public Catalog {
Status DropTable(const TableIdentifier& identifier, bool purge) override;
- Result<std::unique_ptr<Table>> LoadTable(const TableIdentifier& identifier)
override;
+ Result<std::shared_ptr<Table>> LoadTable(const TableIdentifier& identifier)
override;
Result<std::shared_ptr<Table>> RegisterTable(
const TableIdentifier& identifier,
diff --git a/src/iceberg/meson.build b/src/iceberg/meson.build
index d473d72e..21a41dcf 100644
--- a/src/iceberg/meson.build
+++ b/src/iceberg/meson.build
@@ -94,9 +94,11 @@ iceberg_sources = files(
'table_requirements.cc',
'table_scan.cc',
'table_update.cc',
+ 'transaction.cc',
'transform.cc',
'transform_function.cc',
'type.cc',
+ 'update/pending_update.cc',
'update/update_properties.cc',
'util/bucket_util.cc',
'util/conversions.cc',
@@ -175,7 +177,6 @@ install_headers(
'name_mapping.h',
'partition_field.h',
'partition_spec.h',
- 'pending_update.h',
'result.h',
'schema_field.h',
'schema.h',
@@ -196,7 +197,6 @@ install_headers(
'transform.h',
'type_fwd.h',
'type.h',
- 'update/update_properties.h',
],
subdir: 'iceberg',
)
@@ -205,6 +205,7 @@ subdir('catalog')
subdir('expression')
subdir('manifest')
subdir('row')
+subdir('update')
subdir('util')
if get_option('tests').enabled()
diff --git a/src/iceberg/pending_update.h b/src/iceberg/pending_update.h
deleted file mode 100644
index 8370db14..00000000
--- a/src/iceberg/pending_update.h
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-#pragma once
-
-/// \file iceberg/pending_update.h
-/// API for table changes using builder pattern
-
-#include "iceberg/iceberg_export.h"
-#include "iceberg/result.h"
-#include "iceberg/type_fwd.h"
-#include "iceberg/util/error_collector.h"
-
-namespace iceberg {
-
-/// \brief Base class for table metadata changes using builder pattern
-///
-/// This base class allows storing different types of PendingUpdate operations
-/// in the same collection (e.g., in Transaction). It provides the common
Commit()
-/// interface that all updates share.
-///
-/// This matches the Java Iceberg pattern where BaseTransaction stores a
-/// List<PendingUpdate> without type parameters.
-class ICEBERG_EXPORT PendingUpdate : public ErrorCollector {
- public:
- virtual ~PendingUpdate() = default;
-
- /// \brief Verify that the changes are valid and apply them.
- /// \return Status::OK if the changes are valid, or an error:
- /// - ValidationFailed: if pending changes cannot be applied
- /// - InvalidArgument: if pending changes are conflicting
- virtual Status Apply() = 0;
-
- /// \brief Apply and commit the pending changes to the table
- ///
- /// Changes are committed by calling the underlying table's commit operation.
- ///
- /// Once the commit is successful, the updated table will be refreshed.
- ///
- /// \return Status::OK if the commit was successful, or an error:
- /// - ValidationFailed: if update cannot be applied to current
metadata
- /// - CommitFailed: if update cannot be committed due to conflicts
- /// - CommitStateUnknown: if commit success state is unknown
- virtual Status Commit() = 0;
-
- // Non-copyable, movable
- PendingUpdate(const PendingUpdate&) = delete;
- PendingUpdate& operator=(const PendingUpdate&) = delete;
- PendingUpdate(PendingUpdate&&) noexcept = default;
- PendingUpdate& operator=(PendingUpdate&&) noexcept = default;
-
- protected:
- PendingUpdate() = default;
-};
-
-} // namespace iceberg
diff --git a/src/iceberg/table.cc b/src/iceberg/table.cc
index 09ff7bda..b5a1e582 100644
--- a/src/iceberg/table.cc
+++ b/src/iceberg/table.cc
@@ -21,16 +21,40 @@
#include "iceberg/catalog.h"
#include "iceberg/partition_spec.h"
+#include "iceberg/result.h"
#include "iceberg/schema.h"
#include "iceberg/sort_order.h"
#include "iceberg/table_metadata.h"
#include "iceberg/table_properties.h"
#include "iceberg/table_scan.h"
+#include "iceberg/transaction.h"
#include "iceberg/update/update_properties.h"
#include "iceberg/util/macros.h"
namespace iceberg {
+Result<std::shared_ptr<Table>> Table::Make(TableIdentifier identifier,
+ std::shared_ptr<TableMetadata>
metadata,
+ std::string metadata_location,
+ std::shared_ptr<FileIO> io,
+ std::shared_ptr<Catalog> catalog) {
+ if (metadata == nullptr) [[unlikely]] {
+ return InvalidArgument("Metadata cannot be null");
+ }
+ if (metadata_location.empty()) [[unlikely]] {
+ return InvalidArgument("Metadata location cannot be empty");
+ }
+ if (io == nullptr) [[unlikely]] {
+ return InvalidArgument("FileIO cannot be null");
+ }
+ if (catalog == nullptr) [[unlikely]] {
+ return InvalidArgument("Catalog cannot be null");
+ }
+ return std::shared_ptr<Table>(new Table(std::move(identifier),
std::move(metadata),
+ std::move(metadata_location),
std::move(io),
+ std::move(catalog)));
+}
+
Table::~Table() = default;
Table::Table(TableIdentifier identifier, std::shared_ptr<TableMetadata>
metadata,
@@ -46,10 +70,6 @@ Table::Table(TableIdentifier identifier,
std::shared_ptr<TableMetadata> metadata
const std::string& Table::uuid() const { return metadata_->table_uuid; }
Status Table::Refresh() {
- if (!catalog_) {
- return NotSupported("Refresh is not supported for table without a
catalog");
- }
-
ICEBERG_ASSIGN_OR_RAISE(auto refreshed_table,
catalog_->LoadTable(identifier_));
if (metadata_location_ != refreshed_table->metadata_file_location()) {
metadata_ = std::move(refreshed_table->metadata_);
@@ -110,18 +130,78 @@ const std::vector<SnapshotLogEntry>& Table::history()
const {
return metadata_->snapshot_log;
}
-std::unique_ptr<UpdateProperties> Table::UpdateProperties() const {
- return std::make_unique<iceberg::UpdateProperties>(identifier_, catalog_,
metadata_);
+const std::shared_ptr<FileIO>& Table::io() const { return io_; }
+
+const std::shared_ptr<TableMetadata>& Table::metadata() const { return
metadata_; }
+
+const std::shared_ptr<Catalog>& Table::catalog() const { return catalog_; }
+
+Result<std::unique_ptr<TableScanBuilder>> Table::NewScan() const {
+ return std::make_unique<TableScanBuilder>(metadata_, io_);
}
-std::unique_ptr<Transaction> Table::NewTransaction() const {
- throw NotImplemented("Table::NewTransaction is not implemented");
+Result<std::shared_ptr<Transaction>> Table::NewTransaction() {
+ // Create a brand new transaction object for the table. Users are expected
to commit the
+ // transaction manually.
+ return Transaction::Make(shared_from_this(), Transaction::Kind::kUpdate,
+ /*auto_commit=*/false);
}
-const std::shared_ptr<FileIO>& Table::io() const { return io_; }
+Result<std::shared_ptr<UpdateProperties>> Table::NewUpdateProperties() {
+ ICEBERG_ASSIGN_OR_RAISE(
+ auto transaction, Transaction::Make(shared_from_this(),
Transaction::Kind::kUpdate,
+ /*auto_commit=*/true));
+ return transaction->NewUpdateProperties();
+}
-std::unique_ptr<TableScanBuilder> Table::NewScan() const {
- return std::make_unique<TableScanBuilder>(metadata_, io_);
+Result<std::shared_ptr<StagedTable>> StagedTable::Make(
+ TableIdentifier identifier, std::shared_ptr<TableMetadata> metadata,
+ std::string metadata_location, std::shared_ptr<FileIO> io,
+ std::shared_ptr<Catalog> catalog) {
+ if (metadata == nullptr) [[unlikely]] {
+ return InvalidArgument("Metadata cannot be null");
+ }
+ if (io == nullptr) [[unlikely]] {
+ return InvalidArgument("FileIO cannot be null");
+ }
+ if (catalog == nullptr) [[unlikely]] {
+ return InvalidArgument("Catalog cannot be null");
+ }
+ return std::shared_ptr<StagedTable>(
+ new StagedTable(std::move(identifier), std::move(metadata),
+ std::move(metadata_location), std::move(io),
std::move(catalog)));
+}
+
+StagedTable::~StagedTable() = default;
+
+Result<std::unique_ptr<TableScanBuilder>> StagedTable::NewScan() const {
+ return NotSupported("Cannot scan a staged table");
+}
+
+Result<std::shared_ptr<StaticTable>> StaticTable::Make(
+ TableIdentifier identifier, std::shared_ptr<TableMetadata> metadata,
+ std::string metadata_location, std::shared_ptr<FileIO> io) {
+ if (metadata == nullptr) [[unlikely]] {
+ return InvalidArgument("Metadata cannot be null");
+ }
+ if (io == nullptr) [[unlikely]] {
+ return InvalidArgument("FileIO cannot be null");
+ }
+ return std::shared_ptr<StaticTable>(
+ new StaticTable(std::move(identifier), std::move(metadata),
+ std::move(metadata_location), std::move(io),
/*catalog=*/nullptr));
+}
+
+StaticTable::~StaticTable() = default;
+
+Status StaticTable::Refresh() { return NotSupported("Cannot refresh a static
table"); }
+
+Result<std::shared_ptr<Transaction>> StaticTable::NewTransaction() {
+ return NotSupported("Cannot create a transaction for a static table");
+}
+
+Result<std::shared_ptr<UpdateProperties>> StaticTable::NewUpdateProperties() {
+ return NotSupported("Cannot create an update properties for a static table");
}
} // namespace iceberg
diff --git a/src/iceberg/table.h b/src/iceberg/table.h
index 98e6b998..efe17582 100644
--- a/src/iceberg/table.h
+++ b/src/iceberg/table.h
@@ -34,20 +34,21 @@
namespace iceberg {
/// \brief Represents an Iceberg table
-class ICEBERG_EXPORT Table {
+class ICEBERG_EXPORT Table : public std::enable_shared_from_this<Table> {
public:
- ~Table();
-
/// \brief Construct a table.
/// \param[in] identifier The identifier of the table.
/// \param[in] metadata The metadata for the table.
/// \param[in] metadata_location The location of the table metadata file.
/// \param[in] io The FileIO to read and write table data and metadata files.
- /// \param[in] catalog The catalog that this table belongs to. If null, the
table will
- /// be read-only.
- Table(TableIdentifier identifier, std::shared_ptr<TableMetadata> metadata,
- std::string metadata_location, std::shared_ptr<FileIO> io,
- std::shared_ptr<Catalog> catalog);
+ /// \param[in] catalog The catalog that this table belongs to.
+ static Result<std::shared_ptr<Table>> Make(TableIdentifier identifier,
+ std::shared_ptr<TableMetadata>
metadata,
+ std::string metadata_location,
+ std::shared_ptr<FileIO> io,
+ std::shared_ptr<Catalog> catalog);
+
+ virtual ~Table();
/// \brief Return the identifier of this table
const TableIdentifier& name() const { return identifier_; }
@@ -55,9 +56,6 @@ class ICEBERG_EXPORT Table {
/// \brief Returns the UUID of the table
const std::string& uuid() const;
- /// \brief Refresh the current table metadata
- Status Refresh();
-
/// \brief Return the schema for this table, return NotFoundError if not
found
Result<std::shared_ptr<Schema>> schema() const;
@@ -107,34 +105,38 @@ class ICEBERG_EXPORT Table {
const std::vector<std::shared_ptr<Snapshot>>& snapshots() const;
/// \brief Get the snapshot history of this table
- ///
- /// \return a vector of history entries
const std::vector<SnapshotLogEntry>& history() const;
- /// \brief Create a new UpdateProperties to update table properties and
commit the
- /// changes
- ///
- /// \return a new UpdateProperties instance
- virtual std::unique_ptr<iceberg::UpdateProperties> UpdateProperties() const;
+ /// \brief Returns a FileIO to read and write table data and metadata files
+ const std::shared_ptr<FileIO>& io() const;
+
+ /// \brief Returns the current metadata for this table
+ const std::shared_ptr<TableMetadata>& metadata() const;
+
+ /// \brief Returns the catalog that this table belongs to
+ const std::shared_ptr<Catalog>& catalog() const;
+
+ /// \brief Refresh the current table metadata
+ virtual Status Refresh();
/// \brief Create a new table scan builder for this table
///
/// Once a table scan builder is created, it can be refined to project
columns and
/// filter data.
- virtual std::unique_ptr<TableScanBuilder> NewScan() const;
+ virtual Result<std::unique_ptr<TableScanBuilder>> NewScan() const;
- /// \brief Create a new transaction for this table
- ///
- /// \return a pointer to the new Transaction
- virtual std::unique_ptr<Transaction> NewTransaction() const;
+ /// \brief Create a new Transaction to commit multiple table operations at
once.
+ virtual Result<std::shared_ptr<Transaction>> NewTransaction();
- /// \brief Returns a FileIO to read and write table data and metadata files
- const std::shared_ptr<FileIO>& io() const;
+ /// \brief Create a new UpdateProperties to update table properties and
commit the
+ /// changes.
+ virtual Result<std::shared_ptr<UpdateProperties>> NewUpdateProperties();
- /// \brief Returns the current metadata for this table
- const std::shared_ptr<TableMetadata>& metadata() const;
+ protected:
+ Table(TableIdentifier identifier, std::shared_ptr<TableMetadata> metadata,
+ std::string metadata_location, std::shared_ptr<FileIO> io,
+ std::shared_ptr<Catalog> catalog);
- private:
const TableIdentifier identifier_;
std::shared_ptr<TableMetadata> metadata_;
std::string metadata_location_;
@@ -143,4 +145,42 @@ class ICEBERG_EXPORT Table {
std::unique_ptr<class TableMetadataCache> metadata_cache_;
};
+/// \brief A table created by stage-create and not yet committed.
+class ICEBERG_EXPORT StagedTable final : public Table {
+ public:
+ static Result<std::shared_ptr<StagedTable>> Make(
+ TableIdentifier identifier, std::shared_ptr<TableMetadata> metadata,
+ std::string metadata_location, std::shared_ptr<FileIO> io,
+ std::shared_ptr<Catalog> catalog);
+
+ ~StagedTable() override;
+
+ Status Refresh() override { return {}; }
+
+ Result<std::unique_ptr<TableScanBuilder>> NewScan() const override;
+
+ private:
+ using Table::Table;
+};
+
+/// \brief A read-only table.
+
+class ICEBERG_EXPORT StaticTable final : public Table {
+ public:
+ static Result<std::shared_ptr<StaticTable>> Make(
+ TableIdentifier identifier, std::shared_ptr<TableMetadata> metadata,
+ std::string metadata_location, std::shared_ptr<FileIO> io);
+
+ ~StaticTable() override;
+
+ Status Refresh() override;
+
+ Result<std::shared_ptr<Transaction>> NewTransaction() override;
+
+ Result<std::shared_ptr<UpdateProperties>> NewUpdateProperties() override;
+
+ private:
+ using Table::Table;
+};
+
} // namespace iceberg
diff --git a/src/iceberg/table_metadata.cc b/src/iceberg/table_metadata.cc
index 4e814e02..61bb8e08 100644
--- a/src/iceberg/table_metadata.cc
+++ b/src/iceberg/table_metadata.cc
@@ -790,4 +790,12 @@ Result<std::unique_ptr<TableMetadata>>
TableMetadataBuilder::Build() {
return std::make_unique<TableMetadata>(std::move(impl_->metadata));
}
+const std::vector<std::unique_ptr<TableUpdate>>&
TableMetadataBuilder::changes() const {
+ return impl_->changes;
+}
+
+const TableMetadata* TableMetadataBuilder::base() const { return impl_->base; }
+
+const TableMetadata& TableMetadataBuilder::current() const { return
impl_->metadata; }
+
} // namespace iceberg
diff --git a/src/iceberg/table_metadata.h b/src/iceberg/table_metadata.h
index f428fd34..f7c26011 100644
--- a/src/iceberg/table_metadata.h
+++ b/src/iceberg/table_metadata.h
@@ -419,6 +419,15 @@ class ICEBERG_EXPORT TableMetadataBuilder : public
ErrorCollector {
/// \return A Result containing the constructed TableMetadata or an error
Result<std::unique_ptr<TableMetadata>> Build();
+ /// \brief Returns the changes made to the table metadata
+ const std::vector<std::unique_ptr<TableUpdate>>& changes() const;
+
+ /// \brief Returns the base metadata without any changes
+ const TableMetadata* base() const;
+
+ /// \brief Returns the current metadata with staged changes applied
+ const TableMetadata& current() const;
+
/// \brief Destructor
~TableMetadataBuilder() override;
diff --git a/src/iceberg/table_update.cc b/src/iceberg/table_update.cc
index 90f7de62..87fcd134 100644
--- a/src/iceberg/table_update.cc
+++ b/src/iceberg/table_update.cc
@@ -23,6 +23,10 @@
#include "iceberg/table_metadata.h"
#include "iceberg/table_requirements.h"
+namespace iceberg {
+TableUpdate::~TableUpdate() = default;
+}
+
namespace iceberg::table {
// AssignUUID
@@ -38,7 +42,7 @@ void AssignUUID::GenerateRequirements(TableUpdateContext&
context) const {
// UpgradeFormatVersion
void UpgradeFormatVersion::ApplyTo(TableMetadataBuilder& builder) const {
- throw IcebergError(std::format("{} not implemented", __FUNCTION__));
+ builder.UpgradeFormatVersion(format_version_);
}
void UpgradeFormatVersion::GenerateRequirements(TableUpdateContext& context)
const {
diff --git a/src/iceberg/table_update.h b/src/iceberg/table_update.h
index 93b48cf2..a040cb36 100644
--- a/src/iceberg/table_update.h
+++ b/src/iceberg/table_update.h
@@ -40,7 +40,30 @@ namespace iceberg {
/// represents a specific type of update operation.
class ICEBERG_EXPORT TableUpdate {
public:
- virtual ~TableUpdate() = default;
+ enum class Kind : uint8_t {
+ kAssignUUID,
+ kUpgradeFormatVersion,
+ kAddSchema,
+ kSetCurrentSchema,
+ kAddPartitionSpec,
+ kSetDefaultPartitionSpec,
+ kRemovePartitionSpecs,
+ kRemoveSchemas,
+ kAddSortOrder,
+ kSetDefaultSortOrder,
+ kAddSnapshot,
+ kRemoveSnapshots,
+ kRemoveSnapshotRef,
+ kSetSnapshotRef,
+ kSetProperties,
+ kRemoveProperties,
+ kSetLocation,
+ };
+
+ virtual ~TableUpdate();
+
+ /// \brief Return the kind of this update.
+ virtual Kind kind() const = 0;
/// \brief Apply this update to a TableMetadataBuilder
///
@@ -74,6 +97,8 @@ class ICEBERG_EXPORT AssignUUID : public TableUpdate {
void GenerateRequirements(TableUpdateContext& context) const override;
+ Kind kind() const override { return Kind::kAssignUUID; }
+
private:
std::string uuid_;
};
@@ -90,6 +115,8 @@ class ICEBERG_EXPORT UpgradeFormatVersion : public
TableUpdate {
void GenerateRequirements(TableUpdateContext& context) const override;
+ Kind kind() const override { return Kind::kUpgradeFormatVersion; }
+
private:
int8_t format_version_;
};
@@ -108,6 +135,8 @@ class ICEBERG_EXPORT AddSchema : public TableUpdate {
void GenerateRequirements(TableUpdateContext& context) const override;
+ Kind kind() const override { return Kind::kAddSchema; }
+
private:
std::shared_ptr<Schema> schema_;
int32_t last_column_id_;
@@ -124,6 +153,8 @@ class ICEBERG_EXPORT SetCurrentSchema : public TableUpdate {
void GenerateRequirements(TableUpdateContext& context) const override;
+ Kind kind() const override { return Kind::kSetCurrentSchema; }
+
private:
int32_t schema_id_;
};
@@ -140,6 +171,8 @@ class ICEBERG_EXPORT AddPartitionSpec : public TableUpdate {
void GenerateRequirements(TableUpdateContext& context) const override;
+ Kind kind() const override { return Kind::kAddPartitionSpec; }
+
private:
std::shared_ptr<PartitionSpec> spec_;
};
@@ -155,6 +188,8 @@ class ICEBERG_EXPORT SetDefaultPartitionSpec : public
TableUpdate {
void GenerateRequirements(TableUpdateContext& context) const override;
+ Kind kind() const override { return Kind::kSetDefaultPartitionSpec; }
+
private:
int32_t spec_id_;
};
@@ -171,6 +206,8 @@ class ICEBERG_EXPORT RemovePartitionSpecs : public
TableUpdate {
void GenerateRequirements(TableUpdateContext& context) const override;
+ Kind kind() const override { return Kind::kRemovePartitionSpecs; }
+
private:
std::vector<int32_t> spec_ids_;
};
@@ -187,6 +224,8 @@ class ICEBERG_EXPORT RemoveSchemas : public TableUpdate {
void GenerateRequirements(TableUpdateContext& context) const override;
+ Kind kind() const override { return Kind::kRemoveSchemas; }
+
private:
std::vector<int32_t> schema_ids_;
};
@@ -203,6 +242,8 @@ class ICEBERG_EXPORT AddSortOrder : public TableUpdate {
void GenerateRequirements(TableUpdateContext& context) const override;
+ Kind kind() const override { return Kind::kAddSortOrder; }
+
private:
std::shared_ptr<SortOrder> sort_order_;
};
@@ -218,6 +259,8 @@ class ICEBERG_EXPORT SetDefaultSortOrder : public
TableUpdate {
void GenerateRequirements(TableUpdateContext& context) const override;
+ Kind kind() const override { return Kind::kSetDefaultSortOrder; }
+
private:
int32_t sort_order_id_;
};
@@ -234,6 +277,8 @@ class ICEBERG_EXPORT AddSnapshot : public TableUpdate {
void GenerateRequirements(TableUpdateContext& context) const override;
+ Kind kind() const override { return Kind::kAddSnapshot; }
+
private:
std::shared_ptr<Snapshot> snapshot_;
};
@@ -250,6 +295,8 @@ class ICEBERG_EXPORT RemoveSnapshots : public TableUpdate {
void GenerateRequirements(TableUpdateContext& context) const override;
+ Kind kind() const override { return Kind::kRemoveSnapshots; }
+
private:
std::vector<int64_t> snapshot_ids_;
};
@@ -265,6 +312,8 @@ class ICEBERG_EXPORT RemoveSnapshotRef : public TableUpdate
{
void GenerateRequirements(TableUpdateContext& context) const override;
+ Kind kind() const override { return Kind::kRemoveSnapshotRef; }
+
private:
std::string ref_name_;
};
@@ -298,6 +347,8 @@ class ICEBERG_EXPORT SetSnapshotRef : public TableUpdate {
void GenerateRequirements(TableUpdateContext& context) const override;
+ Kind kind() const override { return Kind::kSetSnapshotRef; }
+
private:
std::string ref_name_;
int64_t snapshot_id_;
@@ -319,6 +370,8 @@ class ICEBERG_EXPORT SetProperties : public TableUpdate {
void GenerateRequirements(TableUpdateContext& context) const override;
+ Kind kind() const override { return Kind::kSetProperties; }
+
private:
std::unordered_map<std::string, std::string> updated_;
};
@@ -335,6 +388,8 @@ class ICEBERG_EXPORT RemoveProperties : public TableUpdate {
void GenerateRequirements(TableUpdateContext& context) const override;
+ Kind kind() const override { return Kind::kRemoveProperties; }
+
private:
std::vector<std::string> removed_;
};
@@ -350,6 +405,8 @@ class ICEBERG_EXPORT SetLocation : public TableUpdate {
void GenerateRequirements(TableUpdateContext& context) const override;
+ Kind kind() const override { return Kind::kSetLocation; }
+
private:
std::string location_;
};
diff --git a/src/iceberg/test/CMakeLists.txt b/src/iceberg/test/CMakeLists.txt
index 2c4e0f51..2af7d1c4 100644
--- a/src/iceberg/test/CMakeLists.txt
+++ b/src/iceberg/test/CMakeLists.txt
@@ -55,39 +55,36 @@ endfunction()
add_iceberg_test(schema_test
SOURCES
name_mapping_test.cc
- schema_test.cc
- schema_field_test.cc
- type_test.cc
- transform_test.cc
partition_field_test.cc
partition_spec_test.cc
partition_value_test.cc
+ schema_field_test.cc
+ schema_test.cc
+ schema_util_test.cc
sort_field_test.cc
sort_order_test.cc
- snapshot_test.cc
- schema_util_test.cc)
+ transform_test.cc
+ type_test.cc)
add_iceberg_test(table_test
SOURCES
- json_internal_test.cc
metrics_config_test.cc
- schema_json_test.cc
- table_test.cc
+ snapshot_test.cc
table_metadata_builder_test.cc
table_requirement_test.cc
table_requirements_test.cc
- table_update_test.cc
- update_properties_test.cc)
+ table_test.cc
+ table_update_test.cc)
add_iceberg_test(expression_test
SOURCES
aggregate_test.cc
expression_test.cc
expression_visitor_test.cc
- literal_test.cc
- manifest_evaluator_test.cc
inclusive_metrics_evaluator_test.cc
inclusive_metrics_evaluator_with_transform_test.cc
+ literal_test.cc
+ manifest_evaluator_test.cc
predicate_test.cc
projections_test.cc
residual_evaluator_test.cc
@@ -151,6 +148,12 @@ if(ICEBERG_BUILD_BUNDLE)
add_iceberg_test(scan_test USE_BUNDLE SOURCES file_scan_task_test.cc)
+ add_iceberg_test(table_update_test
+ USE_BUNDLE
+ SOURCES
+ transaction_test.cc
+ update_properties_test.cc)
+
endif()
if(ICEBERG_BUILD_REST)
diff --git a/src/iceberg/test/in_memory_catalog_test.cc
b/src/iceberg/test/in_memory_catalog_test.cc
index d1a8ccef..194d6da5 100644
--- a/src/iceberg/test/in_memory_catalog_test.cc
+++ b/src/iceberg/test/in_memory_catalog_test.cc
@@ -36,6 +36,7 @@
#include "iceberg/table_update.h"
#include "iceberg/test/matchers.h"
#include "iceberg/test/mock_catalog.h"
+#include "iceberg/test/mock_io.h"
#include "iceberg/test/test_resource.h"
#include "iceberg/util/uuid.h"
@@ -128,22 +129,21 @@ TEST_F(InMemoryCatalogTest, RefreshTable) {
std::vector<SchemaField>{SchemaField::MakeRequired(1, "x", int64())},
/*schema_id=*/1);
- std::shared_ptr<FileIO> io;
-
+ auto io = std::make_shared<MockFileIO>();
auto catalog = std::make_shared<MockCatalog>();
// Mock 1st call to LoadTable
EXPECT_CALL(*catalog, LoadTable(::testing::_))
.WillOnce(::testing::Return(
- std::make_unique<Table>(table_ident,
-
std::make_shared<TableMetadata>(TableMetadata{
- .schemas = {schema},
- .current_schema_id = 1,
- .current_snapshot_id = 1,
- .snapshots =
{std::make_shared<Snapshot>(Snapshot{
- .snapshot_id = 1,
- .sequence_number = 1,
- })}}),
- "s3://location/1.json", io, catalog)));
+ Table::Make(table_ident,
+ std::make_shared<TableMetadata>(
+ TableMetadata{.schemas = {schema},
+ .current_schema_id = 1,
+ .current_snapshot_id = 1,
+ .snapshots =
{std::make_shared<Snapshot>(Snapshot{
+ .snapshot_id = 1,
+ .sequence_number = 1,
+ })}}),
+ "s3://location/1.json", io, catalog)));
auto load_table_result = catalog->LoadTable(table_ident);
ASSERT_THAT(load_table_result, IsOk());
auto loaded_table = std::move(load_table_result.value());
@@ -152,20 +152,20 @@ TEST_F(InMemoryCatalogTest, RefreshTable) {
// Mock 2nd call to LoadTable
EXPECT_CALL(*catalog, LoadTable(::testing::_))
.WillOnce(::testing::Return(
- std::make_unique<Table>(table_ident,
-
std::make_shared<TableMetadata>(TableMetadata{
- .schemas = {schema},
- .current_schema_id = 1,
- .current_snapshot_id = 2,
- .snapshots =
{std::make_shared<Snapshot>(Snapshot{
- .snapshot_id = 1,
- .sequence_number = 1,
- }),
-
std::make_shared<Snapshot>(Snapshot{
- .snapshot_id = 2,
- .sequence_number = 2,
- })}}),
- "s3://location/2.json", io, catalog)));
+ Table::Make(table_ident,
+ std::make_shared<TableMetadata>(
+ TableMetadata{.schemas = {schema},
+ .current_schema_id = 1,
+ .current_snapshot_id = 2,
+ .snapshots =
{std::make_shared<Snapshot>(Snapshot{
+ .snapshot_id = 1,
+ .sequence_number = 1,
+ }),
+
std::make_shared<Snapshot>(Snapshot{
+ .snapshot_id = 2,
+ .sequence_number = 2,
+ })}}),
+ "s3://location/2.json", io, catalog)));
auto refreshed_result = loaded_table->Refresh();
ASSERT_THAT(refreshed_result, IsOk());
// check table is refreshed
diff --git a/src/iceberg/test/meson.build b/src/iceberg/test/meson.build
index f12b43af..5ccab940 100644
--- a/src/iceberg/test/meson.build
+++ b/src/iceberg/test/meson.build
@@ -37,7 +37,6 @@ iceberg_tests = {
'schema_field_test.cc',
'schema_test.cc',
'schema_util_test.cc',
- 'snapshot_test.cc',
'sort_field_test.cc',
'sort_order_test.cc',
'transform_test.cc',
@@ -46,14 +45,13 @@ iceberg_tests = {
},
'table_test': {
'sources': files(
- 'json_internal_test.cc',
'metrics_config_test.cc',
- 'schema_json_test.cc',
+ 'snapshot_test.cc',
'table_metadata_builder_test.cc',
'table_requirement_test.cc',
+ 'table_requirements_test.cc',
'table_test.cc',
'table_update_test.cc',
- 'update_properties_test.cc',
),
},
'expression_test': {
diff --git a/src/iceberg/test/mock_catalog.h b/src/iceberg/test/mock_catalog.h
index 46f01c8d..1f43cfab 100644
--- a/src/iceberg/test/mock_catalog.h
+++ b/src/iceberg/test/mock_catalog.h
@@ -55,12 +55,12 @@ class MockCatalog : public Catalog {
MOCK_METHOD((Result<std::vector<TableIdentifier>>), ListTables, (const
Namespace&),
(const, override));
- MOCK_METHOD((Result<std::unique_ptr<Table>>), CreateTable,
+ MOCK_METHOD((Result<std::shared_ptr<Table>>), CreateTable,
(const TableIdentifier&, const Schema&, const PartitionSpec&,
const std::string&, (const std::unordered_map<std::string,
std::string>&)),
(override));
- MOCK_METHOD((Result<std::unique_ptr<Table>>), UpdateTable,
+ MOCK_METHOD((Result<std::shared_ptr<Table>>), UpdateTable,
(const TableIdentifier&,
(const std::vector<std::unique_ptr<TableRequirement>>&),
(const std::vector<std::unique_ptr<TableUpdate>>&)),
@@ -78,7 +78,7 @@ class MockCatalog : public Catalog {
MOCK_METHOD(Status, RenameTable, (const TableIdentifier&, const
TableIdentifier&),
(override));
- MOCK_METHOD((Result<std::unique_ptr<Table>>), LoadTable, (const
TableIdentifier&),
+ MOCK_METHOD((Result<std::shared_ptr<Table>>), LoadTable, (const
TableIdentifier&),
(override));
MOCK_METHOD((Result<std::shared_ptr<Table>>), RegisterTable,
diff --git a/src/iceberg/test/mock_io.h b/src/iceberg/test/mock_io.h
new file mode 100644
index 00000000..c9f38e50
--- /dev/null
+++ b/src/iceberg/test/mock_io.h
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#include <gmock/gmock.h>
+#include <gtest/gtest.h>
+
+#include "iceberg/file_io.h"
+
+namespace iceberg {
+
+/// \brief gMock test double for FileIO.
+///
+/// Mocks the three FileIO virtuals (ReadFile, WriteFile, DeleteFile). Tests
+/// that only need a non-null FileIO handle can construct one without setting
+/// expectations; tests that exercise I/O should add EXPECT_CALL/ON_CALL for
+/// the methods they touch.
+class MockFileIO : public FileIO {
+ public:
+ MockFileIO() = default;
+ ~MockFileIO() override = default;
+
+ // Mirrors FileIO::ReadFile(path, optional size). NOTE(review): the meaning
+ // of the optional size_t is defined by FileIO; confirm in file_io.h.
+ MOCK_METHOD((Result<std::string>), ReadFile,
+ (const std::string&, std::optional<size_t>), (override));
+
+ MOCK_METHOD(Status, WriteFile, (const std::string&, std::string_view),
(override));
+
+ MOCK_METHOD(Status, DeleteFile, (const std::string&), (override));
+};
+
+} // namespace iceberg
diff --git a/src/iceberg/test/table_test.cc b/src/iceberg/test/table_test.cc
index 59710e0f..e445d901 100644
--- a/src/iceberg/test/table_test.cc
+++ b/src/iceberg/test/table_test.cc
@@ -19,99 +19,129 @@
#include "iceberg/table.h"
+#include <gmock/gmock.h>
#include <gtest/gtest.h>
-#include <nlohmann/json.hpp>
-#include "iceberg/partition_spec.h"
#include "iceberg/schema.h"
-#include "iceberg/snapshot.h"
+#include "iceberg/schema_field.h"
#include "iceberg/table_metadata.h"
#include "iceberg/test/matchers.h"
-#include "iceberg/test/test_resource.h"
+#include "iceberg/test/mock_catalog.h"
+#include "iceberg/test/mock_io.h"
namespace iceberg {
-TEST(Table, TableV1) {
- ICEBERG_UNWRAP_OR_FAIL(auto metadata,
-
ReadTableMetadataFromResource("TableMetadataV1Valid.json"));
- TableIdentifier tableIdent{.ns = {}, .name = "test_table_v1"};
- Table table(tableIdent, std::move(metadata),
"s3://bucket/test/location/meta/", nullptr,
- nullptr);
- ASSERT_EQ(table.name().name, "test_table_v1");
-
- // Check table schema
- auto schema = table.schema();
- ASSERT_TRUE(schema.has_value());
- ASSERT_EQ(schema.value()->fields().size(), 3);
- auto schemas = table.schemas();
- ASSERT_TRUE(schemas->get().empty());
-
- // Check table spec
- auto spec = table.spec();
- ASSERT_TRUE(spec.has_value());
- auto specs = table.specs();
- ASSERT_EQ(1UL, specs->get().size());
-
- // Check table sort_order
- auto sort_order = table.sort_order();
- ASSERT_TRUE(sort_order.has_value());
- auto sort_orders = table.sort_orders();
- ASSERT_EQ(1UL, sort_orders->get().size());
-
- // Check table location
- auto location = table.location();
- ASSERT_EQ(location, "s3://bucket/test/location");
-
- // Check table snapshots
- auto snapshots = table.snapshots();
- ASSERT_TRUE(snapshots.empty());
-
- auto io = table.io();
- ASSERT_TRUE(io == nullptr);
+// Per-table-type hooks used by TypedTableTest: a Make() factory with a common
+// signature, plus flags saying whether Refresh() and NewTransaction() are
+// expected to succeed for that type. Only the specializations below exist.
+template <typename T>
+struct TableTraits;
+
+// Catalog-backed Table: both Refresh() and NewTransaction() are expected to
+// succeed. Make() forwards every argument to Table::Make().
+template <>
+struct TableTraits<Table> {
+ static constexpr bool kRefreshSupported = true;
+ static constexpr bool kTransactionSupported = true;
+
+ static Result<std::shared_ptr<Table>> Make(const TableIdentifier& ident,
+ std::shared_ptr<TableMetadata>
metadata,
+ const std::string& location,
+ std::shared_ptr<FileIO> io,
+ std::shared_ptr<Catalog> catalog)
{
+ return Table::Make(ident, std::move(metadata), location, std::move(io),
+ std::move(catalog));
+ }
+};
+
+// StaticTable is built without a catalog: Make() ignores the catalog argument.
+// Refresh() and NewTransaction() are expected to fail with kNotSupported.
+template <>
+struct TableTraits<StaticTable> {
+ static constexpr bool kRefreshSupported = false;
+ static constexpr bool kTransactionSupported = false;
+
+ static Result<std::shared_ptr<StaticTable>> Make(
+ const TableIdentifier& ident, std::shared_ptr<TableMetadata> metadata,
+ const std::string& location, std::shared_ptr<FileIO> io,
std::shared_ptr<Catalog>) {
+ return StaticTable::Make(ident, std::move(metadata), location,
std::move(io));
+ }
+};
+
+// StagedTable takes a catalog like Table does; both Refresh() and
+// NewTransaction() are expected to succeed.
+template <>
+struct TableTraits<StagedTable> {
+ static constexpr bool kRefreshSupported = true;
+ static constexpr bool kTransactionSupported = true;
+
+ static Result<std::shared_ptr<StagedTable>> Make(
+ const TableIdentifier& ident, std::shared_ptr<TableMetadata> metadata,
+ const std::string& location, std::shared_ptr<FileIO> io,
+ std::shared_ptr<Catalog> catalog) {
+ return StagedTable::Make(ident, std::move(metadata), location,
std::move(io),
+ std::move(catalog));
+ }
+};
+
+// Typed fixture shared by Table, StaticTable and StagedTable tests.
+//
+// SetUp() creates a mock FileIO, a mock catalog and minimal format-v2
+// metadata holding one schema (1: id int64 required, 2: name string
+// optional). MakeTable() builds a T named "db.<name>" via TableTraits<T>.
+template <typename T>
+class TypedTableTest : public ::testing::Test {
+ protected:
+ using Traits = TableTraits<T>;
+
+ void SetUp() override {
+ io_ = std::make_shared<MockFileIO>();
+ catalog_ = std::make_shared<MockCatalog>();
+
+ auto schema = std::make_shared<Schema>(
+ std::vector<SchemaField>{SchemaField::MakeRequired(1, "id", int64()),
+ SchemaField::MakeOptional(2, "name",
string())},
+ 1);
+ metadata_ = std::make_shared<TableMetadata>(
+ TableMetadata{.format_version = 2, .schemas = {schema},
.current_schema_id = 1});
+ }
+
+ // Builds the table under test with the fixture's metadata, IO and catalog.
+ Result<std::shared_ptr<T>> MakeTable(const std::string& name) {
+ TableIdentifier ident{.ns = Namespace{.levels = {"db"}}, .name = name};
+ return Traits::Make(ident, metadata_, "s3://bucket/meta.json", io_,
catalog_);
+ }
+
+ std::shared_ptr<MockFileIO> io_;
+ std::shared_ptr<MockCatalog> catalog_;
+ std::shared_ptr<TableMetadata> metadata_;
+};
+
+// Every TYPED_TEST below runs once per table type listed here.
+using TableTypes = ::testing::Types<Table, StaticTable, StagedTable>;
+TYPED_TEST_SUITE(TypedTableTest, TableTypes);
+
+// For every table type, the accessors reflect the identifier and metadata
+// supplied at construction.
+TYPED_TEST(TypedTableTest, BasicMetadata) {
+ ICEBERG_UNWRAP_OR_FAIL(auto table, this->MakeTable("test_table"));
+
+ EXPECT_EQ(table->name().name, "test_table");
+ EXPECT_EQ(table->name().ns.levels, (std::vector<std::string>{"db"}));
+ EXPECT_EQ(table->metadata()->format_version, 2);
+ EXPECT_EQ(table->metadata()->schemas.size(), 1);
}
-TEST(Table, TableV2) {
- ICEBERG_UNWRAP_OR_FAIL(auto metadata,
-
ReadTableMetadataFromResource("TableMetadataV2Valid.json"));
- TableIdentifier tableIdent{.ns = {}, .name = "test_table_v2"};
-
- Table table(tableIdent, std::move(metadata),
"s3://bucket/test/location/meta/", nullptr,
- nullptr);
- ASSERT_EQ(table.name().name, "test_table_v2");
-
- // Check table schema
- auto schema = table.schema();
- ASSERT_TRUE(schema.has_value());
- ASSERT_EQ(schema.value()->fields().size(), 3);
- auto schemas = table.schemas();
- ASSERT_FALSE(schemas->get().empty());
-
- // Check partition spec
- auto spec = table.spec();
- ASSERT_TRUE(spec.has_value());
- auto specs = table.specs();
- ASSERT_EQ(1UL, specs->get().size());
-
- // Check sort order
- auto sort_order = table.sort_order();
- ASSERT_TRUE(sort_order.has_value());
- auto sort_orders = table.sort_orders();
- ASSERT_EQ(1UL, sort_orders->get().size());
-
- // Check table location
- auto location = table.location();
- ASSERT_EQ(location, "s3://bucket/test/location");
-
- // Check snapshot
- auto snapshots = table.snapshots();
- ASSERT_EQ(2UL, snapshots.size());
- auto snapshot = table.current_snapshot();
- ASSERT_TRUE(snapshot.has_value());
- snapshot = table.SnapshotById(snapshot.value()->snapshot_id);
- ASSERT_TRUE(snapshot.has_value());
- auto invalid_snapshot_id = 9999;
- snapshot = table.SnapshotById(invalid_snapshot_id);
- ASSERT_FALSE(snapshot.has_value());
+// Refresh() succeeds for types whose traits enable it and reports
+// kNotSupported otherwise (StaticTable).
+TYPED_TEST(TypedTableTest, Refresh) {
+ using Traits = typename TestFixture::Traits;
+ ICEBERG_UNWRAP_OR_FAIL(auto table, this->MakeTable("test_table"));
+
+ if constexpr (Traits::kRefreshSupported) {
+ // Only a plain Table gets a LoadTable expectation; presumably
+ // Table::Refresh() reloads through Catalog::LoadTable.
+ // NOTE(review): StagedTable has no expectation set, so this assumes
+ // StagedTable::Refresh() succeeds without consulting the catalog — confirm.
+ if constexpr (std::is_same_v<TypeParam, Table>) {
+ TableIdentifier ident{.ns = Namespace{.levels = {"db"}}, .name =
"test_table"};
+ ICEBERG_UNWRAP_OR_FAIL(auto refreshed,
+ Table::Make(ident, this->metadata_,
"s3://bucket/meta2.json",
+ this->io_, this->catalog_));
+ EXPECT_CALL(*this->catalog_, LoadTable(::testing::_))
+ .WillOnce(::testing::Return(refreshed));
+ }
+ EXPECT_THAT(table->Refresh(), IsOk());
+ } else {
+ EXPECT_THAT(table->Refresh(), IsError(ErrorKind::kNotSupported));
+ }
+}
+
+// NewTransaction() succeeds for types whose traits enable transactions and
+// reports kNotSupported otherwise (StaticTable).
+TYPED_TEST(TypedTableTest, NewTransaction) {
+ using Traits = typename TestFixture::Traits;
+ ICEBERG_UNWRAP_OR_FAIL(auto table, this->MakeTable("test_table"));
+
+ if constexpr (Traits::kTransactionSupported) {
+ EXPECT_THAT(table->NewTransaction(), IsOk());
+ } else {
+ EXPECT_THAT(table->NewTransaction(), IsError(ErrorKind::kNotSupported));
+ }
}
} // namespace iceberg
diff --git a/src/iceberg/test/transaction_test.cc
b/src/iceberg/test/transaction_test.cc
new file mode 100644
index 00000000..b8a55d83
--- /dev/null
+++ b/src/iceberg/test/transaction_test.cc
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "iceberg/transaction.h"
+
+#include "iceberg/table.h"
+#include "iceberg/table_update.h"
+#include "iceberg/test/matchers.h"
+#include "iceberg/test/update_test_base.h"
+#include "iceberg/update/update_properties.h"
+
+namespace iceberg {
+
+// Transaction tests run against a table registered in a real InMemoryCatalog,
+// provided by UpdateTestBase (table_, catalog_, table_ident_).
+class TransactionTest : public UpdateTestBase {};
+
+// NewTransaction() on a catalog-backed table succeeds, and the transaction
+// holds the same table object it was created from.
+TEST_F(TransactionTest, CreateTransaction) {
+ ICEBERG_UNWRAP_OR_FAIL(auto txn, table_->NewTransaction());
+ EXPECT_NE(txn, nullptr);
+ EXPECT_EQ(txn->table(), table_);
+}
+
+// An UpdateProperties obtained from a transaction can Apply() a change.
+// Neither the update nor the transaction is committed here.
+TEST_F(TransactionTest, UpdatePropertiesInTransaction) {
+ ICEBERG_UNWRAP_OR_FAIL(auto txn, table_->NewTransaction());
+ ICEBERG_UNWRAP_OR_FAIL(auto update, txn->NewUpdateProperties());
+
+ update->Set("key1", "value1");
+ EXPECT_THAT(update->Apply(), IsOk());
+}
+
+// Committing a transaction that has no pending updates succeeds.
+TEST_F(TransactionTest, CommitEmptyTransaction) {
+ ICEBERG_UNWRAP_OR_FAIL(auto txn, table_->NewTransaction());
+ EXPECT_THAT(txn->Commit(), IsOk());
+}
+
+// A property set through the transaction is visible in the catalog after
+// txn->Commit(), as seen by reloading the table.
+TEST_F(TransactionTest, CommitTransactionWithPropertyUpdate) {
+ ICEBERG_UNWRAP_OR_FAIL(auto txn, table_->NewTransaction());
+ ICEBERG_UNWRAP_OR_FAIL(auto update, txn->NewUpdateProperties());
+
+ update->Set("txn.property", "txn.value");
+ // NOTE(review): presumably this stages the update into txn rather than
+ // writing to the catalog; the catalog is only checked after txn->Commit().
+ EXPECT_THAT(update->Commit(), IsOk());
+
+ ICEBERG_UNWRAP_OR_FAIL(auto updated_table, txn->Commit());
+ EXPECT_NE(updated_table, nullptr);
+
+ // Reload table and verify the property was set
+ ICEBERG_UNWRAP_OR_FAIL(auto reloaded, catalog_->LoadTable(table_ident_));
+ const auto& props = reloaded->properties().configs();
+ // NOTE(review): if configs() is a standard map, at() throws
+ // std::out_of_range when the key is missing, aborting the test with an
+ // exception rather than a readable assertion failure.
+ EXPECT_EQ(props.at("txn.property"), "txn.value");
+}
+
+// Two UpdateProperties committed into the same transaction are both visible
+// in the catalog after one txn->Commit().
+TEST_F(TransactionTest, MultipleUpdatesInTransaction) {
+ ICEBERG_UNWRAP_OR_FAIL(auto txn, table_->NewTransaction());
+
+ // First update
+ ICEBERG_UNWRAP_OR_FAIL(auto update1, txn->NewUpdateProperties());
+ update1->Set("key1", "value1");
+ EXPECT_THAT(update1->Commit(), IsOk());
+
+ // Second update
+ ICEBERG_UNWRAP_OR_FAIL(auto update2, txn->NewUpdateProperties());
+ update2->Set("key2", "value2");
+ EXPECT_THAT(update2->Commit(), IsOk());
+
+ // Commit transaction
+ // NOTE(review): updated_table is never used below; an assertion on
+ // txn->Commit() being Ok would avoid the unused binding.
+ ICEBERG_UNWRAP_OR_FAIL(auto updated_table, txn->Commit());
+
+ // Verify both properties were set
+ ICEBERG_UNWRAP_OR_FAIL(auto reloaded, catalog_->LoadTable(table_ident_));
+ const auto& props = reloaded->properties().configs();
+ EXPECT_EQ(props.at("key1"), "value1");
+ EXPECT_EQ(props.at("key2"), "value2");
+}
+
+} // namespace iceberg
diff --git a/src/iceberg/test/update_properties_test.cc
b/src/iceberg/test/update_properties_test.cc
index 13cfec83..1084f08f 100644
--- a/src/iceberg/test/update_properties_test.cc
+++ b/src/iceberg/test/update_properties_test.cc
@@ -19,188 +19,122 @@
#include "iceberg/update/update_properties.h"
-#include <cstddef>
-#include <memory>
-#include <string>
-
-#include <gmock/gmock.h>
-#include <gtest/gtest.h>
-
-#include "iceberg/file_format.h"
-#include "iceberg/result.h"
-#include "iceberg/schema.h"
-#include "iceberg/schema_field.h"
#include "iceberg/table.h"
-#include "iceberg/table_identifier.h"
-#include "iceberg/table_metadata.h"
+#include "iceberg/table_update.h"
#include "iceberg/test/matchers.h"
-#include "iceberg/test/mock_catalog.h"
+#include "iceberg/test/update_test_base.h"
namespace iceberg {
-class UpdatePropertiesTest : public ::testing::Test {
- protected:
- void SetUp() override {
- // Create a simple schema
- SchemaField f(1, "col1", std::make_shared<LongType>(), false);
- schema_ = std::make_shared<Schema>(std::vector<SchemaField>{f}, 1);
-
- // Create basic table metadata
- metadata_ = std::make_shared<TableMetadata>();
- metadata_->schemas.push_back(schema_);
-
- // Create catalog and table identifier
- catalog_ = std::make_shared<MockCatalog>();
- identifier_ = TableIdentifier(Namespace({"test"}), "table");
- }
-
- std::shared_ptr<Schema> schema_;
- std::shared_ptr<TableMetadata> metadata_;
- std::shared_ptr<MockCatalog> catalog_;
- TableIdentifier identifier_;
-};
+class UpdatePropertiesTest : public UpdateTestBase {};
TEST_F(UpdatePropertiesTest, EmptyUpdates) {
- UpdateProperties update(identifier_, catalog_, metadata_);
-
- auto result = update.Commit();
- EXPECT_THAT(result, IsOk());
+ ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewUpdateProperties());
+ EXPECT_THAT(update->Commit(), IsOk());
}
TEST_F(UpdatePropertiesTest, SetProperty) {
- UpdateProperties update(identifier_, catalog_, metadata_);
- update.Set("key1", "value1").Set("key2", "value2");
+ ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewUpdateProperties());
+ update->Set("key1", "value1").Set("key2", "value2");
- auto result = update.Apply();
- EXPECT_THAT(result, IsOk());
+ ICEBERG_UNWRAP_OR_FAIL(auto result, update->Apply());
+ EXPECT_EQ(result.updates.size(), 1);
+ EXPECT_EQ(result.updates[0]->kind(),
table::SetProperties::Kind::kSetProperties);
}
TEST_F(UpdatePropertiesTest, RemoveProperty) {
- UpdateProperties update(identifier_, catalog_, metadata_);
- update.Remove("key1").Remove("key2");
+ ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewUpdateProperties());
+ update->Remove("key1").Remove("key2");
+
+ ICEBERG_UNWRAP_OR_FAIL(auto result, update->Apply());
+ EXPECT_EQ(result.updates.size(), 1);
+ EXPECT_EQ(result.updates[0]->kind(),
table::RemoveProperties::Kind::kRemoveProperties);
+}
+
+TEST_F(UpdatePropertiesTest, SetThenRemoveSameKey) {
+ ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewUpdateProperties());
+ update->Set("key1", "value1").Remove("key1");
- auto result = update.Apply();
- EXPECT_THAT(result, IsOk());
+ auto result = update->Apply();
+ EXPECT_THAT(result, IsError(ErrorKind::kValidationFailed));
+ EXPECT_THAT(result, HasErrorMessage("already marked for update"));
}
-TEST_F(UpdatePropertiesTest, SetRemoveConflict) {
- {
- // Set a property that is already marked for removal
- UpdateProperties update(identifier_, catalog_, metadata_);
- update.Set("key1", "value1").Remove("key1");
-
- auto result = update.Apply();
- EXPECT_THAT(result, IsError(ErrorKind::kValidationFailed));
- EXPECT_THAT(result, HasErrorMessage("already marked for update"));
- }
-
- {
- // Remove a property that is already marked for update
- UpdateProperties update(identifier_, catalog_, metadata_);
- update.Remove("key1").Set("key1", "value1");
-
- auto result = update.Apply();
- EXPECT_THAT(result, IsError(ErrorKind::kValidationFailed));
- EXPECT_THAT(result, HasErrorMessage("already marked for removal"));
- }
+TEST_F(UpdatePropertiesTest, RemoveThenSetSameKey) {
+ ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewUpdateProperties());
+ update->Remove("key1").Set("key1", "value1");
+
+ auto result = update->Apply();
+ EXPECT_THAT(result, IsError(ErrorKind::kValidationFailed));
+ EXPECT_THAT(result, HasErrorMessage("already marked for removal"));
}
-TEST_F(UpdatePropertiesTest, UpgradeFormatVersion) {
- {
- // Valid format-version upgrade
- UpdateProperties update(identifier_, catalog_, metadata_);
- update.Set("format-version", "2");
-
- auto result = update.Apply();
- EXPECT_THAT(result, IsOk());
- }
-
- {
- // Format-version is not a valid integer
- UpdateProperties update(identifier_, catalog_, metadata_);
- update.Set("format-version", "invalid");
-
- auto result = update.Apply();
- EXPECT_THAT(result, IsError(ErrorKind::kInvalidArgument));
- EXPECT_THAT(result, HasErrorMessage("Invalid format version"));
- }
-
- {
- // Format-version is out of range
- UpdateProperties update(identifier_, catalog_, metadata_);
- update.Set("format-version", "5000000000");
-
- auto result = update.Apply();
- EXPECT_THAT(result, IsError(ErrorKind::kInvalidArgument));
- EXPECT_THAT(result, HasErrorMessage("out of range"));
- }
-
- {
- // Format-version not supported
- UpdateProperties update(identifier_, catalog_, metadata_);
- update.Set("format-version",
- std::to_string(TableMetadata::kSupportedTableFormatVersion +
1));
-
- auto result = update.Apply();
- EXPECT_THAT(result, IsError(ErrorKind::kInvalidArgument));
- EXPECT_THAT(result, HasErrorMessage("unsupported format version"));
- }
+TEST_F(UpdatePropertiesTest, UpgradeFormatVersionValid) {
+ ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewUpdateProperties());
+ update->Set("format-version", "2");
+
+ ICEBERG_UNWRAP_OR_FAIL(auto result, update->Apply());
+ EXPECT_EQ(result.updates.size(), 1);
+ EXPECT_EQ(result.updates[0]->kind(),
+ table::UpgradeFormatVersion::Kind::kUpgradeFormatVersion);
}
-TEST_F(UpdatePropertiesTest, InvalidTable) {
- {
- // catalog is null
- UpdateProperties update(identifier_, nullptr, metadata_);
+TEST_F(UpdatePropertiesTest, UpgradeFormatVersionInvalidString) {
+ ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewUpdateProperties());
+ update->Set("format-version", "invalid");
- auto result = update.Apply();
- EXPECT_THAT(result, IsError(ErrorKind::kInvalidArgument));
- EXPECT_THAT(result, HasErrorMessage("Catalog is required"));
- }
+ auto result = update->Apply();
+ EXPECT_THAT(result, IsError(ErrorKind::kInvalidArgument));
+ EXPECT_THAT(result, HasErrorMessage("Invalid format version"));
+}
- {
- // metadata is null
- UpdateProperties update(identifier_, catalog_, nullptr);
+TEST_F(UpdatePropertiesTest, UpgradeFormatVersionOutOfRange) {
+ ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewUpdateProperties());
+ update->Set("format-version", "5000000000");
- auto result = update.Apply();
- EXPECT_THAT(result, IsError(ErrorKind::kInvalidArgument));
- EXPECT_THAT(result, HasErrorMessage("Base table metadata is required"));
- }
+ auto result = update->Apply();
+ EXPECT_THAT(result, IsError(ErrorKind::kInvalidArgument));
+ EXPECT_THAT(result, HasErrorMessage("out of range"));
}
-TEST_F(UpdatePropertiesTest, Commit) {
- {
- // Successful commit
- UpdateProperties update(identifier_, catalog_, metadata_);
- update.Set("key1", "value1");
+TEST_F(UpdatePropertiesTest, UpgradeFormatVersionUnsupported) {
+ ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewUpdateProperties());
+ update->Set("format-version",
+ std::to_string(TableMetadata::kSupportedTableFormatVersion + 1));
- EXPECT_CALL(*catalog_,
UpdateTable).Times(1).WillOnce(::testing::Return(nullptr));
+ auto result = update->Apply();
+ EXPECT_THAT(result, IsError(ErrorKind::kInvalidArgument));
+ EXPECT_THAT(result, HasErrorMessage("unsupported format version"));
+}
- auto result = update.Commit();
- EXPECT_THAT(result, IsOk());
- }
+TEST_F(UpdatePropertiesTest, CommitSuccess) {
+ ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewUpdateProperties());
+ update->Set("new.property", "new.value");
- {
- // Failed commit
- UpdateProperties update(identifier_, catalog_, metadata_);
- update.Set("key1", "value1");
+ EXPECT_THAT(update->Commit(), IsOk());
- EXPECT_CALL(*catalog_, UpdateTable)
- .WillOnce(::testing::Return(CommitFailed("Commit update failed")));
- auto result = update.Commit();
- EXPECT_THAT(result, IsError(ErrorKind::kCommitFailed));
- }
+ ICEBERG_UNWRAP_OR_FAIL(auto reloaded, catalog_->LoadTable(table_ident_));
+ const auto& props = reloaded->properties().configs();
+ EXPECT_EQ(props.at("new.property"), "new.value");
}
TEST_F(UpdatePropertiesTest, FluentInterface) {
- UpdateProperties update(identifier_, catalog_, metadata_);
+ ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewUpdateProperties());
+ auto& ref = update->Set("key1", "value1").Remove("key2");
- auto& ref = update.Set("key1", "value1").Remove("key2");
+ EXPECT_EQ(&ref, update.get());
+ EXPECT_THAT(update->Apply(), IsOk());
+}
- // Should return reference to itself
- EXPECT_EQ(&ref, &update);
+TEST_F(UpdatePropertiesTest, SetAndRemoveDifferentKeys) {
+ ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewUpdateProperties());
+ update->Set("key1", "value1").Remove("key2");
+ EXPECT_THAT(update->Commit(), IsOk());
- auto result = update.Apply();
- EXPECT_THAT(result, IsOk());
+ ICEBERG_UNWRAP_OR_FAIL(auto reloaded, catalog_->LoadTable(table_ident_));
+ const auto& props = reloaded->properties().configs();
+ EXPECT_EQ(props.at("key1"), "value1");
+ EXPECT_FALSE(props.contains("key2"));
}
} // namespace iceberg
diff --git a/src/iceberg/test/update_test_base.h
b/src/iceberg/test/update_test_base.h
new file mode 100644
index 00000000..c78dc4d0
--- /dev/null
+++ b/src/iceberg/test/update_test_base.h
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#include <format>
+#include <memory>
+#include <string>
+
+#include <arrow/filesystem/mockfs.h>
+#include <gtest/gtest.h>
+
+#include "iceberg/arrow/arrow_fs_file_io_internal.h"
+#include "iceberg/catalog/memory/in_memory_catalog.h"
+#include "iceberg/table.h"
+#include "iceberg/table_identifier.h"
+#include "iceberg/table_metadata.h"
+#include "iceberg/test/matchers.h"
+#include "iceberg/test/test_resource.h"
+#include "iceberg/util/uuid.h"
+
+namespace iceberg {
+
+// Base test fixture for table update operations.
+//
+// SetUp() builds an InMemoryCatalog on an Arrow mock filesystem, writes the
+// TableMetadataV2Valid.json test resource (with its location rewritten to
+// table_location_) as a metadata file under <table_location_>/metadata, and
+// registers it in the catalog as table_ident_. Derived tests use table_ as the
+// table under test and catalog_ to reload it after commits.
+class UpdateTestBase : public ::testing::Test {
+ protected:
+ void SetUp() override {
+ file_io_ = arrow::ArrowFileSystemFileIO::MakeMockFileIO();
+ catalog_ =
+ InMemoryCatalog::Make("test_catalog", file_io_, "/warehouse/",
/*properties=*/{});
+
+ // Arrow MockFS cannot automatically create directories, so create the
+ // metadata directory before writing the metadata file into it.
+ auto arrow_fs =
std::dynamic_pointer_cast<::arrow::fs::internal::MockFileSystem>(
+ static_cast<arrow::ArrowFileSystemFileIO&>(*file_io_).fs());
+ ASSERT_TRUE(arrow_fs != nullptr);
+ ASSERT_TRUE(arrow_fs->CreateDir(table_location_ + "/metadata").ok());
+
+ // Write table metadata to the table location, using a UUIDv7 in the file
+ // name and pointing the metadata's own location at table_location_.
+ auto metadata_location = std::format("{}/metadata/00001-{}.metadata.json",
+ table_location_,
Uuid::GenerateV7().ToString());
+ ICEBERG_UNWRAP_OR_FAIL(auto metadata,
+
ReadTableMetadataFromResource("TableMetadataV2Valid.json"));
+ metadata->location = table_location_;
+ ASSERT_THAT(TableMetadataUtil::Write(*file_io_, metadata_location,
*metadata),
+ IsOk());
+
+ // Register the table in the catalog.
+ ICEBERG_UNWRAP_OR_FAIL(table_,
+ catalog_->RegisterTable(table_ident_,
metadata_location));
+ }
+
+ // Identifier the table is registered under.
+ const TableIdentifier table_ident_{.name = "test_table"};
+ // Table root on the mock filesystem; metadata files live in "/metadata".
+ const std::string table_location_{"/warehouse/test_table"};
+ std::shared_ptr<FileIO> file_io_;
+ std::shared_ptr<InMemoryCatalog> catalog_;
+ // Table returned by RegisterTable() in SetUp().
+ std::shared_ptr<Table> table_;
+};
+
+} // namespace iceberg
diff --git a/src/iceberg/transaction.cc b/src/iceberg/transaction.cc
new file mode 100644
index 00000000..ca39ec04
--- /dev/null
+++ b/src/iceberg/transaction.cc
@@ -0,0 +1,111 @@
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#include "iceberg/transaction.h"
+
+#include "iceberg/catalog.h"
+#include "iceberg/table.h"
+#include "iceberg/table_metadata.h"
+#include "iceberg/table_requirement.h"
+#include "iceberg/table_requirements.h"
+#include "iceberg/table_update.h"
+#include "iceberg/update/update_properties.h"
+#include "iceberg/util/macros.h"
+
+namespace iceberg {
+
+Transaction::Transaction(std::shared_ptr<Table> table, Kind kind, bool auto_commit)
+    : table_(std::move(table)),
+      kind_(kind),
+      auto_commit_(auto_commit),
+      // table_ is declared before metadata_builder_, so it is initialized here.
+      metadata_builder_(TableMetadataBuilder::BuildFrom(table_->metadata().get())) {}
+
+Transaction::~Transaction() = default;
+
+Result<std::shared_ptr<Transaction>> Transaction::Make(std::shared_ptr<Table> table,
+                                                       Kind kind, bool auto_commit) {
+  // Commit() is routed through table->catalog(), so both must be present.
+  if (!table || !table->catalog()) [[unlikely]] {
+    return InvalidArgument("Table and catalog cannot be null");
+  }
+  // The constructor is private, so std::make_shared cannot be used here.
+  return std::shared_ptr<Transaction>(
+      new Transaction(std::move(table), kind, auto_commit));
+}
+
+const TableMetadata* Transaction::base() const { return metadata_builder_->base(); }
+
+const TableMetadata& Transaction::current() const { return metadata_builder_->current(); }
+
+Status Transaction::AddUpdate(const std::shared_ptr<PendingUpdate>& update) {
+  // Updates are serialized: a new one may only be registered after the previous
+  // one has handed its changes to Apply().
+  if (!last_update_committed_) {
+    return InvalidArgument("Cannot add update when previous update is not committed");
+  }
+  // Only a weak_ptr is kept, because the update holds a shared_ptr to this
+  // transaction.
+  pending_updates_.emplace_back(std::weak_ptr<PendingUpdate>(update));
+  last_update_committed_ = false;
+  return {};
+}
+
+Status Transaction::Apply(std::vector<std::unique_ptr<TableUpdate>> updates) {
+  for (const auto& update : updates) {
+    update->ApplyTo(*metadata_builder_);
+  }
+
+  last_update_committed_ = true;
+
+  if (auto_commit_) {
+    ICEBERG_RETURN_UNEXPECTED(Commit());
+  }
+
+  return {};
+}
+
+Result<std::shared_ptr<Table>> Transaction::Commit() {
+  if (!last_update_committed_) {
+    return InvalidArgument(
+        "Cannot commit transaction when previous update is not committed");
+  }
+
+  const auto& updates = metadata_builder_->changes();
+  if (updates.empty()) {
+    return table_;
+  }
+
+  std::vector<std::unique_ptr<TableRequirement>> requirements;
+  switch (kind_) {
+    case Kind::kCreate: {
+      ICEBERG_ASSIGN_OR_RAISE(requirements, TableRequirements::ForCreateTable(updates));
+    } break;
+    case Kind::kUpdate: {
+      ICEBERG_ASSIGN_OR_RAISE(requirements, TableRequirements::ForUpdateTable(
+                                                *metadata_builder_->base(), updates));
+    } break;
+  }
+
+  // XXX: we should handle commit failure and retry here.
+  ICEBERG_ASSIGN_OR_RAISE(
+      auto committed_table,
+      table_->catalog()->UpdateTable(table_->name(), requirements, updates));
+
+  // Rebase the transaction onto the committed table. Otherwise table_ would stay
+  // stale, and a later Commit() would send the already-committed changes again.
+  // A later Commit() can happen, for example, after another update in an
+  // auto-commit transaction. Note that `updates` refers into the old builder and
+  // must not be used past this point.
+  if (committed_table != nullptr && committed_table->catalog() != nullptr) {
+    table_ = committed_table;
+    metadata_builder_ = TableMetadataBuilder::BuildFrom(table_->metadata().get());
+  }
+  return committed_table;
+}
+
+Result<std::shared_ptr<UpdateProperties>> Transaction::NewUpdateProperties() {
+  ICEBERG_ASSIGN_OR_RAISE(std::shared_ptr<UpdateProperties> update_properties,
+                          UpdateProperties::Make(shared_from_this()));
+  ICEBERG_RETURN_UNEXPECTED(AddUpdate(update_properties));
+  return update_properties;
+}
+
+}  // namespace iceberg
diff --git a/src/iceberg/transaction.h b/src/iceberg/transaction.h
index 72ba5182..36328026 100644
--- a/src/iceberg/transaction.h
+++ b/src/iceberg/transaction.h
@@ -21,6 +21,7 @@
#pragma once
#include <memory>
+#include <vector>
#include "iceberg/iceberg_export.h"
#include "iceberg/result.h"
@@ -29,28 +30,61 @@
namespace iceberg {
/// \brief A transaction for performing multiple updates to a table
-class ICEBERG_EXPORT Transaction {
+class ICEBERG_EXPORT Transaction : public std::enable_shared_from_this<Transaction> {
  public:
-  virtual ~Transaction() = default;
+  /// \brief Selects the commit requirements used by Commit().
+  ///
+  /// kCreate uses TableRequirements::ForCreateTable. kUpdate uses
+  /// TableRequirements::ForUpdateTable against the base metadata.
+  enum class Kind : uint8_t { kCreate, kUpdate };
+
+  ~Transaction();
+
+  /// \brief Create a new transaction
+  ///
+  /// \param table The table to update. Both the table and its catalog must be
+  ///        non-null, because Commit() is routed through that catalog.
+  /// \param kind Whether the transaction creates or updates the table.
+  /// \param auto_commit When true, Commit() is called each time a pending update
+  ///        applies its changes to this transaction.
+  /// \return The new transaction, or InvalidArgument if the table or its catalog
+  ///         is null.
+  static Result<std::shared_ptr<Transaction>> Make(std::shared_ptr<Table> table,
+                                                   Kind kind, bool auto_commit);
   /// \brief Return the Table that this transaction will update
-  ///
-  /// \return this transaction's table
-  virtual const std::shared_ptr<Table>& table() const = 0;
+  const std::shared_ptr<Table>& table() const { return table_; }
-  /// \brief Create a new append API to add files to this table
-  ///
-  /// \return a new AppendFiles
-  virtual std::shared_ptr<AppendFiles> NewAppend() = 0;
+  /// \brief Returns the base metadata without any changes
+  const TableMetadata* base() const;
-  /// \brief Apply the pending changes from all actions and commit
-  ///
-  /// This method applies all pending data operations and metadata updates in the
-  /// transaction and commits them to the table in a single atomic operation.
+  /// \brief Return the current metadata with staged changes applied
+  const TableMetadata& current() const;
+
+  /// \brief Apply the pending changes from all actions and commit.
   ///
-  /// \return Status::OK if the transaction was committed successfully, or an error
-  /// status if validation failed or the commit encountered conflicts
-  virtual Status CommitTransaction() = 0;
+  /// If no changes have been staged, the current table is returned without
+  /// contacting the catalog. Committing while a created pending update is still
+  /// uncommitted fails with InvalidArgument.
+  ///
+  /// \return Updated table if the transaction was committed successfully, or an error:
+  /// - ValidationFailed: if any update cannot be applied to the current table metadata.
+  /// - CommitFailed: if the updates cannot be committed due to conflicts.
+  Result<std::shared_ptr<Table>> Commit();
+
+  /// \brief Create a new UpdateProperties to update table properties and commit the
+  /// changes.
+  ///
+  /// The returned update is registered with this transaction. This fails with
+  /// InvalidArgument if a previously created update has not been committed yet.
+  Result<std::shared_ptr<UpdateProperties>> NewUpdateProperties();
+
+ private:
+  Transaction(std::shared_ptr<Table> table, Kind kind, bool auto_commit);
+
+  /// \brief Register a newly created pending update with this transaction.
+  ///
+  /// Only one uncommitted update may be outstanding at a time. Otherwise this
+  /// returns InvalidArgument.
+  Status AddUpdate(const std::shared_ptr<PendingUpdate>& update);
+
+  /// \brief Apply the pending changes to current table.
+  ///
+  /// Marks the outstanding update as committed. If auto-commit is enabled, it
+  /// also commits the whole transaction right away.
+  Status Apply(std::vector<std::unique_ptr<TableUpdate>> updates);
+
+  friend class PendingUpdate;  // Need to access the Apply method.
+
+ private:
+  // The table that this transaction will update.
+  std::shared_ptr<Table> table_;
+  // The kind of this transaction.
+  const Kind kind_;
+  // Whether to auto-commit the transaction when updates are applied.
+  // This is useful when a temporary transaction is created for a single operation.
+  const bool auto_commit_;
+  // To make the state simple, we require updates are added and committed in order.
+  bool last_update_committed_ = true;
+  // Keep track of all created pending updates. Use weak_ptr to avoid circular references.
+  // This is useful to retry failed updates.
+  std::vector<std::weak_ptr<PendingUpdate>> pending_updates_;
+  // Accumulated updates from all pending updates.
+  std::unique_ptr<TableMetadataBuilder> metadata_builder_;
 };
} // namespace iceberg
diff --git a/src/iceberg/type_fwd.h b/src/iceberg/type_fwd.h
index 0e1867f6..133a7043 100644
--- a/src/iceberg/type_fwd.h
+++ b/src/iceberg/type_fwd.h
@@ -57,6 +57,7 @@ enum class TimeUnit {
kMicrosecond,
};
+/// \brief Data type family.
class BinaryType;
class BooleanType;
class DateType;
@@ -69,12 +70,7 @@ class LongType;
class ListType;
class MapType;
class NestedType;
-class PartitionField;
-class PartitionSpec;
-class PartitionValues;
class PrimitiveType;
-class Schema;
-class SchemaField;
class StringType;
class StructType;
class TimeType;
@@ -84,84 +80,103 @@ class TimestampTzType;
class Type;
class UuidType;
-struct Namespace;
-struct TableIdentifier;
+/// \brief Data values.
+class Decimal;
+class Uuid;
-class Catalog;
-class FileIO;
-class LocationProvider;
+/// \brief Schema.
+class Schema;
+class SchemaField;
+
+/// \brief Partition spec and values.
+class PartitionField;
+class PartitionSpec;
+class PartitionValues;
+
+/// \brief Sort order.
class SortField;
class SortOrder;
-class Table;
-class TableProperties;
-class Transaction;
+
+/// \brief Name mapping.
+struct MappedField;
+class MappedFields;
+class NameMapping;
+
+/// \brief Transform.
+enum class TransformType;
class Transform;
class TransformFunction;
-struct PartitionStatisticsFile;
-struct Snapshot;
-struct SnapshotRef;
+/// \brief Table identifier.
+struct Namespace;
+struct TableIdentifier;
+/// \brief Table metadata.
+enum class SnapshotRefType;
struct MetadataLogEntry;
+struct PartitionStatisticsFile;
+struct Snapshot;
struct SnapshotLogEntry;
-
+struct SnapshotRef;
struct StatisticsFile;
struct TableMetadata;
-struct MappedField;
-class MappedFields;
-class NameMapping;
-
-enum class SnapshotRefType;
-enum class TransformType;
-enum class ManifestContent;
-
-class Decimal;
-class Uuid;
-
+/// \brief Expression.
+class BoundPredicate;
class Expression;
class Literal;
-
-class BoundPredicate;
class UnboundPredicate;
+/// \brief Scan.
class DataTableScan;
class FileScanTask;
class ScanTask;
class TableScan;
class TableScanBuilder;
+/// \brief Manifest.
+enum class ManifestContent;
struct DataFile;
struct ManifestEntry;
struct ManifestFile;
struct ManifestList;
struct PartitionFieldSummary;
-
-class PartitionSummary;
-
class ManifestListReader;
class ManifestListWriter;
class ManifestReader;
class ManifestWriter;
+class PartitionSummary;
+/// \brief File I/O.
struct ReaderOptions;
struct WriterOptions;
+class FileIO;
class Reader;
class Writer;
+/// \brief Row-based data structures.
class ArrayLike;
class MapLike;
class StructLike;
class StructLikeAccessor;
+/// \brief Catalog.
+class Catalog;
+class LocationProvider;
+
+/// \brief Table.
+class Table;
+class TableProperties;
+
+/// \brief Table update.
+class TableMetadataBuilder;
class TableUpdate;
class TableRequirement;
-class TableMetadataBuilder;
class TableUpdateContext;
+class Transaction;
+/// \brief Update family.
class PendingUpdate;
-template <typename T>
-class PendingUpdateTyped;
class UpdateProperties;
///
----------------------------------------------------------------------------
diff --git a/src/iceberg/update/meson.build b/src/iceberg/update/meson.build
new file mode 100644
index 00000000..38502b14
--- /dev/null
+++ b/src/iceberg/update/meson.build
@@ -0,0 +1,21 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+install_headers(
+    [
+        'pending_update.h',
+        'update_properties.h',
+    ],
+    subdir: 'iceberg/update',
+)
diff --git a/src/iceberg/update/pending_update.cc
b/src/iceberg/update/pending_update.cc
new file mode 100644
index 00000000..4dbc6788
--- /dev/null
+++ b/src/iceberg/update/pending_update.cc
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "iceberg/update/pending_update.h"
+
+#include "iceberg/table_update.h"
+#include "iceberg/transaction.h"
+#include "iceberg/util/macros.h"
+
+namespace iceberg {
+
+PendingUpdate::PendingUpdate(std::shared_ptr<Transaction> transaction)
+    : transaction_(std::move(transaction)) {}
+
+PendingUpdate::~PendingUpdate() = default;
+
+/// Validates the staged changes via Apply() and hands the resulting table
+/// updates to the owning transaction.
+Status PendingUpdate::Commit() {
+  // PendingUpdate is movable, and a moved-from instance has an empty
+  // transaction_. Fail cleanly instead of dereferencing null (subclass Apply()
+  // implementations may read from transaction_ as well).
+  if (!transaction_) [[unlikely]] {
+    return InvalidArgument("Cannot commit update without a transaction");
+  }
+  ICEBERG_ASSIGN_OR_RAISE(auto apply_result, Apply());
+  return transaction_->Apply(std::move(apply_result.updates));
+}
+
+}  // namespace iceberg
diff --git a/src/iceberg/update/pending_update.h
b/src/iceberg/update/pending_update.h
new file mode 100644
index 00000000..c4618400
--- /dev/null
+++ b/src/iceberg/update/pending_update.h
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+/// \file iceberg/update/pending_update.h
+/// API for table changes using builder pattern
+
+#include <memory>
+#include <vector>
+
+#include "iceberg/iceberg_export.h"
+#include "iceberg/result.h"
+#include "iceberg/type_fwd.h"
+#include "iceberg/util/error_collector.h"
+
+namespace iceberg {
+
+/// \brief Base class for all kinds of table metadata updates.
+///
+/// Any created `PendingUpdate` instance is tracked by the `Transaction` instance
+/// and commit is also delegated to the `Transaction` instance.
+///
+/// \note Implementations are expected to use builder pattern and errors
+/// should be handled by the ErrorCollector base class.
+class ICEBERG_EXPORT PendingUpdate : public ErrorCollector {
+ public:
+  /// \brief Identifies the concrete subclass of a PendingUpdate
+  /// (e.g. UpdateProperties reports kUpdateProperties).
+  enum class Kind : uint8_t {
+    kUpdateProperties,
+  };
+
+  /// \brief Return the kind of this pending update.
+  virtual Kind kind() const = 0;
+
+  /// \brief The table changes produced by Apply().
+  struct ApplyResult {
+    // Metadata changes, applied to the owning transaction in vector order.
+    std::vector<std::unique_ptr<TableUpdate>> updates;
+  };
+
+  /// \brief Apply the pending changes and return the uncommitted changes for validation.
+  ///
+  /// \note This does not result in a permanent update.
+  /// \return The uncommitted changes that would be committed by calling Commit(), or an
+  ///         error:
+  ///         - ValidationFailed: the pending changes cannot be applied to the current
+  ///           metadata
+  ///         - InvalidArgument: if pending changes are conflicting or invalid
+  virtual Result<ApplyResult> Apply() = 0;
+
+  /// \brief Apply the pending changes and commit.
+  ///
+  /// The base implementation calls Apply() and passes the resulting updates to
+  /// the owning transaction. If that transaction was created with auto-commit
+  /// enabled, the transaction itself is committed as part of this call.
+  ///
+  /// \return An OK status if the commit was successful, or an error:
+  ///         - ValidationFailed: if it cannot be applied to the current table metadata.
+  ///         - CommitFailed: if it cannot be committed due to conflicts.
+  ///         - CommitStateUnknown: unknown status, no cleanup should be done.
+  virtual Status Commit();
+
+  // Non-copyable, movable
+  PendingUpdate(const PendingUpdate&) = delete;
+  PendingUpdate& operator=(const PendingUpdate&) = delete;
+  PendingUpdate(PendingUpdate&&) noexcept = default;
+  PendingUpdate& operator=(PendingUpdate&&) noexcept = default;
+
+  ~PendingUpdate() override;
+
+ protected:
+  /// \brief Construct an update bound to the given transaction.
+  explicit PendingUpdate(std::shared_ptr<Transaction> transaction);
+
+  // The transaction this update applies its changes to. Shared ownership keeps
+  // the transaction alive while the update exists. The transaction only holds
+  // a weak_ptr back to the update. Empty in a moved-from instance.
+  std::shared_ptr<Transaction> transaction_;
+};
+
+} // namespace iceberg
diff --git a/src/iceberg/update/update_properties.cc
b/src/iceberg/update/update_properties.cc
index a4dcd154..e502848c 100644
--- a/src/iceberg/update/update_properties.cc
+++ b/src/iceberg/update/update_properties.cc
@@ -19,28 +19,33 @@
#include "iceberg/update/update_properties.h"
+#include <charconv>
#include <cstdint>
#include <memory>
+#include <system_error>
-#include "iceberg/catalog.h"
#include "iceberg/metrics_config.h"
#include "iceberg/result.h"
-#include "iceberg/table.h"
-#include "iceberg/table_identifier.h"
#include "iceberg/table_metadata.h"
#include "iceberg/table_properties.h"
-#include "iceberg/table_requirements.h"
#include "iceberg/table_update.h"
+#include "iceberg/transaction.h"
#include "iceberg/util/macros.h"
namespace iceberg {
-UpdateProperties::UpdateProperties(TableIdentifier identifier,
-                                   std::shared_ptr<Catalog> catalog,
-                                   std::shared_ptr<TableMetadata> base)
-    : identifier_(std::move(identifier)),
-      catalog_(std::move(catalog)),
-      base_metadata_(std::move(base)) {}
+UpdateProperties::UpdateProperties(std::shared_ptr<Transaction> transaction)
+    : PendingUpdate(std::move(transaction)) {}
+
+UpdateProperties::~UpdateProperties() = default;
+
+/// Factory for UpdateProperties. The constructor is private, so instances are
+/// only handed out as shared_ptr through this function.
+Result<std::shared_ptr<UpdateProperties>> UpdateProperties::Make(
+    std::shared_ptr<Transaction> transaction) {
+  if (transaction == nullptr) [[unlikely]] {
+    return InvalidArgument("Cannot create UpdateProperties without a transaction");
+  }
+  std::shared_ptr<UpdateProperties> update(new UpdateProperties(std::move(transaction)));
+  return update;
+}
UpdateProperties& UpdateProperties::Set(const std::string& key,
const std::string& value) {
@@ -70,65 +75,50 @@ UpdateProperties& UpdateProperties::Remove(const
std::string& key) {
return *this;
}
-Status UpdateProperties::Apply() {
-  if (!catalog_) {
-    return InvalidArgument("Catalog is required to apply property updates");
-  }
-  if (!base_metadata_) {
-    return InvalidArgument("Base table metadata is required to apply property updates");
-  }
-
+/// Validates the staged property changes against the transaction's current
+/// metadata and converts them into TableUpdate objects without committing them.
+/// A staged `format-version` property is not emitted as a property. It becomes
+/// an UpgradeFormatVersion update instead.
+Result<PendingUpdate::ApplyResult> UpdateProperties::Apply() {
   ICEBERG_RETURN_UNEXPECTED(CheckErrors());
   auto iter = updates_.find(TableProperties::kFormatVersion.key());
   if (iter != updates_.end()) {
-    try {
-      int parsed_version = std::stoi(iter->second);
-      if (parsed_version > TableMetadata::kSupportedTableFormatVersion) {
-        return InvalidArgument(
-            "Cannot upgrade table to unsupported format version: v{} (supported: v{})",
-            parsed_version, TableMetadata::kSupportedTableFormatVersion);
-      }
-      format_version_ = static_cast<int8_t>(parsed_version);
-    } catch (const std::invalid_argument& e) {
-      return InvalidArgument("Invalid format version '{}': not a valid integer",
-                             iter->second);
-    } catch (const std::out_of_range& e) {
-      return InvalidArgument("Format version '{}' is out of range", iter->second);
+    int parsed_version = 0;
+    const auto& val = iter->second;
+    const char* const val_end = val.data() + val.size();
+    auto [ptr, ec] = std::from_chars(val.data(), val_end, parsed_version);
+
+    if (ec == std::errc::result_out_of_range) {
+      return InvalidArgument("Format version '{}' is out of range", val);
+    }
+    // from_chars reports success for a numeric prefix ("2abc" parses as 2), so
+    // also require that the whole string was consumed.
+    if (ec != std::errc() || ptr != val_end) {
+      return InvalidArgument("Invalid format version '{}': not a valid integer", val);
+    }
+
+    // Non-positive values are never valid format versions. A negative value
+    // would also wrap around in the int8_t cast below (e.g. -254 -> 2).
+    if (parsed_version < 1) {
+      return InvalidArgument("Invalid format version '{}': must be positive", val);
+    }
+    if (parsed_version > TableMetadata::kSupportedTableFormatVersion) {
+      return InvalidArgument(
+          "Cannot upgrade table to unsupported format version: v{} (supported: v{})",
+          parsed_version, TableMetadata::kSupportedTableFormatVersion);
     }
+    format_version_ = static_cast<int8_t>(parsed_version);
     updates_.erase(iter);
   }
-  if (auto schema = base_metadata_->Schema(); schema.has_value()) {
+  if (auto schema = transaction_->current().Schema(); schema.has_value()) {
     ICEBERG_RETURN_UNEXPECTED(
         MetricsConfig::VerifyReferencedColumns(updates_, *schema.value()));
   }
-  return {};
-}
-
-Status UpdateProperties::Commit() {
-  ICEBERG_RETURN_UNEXPECTED(Apply());
-  std::vector<std::unique_ptr<TableUpdate>> updates;
+  // The staged changes are copied, not moved, so Apply() can be called again
+  // (e.g. once for validation and once more from Commit()).
+  ApplyResult result;
   if (!updates_.empty()) {
-    updates.emplace_back(std::make_unique<table::SetProperties>(std::move(updates_)));
+    result.updates.emplace_back(std::make_unique<table::SetProperties>(updates_));
   }
   if (!removals_.empty()) {
-    updates.emplace_back(std::make_unique<table::RemoveProperties>(
+    result.updates.emplace_back(std::make_unique<table::RemoveProperties>(
         std::vector<std::string>{removals_.begin(), removals_.end()}));
   }
   if (format_version_.has_value()) {
-    updates.emplace_back(
+    result.updates.emplace_back(
         std::make_unique<table::UpgradeFormatVersion>(format_version_.value()));
-  };
+  }
-  if (!updates.empty()) {
-    ICEBERG_ASSIGN_OR_RAISE(auto requirements,
-                            TableRequirements::ForUpdateTable(*base_metadata_, updates));
-    ICEBERG_RETURN_UNEXPECTED(catalog_->UpdateTable(identifier_, requirements, updates));
-  }
-  return {};
+  return result;
 }
} // namespace iceberg
diff --git a/src/iceberg/update/update_properties.h
b/src/iceberg/update/update_properties.h
index 0f1adf76..fc8f46f1 100644
--- a/src/iceberg/update/update_properties.h
+++ b/src/iceberg/update/update_properties.h
@@ -20,28 +20,24 @@
#pragma once
#include <memory>
+#include <optional>
#include <string>
#include <unordered_map>
#include <unordered_set>
-#include "iceberg/file_format.h"
#include "iceberg/iceberg_export.h"
-#include "iceberg/pending_update.h"
-#include "iceberg/table_identifier.h"
#include "iceberg/type_fwd.h"
+#include "iceberg/update/pending_update.h"
namespace iceberg {
/// \brief Updates table properties.
class ICEBERG_EXPORT UpdateProperties : public PendingUpdate {
public:
- /// \brief Constructs a UpdateProperties for the specified table.
- ///
- /// \param identifier The table identifier
- /// \param catalog The catalog containing the table
- /// \param metadata The current table metadata
- UpdateProperties(TableIdentifier identifier, std::shared_ptr<Catalog>
catalog,
- std::shared_ptr<TableMetadata> base);
+ static Result<std::shared_ptr<UpdateProperties>> Make(
+ std::shared_ptr<Transaction> transaction);
+
+ ~UpdateProperties() override;
/// \brief Sets a property key to a specified value.
///
@@ -57,25 +53,12 @@ class ICEBERG_EXPORT UpdateProperties : public
PendingUpdate {
/// \return Reference to this UpdateProperties for chaining
UpdateProperties& Remove(const std::string& key);
- /// \brief Applies the property changes without committing them.
- ///
- /// Validates the pending property changes but does not commit them to the
table.
- /// This method can be used to validate changes before actually committing
them.
- ///
- /// \return Status::OK if the changes are valid, or an error if validation
fails
- Status Apply() override;
+ Kind kind() const final { return Kind::kUpdateProperties; }
- /// \brief Commits the property changes to the table.
- ///
- /// Validates the changes and applies them to the table through the catalog.
- ///
- /// \return OK if the changes are valid and committed successfully, or an
error
- Status Commit() override;
+ Result<ApplyResult> Apply() final;
private:
- TableIdentifier identifier_;
- std::shared_ptr<Catalog> catalog_;
- std::shared_ptr<TableMetadata> base_metadata_;
+ explicit UpdateProperties(std::shared_ptr<Transaction> transaction);
std::unordered_map<std::string, std::string> updates_;
std::unordered_set<std::string> removals_;