This is an automated email from the ASF dual-hosted git repository.
gangwu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-cpp.git
The following commit(s) were added to refs/heads/main by this push:
new fea93b1a feat: implement create (stage) table for InMemoryCatalog
(#416)
fea93b1a is described below
commit fea93b1ac7eeaafbc0a2e2e52f9679c65323424b
Author: wzhuo <[email protected]>
AuthorDate: Tue Dec 30 13:43:21 2025 +0800
feat: implement create (stage) table for InMemoryCatalog (#416)
---
src/iceberg/CMakeLists.txt | 2 +
src/iceberg/catalog/memory/in_memory_catalog.cc | 75 +++++++--
src/iceberg/meson.build | 2 +
src/iceberg/schema.cc | 2 +-
.../{table_properties.cc => table_identifier.cc} | 27 ++-
src/iceberg/table_identifier.h | 11 ++
src/iceberg/table_metadata.cc | 181 ++++++++++++++++++++-
src/iceberg/table_metadata.h | 15 +-
src/iceberg/table_properties.cc | 7 +
src/iceberg/table_properties.h | 3 +
src/iceberg/table_requirement.h | 32 ++--
src/iceberg/table_requirements.cc | 17 ++
src/iceberg/table_requirements.h | 4 +
src/iceberg/table_update.cc | 2 +-
src/iceberg/test/formatter_test.cc | 16 ++
src/iceberg/test/in_memory_catalog_test.cc | 68 ++++++++
src/iceberg/test/table_metadata_builder_test.cc | 143 ++++++++++++++++
src/iceberg/test/table_requirement_test.cc | 16 +-
src/iceberg/test/table_requirements_test.cc | 14 ++
src/iceberg/transaction.cc | 16 +-
src/iceberg/transaction.h | 3 +-
src/iceberg/util/meson.build | 1 +
src/iceberg/util/property_util.cc | 52 ++++++
.../{table_identifier.h => util/property_util.h} | 32 +---
24 files changed, 654 insertions(+), 87 deletions(-)
diff --git a/src/iceberg/CMakeLists.txt b/src/iceberg/CMakeLists.txt
index 27892935..ce339cfe 100644
--- a/src/iceberg/CMakeLists.txt
+++ b/src/iceberg/CMakeLists.txt
@@ -66,6 +66,7 @@ set(ICEBERG_SOURCES
sort_order.cc
statistics_file.cc
table.cc
+ table_identifier.cc
table_metadata.cc
table_properties.cc
table_requirement.cc
@@ -86,6 +87,7 @@ set(ICEBERG_SOURCES
util/decimal.cc
util/gzip_internal.cc
util/murmurhash3_internal.cc
+ util/property_util.cc
util/snapshot_util.cc
util/temporal_util.cc
util/timepoint.cc
diff --git a/src/iceberg/catalog/memory/in_memory_catalog.cc
b/src/iceberg/catalog/memory/in_memory_catalog.cc
index b3fd0060..5e562765 100644
--- a/src/iceberg/catalog/memory/in_memory_catalog.cc
+++ b/src/iceberg/catalog/memory/in_memory_catalog.cc
@@ -26,7 +26,10 @@
#include "iceberg/table_identifier.h"
#include "iceberg/table_metadata.h"
#include "iceberg/table_requirement.h"
+#include "iceberg/table_requirements.h"
#include "iceberg/table_update.h"
+#include "iceberg/transaction.h"
+#include "iceberg/util/checked_cast.h"
#include "iceberg/util/macros.h"
namespace iceberg {
@@ -318,7 +321,7 @@ Result<std::string>
InMemoryNamespace::GetTableMetadataLocation(
ICEBERG_RETURN_UNEXPECTED(ns);
const auto it = ns.value()->table_metadata_locations_.find(table_ident.name);
if (it == ns.value()->table_metadata_locations_.end()) {
- return NotFound("{} does not exist", table_ident.name);
+ return NotFound("Table does not exist: {}", table_ident);
}
return it->second;
}
@@ -405,7 +408,23 @@ Result<std::shared_ptr<Table>>
InMemoryCatalog::CreateTable(
const std::string& location,
const std::unordered_map<std::string, std::string>& properties) {
std::unique_lock lock(mutex_);
- return NotImplemented("create table");
+ if (root_namespace_->TableExists(identifier).value_or(false)) {
+ return AlreadyExists("Table already exists: {}", identifier);
+ }
+
+ std::string base_location =
+ location.empty() ? warehouse_location_ + "/" + identifier.ToString() :
location;
+
+ ICEBERG_ASSIGN_OR_RAISE(auto table_metadata, TableMetadata::Make(*schema,
*spec, *order,
+ location,
properties));
+
+ ICEBERG_ASSIGN_OR_RAISE(
+ auto metadata_file_location,
+ TableMetadataUtil::Write(*file_io_, nullptr, "", *table_metadata));
+ ICEBERG_RETURN_UNEXPECTED(
+ root_namespace_->UpdateTableMetadataLocation(identifier,
metadata_file_location));
+ return Table::Make(identifier, std::move(table_metadata),
+ std::move(metadata_file_location), file_io_,
shared_from_this());
}
Result<std::shared_ptr<Table>> InMemoryCatalog::UpdateTable(
@@ -413,24 +432,44 @@ Result<std::shared_ptr<Table>>
InMemoryCatalog::UpdateTable(
const std::vector<std::unique_ptr<TableRequirement>>& requirements,
const std::vector<std::unique_ptr<TableUpdate>>& updates) {
std::unique_lock lock(mutex_);
- ICEBERG_ASSIGN_OR_RAISE(auto base_metadata_location,
-
root_namespace_->GetTableMetadataLocation(identifier));
-
- ICEBERG_ASSIGN_OR_RAISE(auto base,
- TableMetadataUtil::Read(*file_io_,
base_metadata_location));
+ auto base_metadata_location =
root_namespace_->GetTableMetadataLocation(identifier);
+ std::unique_ptr<TableMetadata> base;
+ std::unique_ptr<TableMetadataBuilder> builder;
+ ICEBERG_ASSIGN_OR_RAISE(auto is_create,
TableRequirements::IsCreate(requirements));
+ if (is_create) {
+ if (base_metadata_location.has_value()) {
+ return AlreadyExists("Table already exists: {}", identifier);
+ }
+ int8_t format_version = TableMetadata::kDefaultTableFormatVersion;
+ for (const auto& update : updates) {
+ if (update->kind() == TableUpdate::Kind::kUpgradeFormatVersion) {
+ format_version =
+ internal::checked_cast<const table::UpgradeFormatVersion&>(*update)
+ .format_version();
+ }
+ }
+ builder = TableMetadataBuilder::BuildFromEmpty(format_version);
+ } else {
+ ICEBERG_RETURN_UNEXPECTED(base_metadata_location);
+ ICEBERG_ASSIGN_OR_RAISE(
+ base, TableMetadataUtil::Read(*file_io_,
base_metadata_location.value()));
+ builder = TableMetadataBuilder::BuildFrom(base.get());
+ }
for (const auto& requirement : requirements) {
ICEBERG_RETURN_UNEXPECTED(requirement->Validate(base.get()));
}
- auto builder = TableMetadataBuilder::BuildFrom(base.get());
for (const auto& update : updates) {
update->ApplyTo(*builder);
}
ICEBERG_ASSIGN_OR_RAISE(auto updated, builder->Build());
ICEBERG_ASSIGN_OR_RAISE(
auto new_metadata_location,
- TableMetadataUtil::Write(*file_io_, base.get(), base_metadata_location,
*updated));
+ TableMetadataUtil::Write(
+ *file_io_, base.get(),
+ base_metadata_location.has_value() ? base_metadata_location.value()
: "",
+ *updated));
ICEBERG_RETURN_UNEXPECTED(
root_namespace_->UpdateTableMetadataLocation(identifier,
new_metadata_location));
TableMetadataUtil::DeleteRemovedMetadataFiles(*file_io_, base.get(),
*updated);
@@ -445,7 +484,21 @@ Result<std::shared_ptr<Transaction>>
InMemoryCatalog::StageCreateTable(
const std::string& location,
const std::unordered_map<std::string, std::string>& properties) {
std::unique_lock lock(mutex_);
- return NotImplemented("stage create table");
+ if (root_namespace_->TableExists(identifier).value_or(false)) {
+ return AlreadyExists("Table already exists: {}", identifier);
+ }
+
+ std::string base_location =
+ location.empty() ? warehouse_location_ + "/" + identifier.ToString() :
location;
+
+ ICEBERG_ASSIGN_OR_RAISE(
+ auto table_metadata,
+ TableMetadata::Make(*schema, *spec, *order, base_location, properties));
+ ICEBERG_ASSIGN_OR_RAISE(
+ auto table, StagedTable::Make(identifier, std::move(table_metadata), "",
file_io_,
+ shared_from_this()));
+ return Transaction::Make(std::move(table), Transaction::Kind::kCreate,
+ /* auto_commit */ false);
}
Result<bool> InMemoryCatalog::TableExists(const TableIdentifier& identifier)
const {
@@ -495,7 +548,7 @@ Result<std::shared_ptr<Table>>
InMemoryCatalog::RegisterTable(
std::unique_lock lock(mutex_);
if (!root_namespace_->NamespaceExists(identifier.ns)) {
- return NoSuchNamespace("table namespace does not exist.");
+ return NoSuchNamespace("Table namespace does not exist: {}",
identifier.ns);
}
if (!root_namespace_->RegisterTable(identifier, metadata_file_location)) {
return UnknownError("The registry failed.");
diff --git a/src/iceberg/meson.build b/src/iceberg/meson.build
index f8022b09..77d72638 100644
--- a/src/iceberg/meson.build
+++ b/src/iceberg/meson.build
@@ -88,6 +88,7 @@ iceberg_sources = files(
'sort_order.cc',
'statistics_file.cc',
'table.cc',
+ 'table_identifier.cc',
'table_metadata.cc',
'table_properties.cc',
'table_requirement.cc',
@@ -108,6 +109,7 @@ iceberg_sources = files(
'util/decimal.cc',
'util/gzip_internal.cc',
'util/murmurhash3_internal.cc',
+ 'util/property_util.cc',
'util/snapshot_util.cc',
'util/temporal_util.cc',
'util/timepoint.cc',
diff --git a/src/iceberg/schema.cc b/src/iceberg/schema.cc
index 0b7336b6..afe6be1e 100644
--- a/src/iceberg/schema.cc
+++ b/src/iceberg/schema.cc
@@ -235,7 +235,7 @@ Result<std::vector<std::string>>
Schema::IdentifierFieldNames() const {
for (auto id : identifier_field_ids_) {
ICEBERG_ASSIGN_OR_RAISE(auto name, FindColumnNameById(id));
if (!name.has_value()) {
- return InvalidSchema("Cannot find the field of the specified field id:
{}", id);
+ return InvalidSchema("Cannot find identifier field id: {}", id);
}
names.emplace_back(name.value());
}
diff --git a/src/iceberg/table_properties.cc b/src/iceberg/table_identifier.cc
similarity index 52%
copy from src/iceberg/table_properties.cc
copy to src/iceberg/table_identifier.cc
index 96633bc2..9b92d549 100644
--- a/src/iceberg/table_properties.cc
+++ b/src/iceberg/table_identifier.cc
@@ -17,27 +17,20 @@
* under the License.
*/
-#include "iceberg/table_properties.h"
+#include "iceberg/table_identifier.h"
-namespace iceberg {
+#include "iceberg/util/formatter_internal.h"
-const std::unordered_set<std::string>& TableProperties::reserved_properties() {
- static const std::unordered_set<std::string> kReservedProperties = {
- kFormatVersion.key(), kUuid.key(),
- kSnapshotCount.key(), kCurrentSnapshotId.key(),
- kCurrentSnapshotSummary.key(), kCurrentSnapshotTimestamp.key(),
- kCurrentSchema.key(), kDefaultPartitionSpec.key(),
- kDefaultSortOrder.key()};
- return kReservedProperties;
-}
+namespace iceberg {
-TableProperties TableProperties::default_properties() { return {}; }
+// Renders the namespace levels joined by "." (e.g. levels {"ns1", "ns2"}
+// yield "ns1.ns2").
+std::string Namespace::ToString() const {
+  return FormatRange(levels, ".", "", "");
+}
-TableProperties TableProperties::FromMap(
- std::unordered_map<std::string, std::string> properties) {
- TableProperties table_properties;
- table_properties.configs_ = std::move(properties);
- return table_properties;
+// Renders the identifier as "<namespace>.<name>". An identifier whose
+// namespace has no levels is rendered as the bare table name.
+std::string TableIdentifier::ToString() const {
+  if (ns.levels.empty()) {
+    return name;
+  }
+  return std::format("{}.{}", ns.ToString(), name);
}
} // namespace iceberg
diff --git a/src/iceberg/table_identifier.h b/src/iceberg/table_identifier.h
index bef9b81d..77a2ab9c 100644
--- a/src/iceberg/table_identifier.h
+++ b/src/iceberg/table_identifier.h
@@ -27,6 +27,7 @@
#include "iceberg/iceberg_export.h"
#include "iceberg/result.h"
+#include "iceberg/util/formatter.h" // IWYU pragma: keep
namespace iceberg {
@@ -35,8 +36,12 @@ struct ICEBERG_EXPORT Namespace {
std::vector<std::string> levels;
bool operator==(const Namespace& other) const { return levels ==
other.levels; }
+
+ std::string ToString() const;
};
+/// \brief Free-function form of Namespace::ToString().
+ICEBERG_EXPORT inline std::string ToString(const Namespace& ns) {
+  return ns.ToString();
+}
+
/// \brief Identifies a table in iceberg catalog.
struct ICEBERG_EXPORT TableIdentifier {
Namespace ns;
@@ -53,6 +58,12 @@ struct ICEBERG_EXPORT TableIdentifier {
}
return {};
}
+
+ std::string ToString() const;
};
+/// \brief Free-function form of TableIdentifier::ToString().
+ICEBERG_EXPORT inline std::string ToString(const TableIdentifier& ident) {
+  return ident.ToString();
+}
+
} // namespace iceberg
diff --git a/src/iceberg/table_metadata.cc b/src/iceberg/table_metadata.cc
index b6cb5f63..095f91d1 100644
--- a/src/iceberg/table_metadata.cc
+++ b/src/iceberg/table_metadata.cc
@@ -37,6 +37,7 @@
#include "iceberg/exception.h"
#include "iceberg/file_io.h"
#include "iceberg/json_internal.h"
+#include "iceberg/metrics_config.h"
#include "iceberg/partition_field.h"
#include "iceberg/partition_spec.h"
#include "iceberg/result.h"
@@ -50,12 +51,130 @@
#include "iceberg/util/gzip_internal.h"
#include "iceberg/util/location_util.h"
#include "iceberg/util/macros.h"
+#include "iceberg/util/property_util.h"
+#include "iceberg/util/type_util.h"
#include "iceberg/util/uuid.h"
+
namespace iceberg {
namespace {
const TimePointMs kInvalidLastUpdatedMs = TimePointMs::min();
constexpr int32_t kLastAdded = -1;
constexpr std::string_view kMetadataFolderName = "metadata";
+
+// TableMetadata private static methods
+// Rebuilds `spec` against `fresh_schema`. Each partition field's source column
+// is looked up by ID in `base_schema`, resolved again by name in
+// `fresh_schema`, and given a new sequential partition field ID. The rebuilt
+// spec is created with ID `spec_id`.
+//
+// Returns InvalidSchema when a source ID is unknown to `base_schema` or the
+// resolved column name is absent from `fresh_schema`.
+Result<std::unique_ptr<PartitionSpec>> FreshPartitionSpec(int32_t spec_id,
+                                                          const PartitionSpec& spec,
+                                                          const Schema& base_schema,
+                                                          const Schema& fresh_schema) {
+  // Incremented before each use, so the first assigned ID is
+  // kInvalidPartitionFieldId + 1.
+  int32_t last_assigned_id = PartitionSpec::kInvalidPartitionFieldId;
+
+  std::vector<PartitionField> remapped_fields;
+  remapped_fields.reserve(spec.fields().size());
+  for (auto& old_field : spec.fields()) {
+    ICEBERG_ASSIGN_OR_RAISE(auto column_name,
+                            base_schema.FindColumnNameById(old_field.source_id()));
+    if (!column_name.has_value()) [[unlikely]] {
+      return InvalidSchema(
+          "Cannot find source partition field with ID {} in the old schema",
+          old_field.source_id());
+    }
+
+    ICEBERG_ASSIGN_OR_RAISE(auto new_column,
+                            fresh_schema.FindFieldByName(column_name.value()));
+    if (!new_column.has_value()) [[unlikely]] {
+      return InvalidSchema("Partition field {} does not exist in the schema",
+                           column_name.value());
+    }
+
+    last_assigned_id += 1;
+    remapped_fields.emplace_back(new_column.value().get().field_id(), last_assigned_id,
+                                 std::string(old_field.name()), old_field.transform());
+  }
+  return PartitionSpec::Make(fresh_schema, spec_id, std::move(remapped_fields), false);
+}
+
+// Rebuilds `order` against `fresh_schema`. Each sort field's source column is
+// looked up by ID in `base_schema` and resolved again by name in
+// `fresh_schema`. Transform, direction and null order are carried over. An
+// unsorted `order` yields SortOrder::Unsorted() regardless of `order_id`.
+//
+// Returns InvalidSchema when a source ID is unknown to `base_schema` or the
+// resolved column name is absent from `fresh_schema`.
+Result<std::shared_ptr<SortOrder>> FreshSortOrder(int32_t order_id,
+                                                  const SortOrder& order,
+                                                  const Schema& base_schema,
+                                                  const Schema& fresh_schema) {
+  if (order.is_unsorted()) {
+    return SortOrder::Unsorted();
+  }
+
+  std::vector<SortField> remapped_fields;
+  remapped_fields.reserve(order.fields().size());
+  for (const auto& old_field : order.fields()) {
+    ICEBERG_ASSIGN_OR_RAISE(auto column_name,
+                            base_schema.FindColumnNameById(old_field.source_id()));
+    if (!column_name.has_value()) {
+      return InvalidSchema("Cannot find source sort field with ID {} in the old schema",
+                           old_field.source_id());
+    }
+
+    ICEBERG_ASSIGN_OR_RAISE(auto new_column,
+                            fresh_schema.FindFieldByName(column_name.value()));
+    if (!new_column.has_value()) {
+      return InvalidSchema("Cannot find field '{}' in the new schema",
+                           column_name.value());
+    }
+
+    remapped_fields.emplace_back(new_column.value().get().field_id(),
+                                 old_field.transform(), old_field.direction(),
+                                 old_field.null_order());
+  }
+  return SortOrder::Make(order_id, std::move(remapped_fields));
+}
+
+// Builds the ordered list of table updates that recreate `metadata` on an
+// empty builder: UUID, format version, current schema, default partition
+// spec, default sort order, then location and properties when non-empty.
+//
+// Schema, spec and sort-order updates are skipped when the corresponding
+// lookup on `metadata` fails.
+std::vector<std::unique_ptr<TableUpdate>> ChangesForCreate(
+    const TableMetadata& metadata) {
+  std::vector<std::unique_ptr<TableUpdate>> changes;
+
+  // Add UUID assignment
+  changes.push_back(std::make_unique<table::AssignUUID>(metadata.table_uuid));
+
+  // Add format version upgrade
+  changes.push_back(
+      std::make_unique<table::UpgradeFormatVersion>(metadata.format_version));
+
+  // Add schema; kLastAdded makes the schema just added the current one.
+  if (auto current_schema_result = metadata.Schema();
+      current_schema_result.has_value()) {
+    auto current_schema = current_schema_result.value();
+    changes.push_back(
+        std::make_unique<table::AddSchema>(current_schema, metadata.last_column_id));
+    changes.push_back(std::make_unique<table::SetCurrentSchema>(kLastAdded));
+  }
+
+  // Add partition spec
+  if (auto partition_spec_result = metadata.PartitionSpec();
+      partition_spec_result.has_value()) {
+    auto spec = partition_spec_result.value();
+    // Decide by whether the spec has fields, not by its ID. A spec built for a
+    // new table carries the initial spec ID even when it is partitioned, so an
+    // ID check would replace it with the unpartitioned spec.
+    if (spec && !spec->fields().empty()) {
+      changes.push_back(std::make_unique<table::AddPartitionSpec>(spec));
+    } else {
+      changes.push_back(
+          std::make_unique<table::AddPartitionSpec>(PartitionSpec::Unpartitioned()));
+    }
+    changes.push_back(std::make_unique<table::SetDefaultPartitionSpec>(kLastAdded));
+  }
+
+  // Add sort order
+  if (auto sort_order_result = metadata.SortOrder(); sort_order_result.has_value()) {
+    auto order = sort_order_result.value();
+    if (order && order->is_sorted()) {
+      changes.push_back(std::make_unique<table::AddSortOrder>(order));
+    } else {
+      changes.push_back(std::make_unique<table::AddSortOrder>(SortOrder::Unsorted()));
+    }
+    changes.push_back(std::make_unique<table::SetDefaultSortOrder>(kLastAdded));
+  }
+
+  // Set location if not empty
+  if (!metadata.location.empty()) {
+    changes.push_back(std::make_unique<table::SetLocation>(metadata.location));
+  }
+
+  // Set properties if not empty
+  if (!metadata.properties.configs().empty()) {
+    changes.push_back(
+        std::make_unique<table::SetProperties>(metadata.properties.configs()));
+  }
+
+  return changes;
+}
} // namespace
std::string ToString(const SnapshotLogEntry& entry) {
@@ -68,6 +187,48 @@ std::string ToString(const MetadataLogEntry& entry) {
entry.metadata_file);
}
+// Creates metadata for a new table from the given schema, partition spec,
+// sort order, location and properties.
+//
+// Column IDs are reassigned starting from 1, and the partition spec and sort
+// order are rebuilt against the reassigned schema. Returns InvalidArgument
+// when `properties` contains a reserved key; errors from ID assignment,
+// spec/order rebuilding and property validation are propagated.
+Result<std::unique_ptr<TableMetadata>> TableMetadata::Make(
+    const iceberg::Schema& schema, const iceberg::PartitionSpec& spec,
+    const iceberg::SortOrder& sort_order, const std::string& location,
+    const std::unordered_map<std::string, std::string>& properties, int format_version) {
+  // Reserved keys may not be supplied by the caller.
+  for (const auto& [key, _] : properties) {
+    if (TableProperties::reserved_properties().contains(key)) {
+      return InvalidArgument(
+          "Table properties should not contain reserved properties, but got {}", key);
+    }
+  }
+
+  // Reassign all column ids to ensure consistency
+  int32_t last_column_id = 0;
+  auto allocate_column_id = [&last_column_id]() -> int32_t {
+    last_column_id += 1;
+    return last_column_id;
+  };
+  ICEBERG_ASSIGN_OR_RAISE(
+      auto fresh_schema,
+      AssignFreshIds(Schema::kInitialSchemaId, schema, allocate_column_id));
+
+  // Rebuild the partition spec using the new column ids
+  ICEBERG_ASSIGN_OR_RAISE(
+      auto fresh_spec,
+      FreshPartitionSpec(PartitionSpec::kInitialSpecId, spec, schema, *fresh_schema));
+
+  // Rebuild the sort order using the new column ids
+  ICEBERG_ASSIGN_OR_RAISE(
+      auto fresh_order,
+      FreshSortOrder(SortOrder::kInitialSortOrderId, sort_order, schema, *fresh_schema));
+
+  // Validate the metrics configuration.
+  ICEBERG_RETURN_UNEXPECTED(
+      MetricsConfig::VerifyReferencedColumns(properties, *fresh_schema));
+
+  ICEBERG_RETURN_UNEXPECTED(PropertyUtil::ValidateCommitProperties(properties));
+
+  // Each setter returns the builder itself, so sequential calls are equivalent
+  // to one chained expression.
+  auto builder = TableMetadataBuilder::BuildFromEmpty(format_version);
+  builder->SetLocation(location);
+  builder->SetCurrentSchema(std::move(fresh_schema), last_column_id);
+  builder->SetDefaultPartitionSpec(std::move(fresh_spec));
+  builder->SetDefaultSortOrder(std::move(fresh_order));
+  builder->SetProperties(properties);
+  return builder->Build();
+}
+
Result<std::shared_ptr<Schema>> TableMetadata::Schema() const {
return SchemaById(current_schema_id);
}
@@ -435,6 +596,7 @@ class TableMetadataBuilder::Impl {
Status SetCurrentSchema(int32_t schema_id);
Status RemoveSchemas(const std::unordered_set<int32_t>& schema_ids);
Result<int32_t> AddSchema(const Schema& schema, int32_t new_last_column_id);
+ void SetLocation(std::string_view location);
Result<std::unique_ptr<TableMetadata>> Build();
@@ -819,6 +981,14 @@ Result<int32_t>
TableMetadataBuilder::Impl::AddSchema(const Schema& schema,
return new_schema_id;
}
+// Updates the table location and records a SetLocation change. Does nothing
+// when `location` already matches the current metadata location.
+void TableMetadataBuilder::Impl::SetLocation(std::string_view location) {
+  if (location != metadata_.location) {
+    metadata_.location = std::string(location);
+    changes_.push_back(std::make_unique<table::SetLocation>(std::string(location)));
+  }
+}
+
Result<std::unique_ptr<TableMetadata>> TableMetadataBuilder::Impl::Build() {
// 1. Validate metadata consistency through TableMetadata#Validate
@@ -938,6 +1108,14 @@ std::unique_ptr<TableMetadataBuilder>
TableMetadataBuilder::BuildFrom(
return std::unique_ptr<TableMetadataBuilder>(new
TableMetadataBuilder(base)); // NOLINT
}
+// Applies, in order, every update produced by ChangesForCreate(base) to this
+// builder and returns the builder for chaining.
+TableMetadataBuilder& TableMetadataBuilder::ApplyChangesForCreate(
+    const TableMetadata& base) {
+  const auto creation_changes = ChangesForCreate(base);
+  for (const auto& change : creation_changes) {
+    change->ApplyTo(*this);
+  }
+  return *this;
+}
+
TableMetadataBuilder& TableMetadataBuilder::SetMetadataLocation(
std::string_view metadata_location) {
impl_->SetMetadataLocation(metadata_location);
@@ -1099,7 +1277,8 @@ TableMetadataBuilder&
TableMetadataBuilder::RemoveProperties(
}
TableMetadataBuilder& TableMetadataBuilder::SetLocation(std::string_view
location) {
- throw IcebergError(std::format("{} not implemented", __FUNCTION__));
+ impl_->SetLocation(location);
+ return *this;
}
TableMetadataBuilder& TableMetadataBuilder::AddEncryptionKey(
diff --git a/src/iceberg/table_metadata.h b/src/iceberg/table_metadata.h
index 3885a458..068ab319 100644
--- a/src/iceberg/table_metadata.h
+++ b/src/iceberg/table_metadata.h
@@ -127,6 +127,12 @@ struct ICEBERG_EXPORT TableMetadata {
/// A `long` higher than all assigned row IDs
int64_t next_row_id;
+ static Result<std::unique_ptr<TableMetadata>> Make(
+ const iceberg::Schema& schema, const iceberg::PartitionSpec& spec,
+ const iceberg::SortOrder& sort_order, const std::string& location,
+ const std::unordered_map<std::string, std::string>& properties,
+ int format_version = kDefaultTableFormatVersion);
+
/// \brief Get the current schema, return NotFoundError if not found
Result<std::shared_ptr<iceberg::Schema>> Schema() const;
/// \brief Get the current schema by ID, return NotFoundError if not found
@@ -210,9 +216,16 @@ class ICEBERG_EXPORT TableMetadataBuilder : public
ErrorCollector {
/// \brief Create a builder from existing table metadata
///
/// \param base The base table metadata to build from
- /// \return A new TableMetadataBuilder instance initialized with base
metadata
+ /// \return A new TableMetadataBuilder instance initialized
+ /// with base metadata
static std::unique_ptr<TableMetadataBuilder> BuildFrom(const TableMetadata*
base);
+ /// \brief Apply changes required to create this table metadata
+ ///
+ /// \param base The table metadata to build from
+ /// \return Reference to this builder for method chaining
+ TableMetadataBuilder& ApplyChangesForCreate(const TableMetadata& base);
+
/// \brief Set the metadata location of the table
///
/// \param metadata_location The new metadata location
diff --git a/src/iceberg/table_properties.cc b/src/iceberg/table_properties.cc
index 96633bc2..db6adedc 100644
--- a/src/iceberg/table_properties.cc
+++ b/src/iceberg/table_properties.cc
@@ -31,6 +31,13 @@ const std::unordered_set<std::string>&
TableProperties::reserved_properties() {
return kReservedProperties;
}
+// Returns the keys of the commit retry properties: number of retries,
+// minimum and maximum retry wait, and total retry time. The set is built
+// once on first use.
+const std::unordered_set<std::string>& TableProperties::commit_properties() {
+  static const std::unordered_set<std::string> kCommitProperties = {
+      kCommitNumRetries.key(),
+      kCommitMinRetryWaitMs.key(),
+      kCommitMaxRetryWaitMs.key(),
+      kCommitTotalRetryTimeMs.key(),
+  };
+  return kCommitProperties;
+}
+
TableProperties TableProperties::default_properties() { return {}; }
TableProperties TableProperties::FromMap(
diff --git a/src/iceberg/table_properties.h b/src/iceberg/table_properties.h
index debe61da..feb4a200 100644
--- a/src/iceberg/table_properties.h
+++ b/src/iceberg/table_properties.h
@@ -286,6 +286,9 @@ class ICEBERG_EXPORT TableProperties : public
ConfigBase<TableProperties> {
/// \return The set of reserved property keys
static const std::unordered_set<std::string>& reserved_properties();
+ /// \brief Get the set of commit table property keys.
+ static const std::unordered_set<std::string>& commit_properties();
+
/// \brief Create a default TableProperties instance.
///
/// \return A unique pointer to a TableProperties instance with default
values
diff --git a/src/iceberg/table_requirement.h b/src/iceberg/table_requirement.h
index eb818106..82779aec 100644
--- a/src/iceberg/table_requirement.h
+++ b/src/iceberg/table_requirement.h
@@ -43,14 +43,14 @@ namespace iceberg {
class ICEBERG_EXPORT TableRequirement {
public:
enum class Kind : uint8_t {
- AssertDoesNotExist,
- AssertUUID,
- AssertRefSnapshotID,
- AssertLastAssignedFieldId,
- AssertCurrentSchemaID,
- AssertLastAssignedPartitionId,
- AssertDefaultSpecID,
- AssertDefaultSortOrderID,
+ kAssertDoesNotExist,
+ kAssertUUID,
+ kAssertRefSnapshotID,
+ kAssertLastAssignedFieldId,
+ kAssertCurrentSchemaID,
+ kAssertLastAssignedPartitionId,
+ kAssertDefaultSpecID,
+ kAssertDefaultSortOrderID,
};
virtual ~TableRequirement() = default;
@@ -75,7 +75,7 @@ class ICEBERG_EXPORT AssertDoesNotExist : public
TableRequirement {
public:
AssertDoesNotExist() = default;
- Kind kind() const override { return Kind::AssertDoesNotExist; }
+ Kind kind() const override { return Kind::kAssertDoesNotExist; }
Status Validate(const TableMetadata* base) const override;
};
@@ -90,7 +90,7 @@ class ICEBERG_EXPORT AssertUUID : public TableRequirement {
const std::string& uuid() const { return uuid_; }
- Kind kind() const override { return Kind::AssertUUID; }
+ Kind kind() const override { return Kind::kAssertUUID; }
Status Validate(const TableMetadata* base) const override;
@@ -112,7 +112,7 @@ class ICEBERG_EXPORT AssertRefSnapshotID : public
TableRequirement {
const std::optional<int64_t>& snapshot_id() const { return snapshot_id_; }
- Kind kind() const override { return Kind::AssertRefSnapshotID; }
+ Kind kind() const override { return Kind::kAssertRefSnapshotID; }
Status Validate(const TableMetadata* base) const override;
@@ -132,7 +132,7 @@ class ICEBERG_EXPORT AssertLastAssignedFieldId : public
TableRequirement {
int32_t last_assigned_field_id() const { return last_assigned_field_id_; }
- Kind kind() const override { return Kind::AssertLastAssignedFieldId; }
+ Kind kind() const override { return Kind::kAssertLastAssignedFieldId; }
Status Validate(const TableMetadata* base) const override;
@@ -150,7 +150,7 @@ class ICEBERG_EXPORT AssertCurrentSchemaID : public
TableRequirement {
int32_t schema_id() const { return schema_id_; }
- Kind kind() const override { return Kind::AssertCurrentSchemaID; }
+ Kind kind() const override { return Kind::kAssertCurrentSchemaID; }
Status Validate(const TableMetadata* base) const override;
@@ -169,7 +169,7 @@ class ICEBERG_EXPORT AssertLastAssignedPartitionId : public
TableRequirement {
int32_t last_assigned_partition_id() const { return
last_assigned_partition_id_; }
- Kind kind() const override { return Kind::AssertLastAssignedPartitionId; }
+ Kind kind() const override { return Kind::kAssertLastAssignedPartitionId; }
Status Validate(const TableMetadata* base) const override;
@@ -187,7 +187,7 @@ class ICEBERG_EXPORT AssertDefaultSpecID : public
TableRequirement {
int32_t spec_id() const { return spec_id_; }
- Kind kind() const override { return Kind::AssertDefaultSpecID; }
+ Kind kind() const override { return Kind::kAssertDefaultSpecID; }
Status Validate(const TableMetadata* base) const override;
@@ -206,7 +206,7 @@ class ICEBERG_EXPORT AssertDefaultSortOrderID : public
TableRequirement {
int32_t sort_order_id() const { return sort_order_id_; }
- Kind kind() const override { return Kind::AssertDefaultSortOrderID; }
+ Kind kind() const override { return Kind::kAssertDefaultSortOrderID; }
Status Validate(const TableMetadata* base) const override;
diff --git a/src/iceberg/table_requirements.cc
b/src/iceberg/table_requirements.cc
index 6de6c59e..8222ded3 100644
--- a/src/iceberg/table_requirements.cc
+++ b/src/iceberg/table_requirements.cc
@@ -19,7 +19,9 @@
#include "iceberg/table_requirements.h"
+#include <algorithm>
#include <memory>
+#include <ranges>
#include "iceberg/snapshot.h"
#include "iceberg/table_metadata.h"
@@ -134,4 +136,19 @@ Result<std::vector<std::unique_ptr<TableRequirement>>>
TableRequirements::ForUpd
return context.Build();
}
+// Reports whether `requirements` describe a table creation, i.e. contain an
+// AssertDoesNotExist requirement. In that case it must be the only
+// requirement; otherwise the precondition check fails.
+Result<bool> TableRequirements::IsCreate(
+    const std::vector<std::unique_ptr<TableRequirement>>& requirements) {
+  const auto asserts_absence = [](const auto& requirement) {
+    return requirement->kind() == TableRequirement::Kind::kAssertDoesNotExist;
+  };
+  if (std::ranges::none_of(requirements, asserts_absence)) {
+    return false;
+  }
+
+  ICEBERG_PRECHECK(
+      requirements.size() == 1,
+      "Cannot have other requirements than AssertDoesNotExist in a table creation");
+  return true;
+}
+
} // namespace iceberg
diff --git a/src/iceberg/table_requirements.h b/src/iceberg/table_requirements.h
index f79f0bea..6deefdf3 100644
--- a/src/iceberg/table_requirements.h
+++ b/src/iceberg/table_requirements.h
@@ -144,6 +144,10 @@ class ICEBERG_EXPORT TableRequirements {
static Result<std::vector<std::unique_ptr<TableRequirement>>> ForUpdateTable(
const TableMetadata& base,
const std::vector<std::unique_ptr<TableUpdate>>& table_updates);
+
+ /// \brief Check if the requirements are for table creation
+ static Result<bool> IsCreate(
+ const std::vector<std::unique_ptr<TableRequirement>>& requirements);
};
} // namespace iceberg
diff --git a/src/iceberg/table_update.cc b/src/iceberg/table_update.cc
index 9d92a16e..91578977 100644
--- a/src/iceberg/table_update.cc
+++ b/src/iceberg/table_update.cc
@@ -201,7 +201,7 @@ void
RemoveProperties::GenerateRequirements(TableUpdateContext& context) const {
// SetLocation
void SetLocation::ApplyTo(TableMetadataBuilder& builder) const {
- throw IcebergError(std::format("{} not implemented", __FUNCTION__));
+ builder.SetLocation(location_);
}
void SetLocation::GenerateRequirements(TableUpdateContext& context) const {
diff --git a/src/iceberg/test/formatter_test.cc
b/src/iceberg/test/formatter_test.cc
index 767020b2..8878bb7c 100644
--- a/src/iceberg/test/formatter_test.cc
+++ b/src/iceberg/test/formatter_test.cc
@@ -29,6 +29,7 @@
#include <gtest/gtest.h>
#include "iceberg/statistics_file.h"
+#include "iceberg/table_identifier.h"
#include "iceberg/util/formatter_internal.h"
namespace iceberg {
@@ -160,4 +161,19 @@ TEST(FormatterTest, StatisticsFileFormat) {
EXPECT_EQ(expected, std::format("{}", statistics_file));
}
+// Types that provide a ToString function can be passed to std::format.
+TEST(FormatterTest, TableIdentifierFormat) {
+  // No namespace levels: only the table name is printed.
+  TableIdentifier without_namespace{
+      .ns = Namespace({}),
+      .name = "table_name",
+  };
+  EXPECT_EQ("table_name", std::format("{}", without_namespace));
+
+  // Namespace levels and the table name are joined with dots.
+  TableIdentifier with_namespace{
+      .ns = Namespace({"ns1", "ns2"}),
+      .name = "table_name",
+  };
+  EXPECT_EQ("ns1.ns2.table_name", std::format("{}", with_namespace));
+}
+
} // namespace iceberg
diff --git a/src/iceberg/test/in_memory_catalog_test.cc
b/src/iceberg/test/in_memory_catalog_test.cc
index 194d6da5..78f67dda 100644
--- a/src/iceberg/test/in_memory_catalog_test.cc
+++ b/src/iceberg/test/in_memory_catalog_test.cc
@@ -28,7 +28,9 @@
#include <gtest/gtest.h>
#include "iceberg/arrow/arrow_fs_file_io_internal.h"
+#include "iceberg/partition_spec.h"
#include "iceberg/schema.h"
+#include "iceberg/sort_order.h"
#include "iceberg/table.h"
#include "iceberg/table_identifier.h"
#include "iceberg/table_metadata.h"
@@ -38,6 +40,8 @@
#include "iceberg/test/mock_catalog.h"
#include "iceberg/test/mock_io.h"
#include "iceberg/test/test_resource.h"
+#include "iceberg/transaction.h"
+#include "iceberg/update/update_properties.h"
#include "iceberg/util/uuid.h"
namespace iceberg {
@@ -106,6 +110,26 @@ TEST_F(InMemoryCatalogTest, TableExists) {
EXPECT_THAT(result, HasValue(::testing::Eq(false)));
}
+// Creating a table succeeds once; a second create with the same identifier
+// is rejected with kAlreadyExists.
+TEST_F(InMemoryCatalogTest, CreateTable) {
+  TableIdentifier table_ident{.ns = {}, .name = "t1"};
+  auto schema = std::make_shared<Schema>(
+      std::vector<SchemaField>{SchemaField::MakeRequired(1, "x", int64())},
+      /*schema_id=*/1);
+  auto spec = PartitionSpec::Unpartitioned();
+  auto sort_order = SortOrder::Unsorted();
+  auto table_location = GenerateTestTableLocation(table_ident.name);
+
+  // First creation succeeds.
+  auto first_attempt = catalog_->CreateTable(table_ident, schema, spec, sort_order,
+                                             table_location, {{"property1", "value1"}});
+  EXPECT_THAT(first_attempt, IsOk());
+
+  // The identifier is now taken, so the same request fails.
+  auto second_attempt = catalog_->CreateTable(table_ident, schema, spec, sort_order,
+                                              table_location, {{"property1", "value1"}});
+  EXPECT_THAT(second_attempt, IsError(ErrorKind::kAlreadyExists));
+}
+
TEST_F(InMemoryCatalogTest, RegisterTable) {
TableIdentifier tableIdent{.ns = {}, .name = "t1"};
@@ -231,6 +255,50 @@ TEST_F(InMemoryCatalogTest, UpdateTable) {
testing::HasSubstr("metadata/00002-"));
}
+// Exercises the staged-create flow: stage a table, apply a property update,
+// commit it, and check both "already exists" failure paths (at stage time and
+// at commit time).
+TEST_F(InMemoryCatalogTest, StageCreateTable) {
+  TableIdentifier table_ident{.ns = {}, .name = "t1"};
+  auto schema = std::make_shared<Schema>(
+      std::vector<SchemaField>{SchemaField::MakeRequired(1, "x", int64())},
+      /*schema_id=*/1);
+  auto spec = PartitionSpec::Unpartitioned();
+  auto sort_order = SortOrder::Unsorted();
+
+  // Stage table
+  ICEBERG_UNWRAP_OR_FAIL(
+      auto staged_table,
+      catalog_->StageCreateTable(table_ident, schema, spec, sort_order,
+                                 GenerateTestTableLocation(table_ident.name), {}));
+
+  // Perform the update
+  ICEBERG_UNWRAP_OR_FAIL(auto update_properties, staged_table->NewUpdateProperties());
+  EXPECT_THAT(update_properties->Set("property1", "value1").Commit(), IsOk());
+  auto res1 = staged_table->Commit();
+  // Fatal assertion: res1.value() below must not be reached when the commit failed.
+  ASSERT_THAT(res1, IsOk());
+  auto created_table = res1.value();
+  EXPECT_EQ("t1", created_table->name().name);
+  EXPECT_EQ("value1", created_table->metadata()->properties.Get(
+                          TableProperties::Entry<std::string>("property1", "")));
+
+  // Staging a table whose identifier already exists must fail
+  auto res = catalog_->StageCreateTable(table_ident, schema, spec, sort_order,
+                                        GenerateTestTableLocation(table_ident.name), {});
+  EXPECT_THAT(res, IsError(ErrorKind::kAlreadyExists));
+
+  // Staging succeeds, but the table is created before the staged commit, so the
+  // commit must report that the table already exists
+  TableIdentifier table_ident2{.ns = {}, .name = "t2"};
+  ICEBERG_UNWRAP_OR_FAIL(
+      auto staged_table2,
+      catalog_->StageCreateTable(table_ident2, schema, spec, sort_order,
+                                 GenerateTestTableLocation(table_ident2.name), {}));
+  ICEBERG_UNWRAP_OR_FAIL(
+      auto created_table2,
+      catalog_->CreateTable(table_ident2, schema, spec, sort_order,
+                            GenerateTestTableLocation(table_ident2.name), {}));
+
+  auto commit_res = staged_table2->Commit();
+  EXPECT_THAT(commit_res, IsError(ErrorKind::kAlreadyExists));
+}
+
TEST_F(InMemoryCatalogTest, DropTable) {
TableIdentifier tableIdent{.ns = {}, .name = "t1"};
auto result = catalog_->DropTable(tableIdent, false);
diff --git a/src/iceberg/test/table_metadata_builder_test.cc
b/src/iceberg/test/table_metadata_builder_test.cc
index 4c551fea..77380e89 100644
--- a/src/iceberg/test/table_metadata_builder_test.cc
+++ b/src/iceberg/test/table_metadata_builder_test.cc
@@ -49,6 +49,24 @@ std::shared_ptr<Schema> CreateTestSchema() {
return std::make_shared<Schema>(std::vector<SchemaField>{field1, field2,
field3}, 0);
}
+// Builds a three-column schema (field ids 2, 5 and 8) whose identifier field id
+// list is {1}, an id that none of the columns carries.
+std::shared_ptr<Schema> CreateInvalidSchema() {
+  return std::make_shared<Schema>(
+      std::vector<SchemaField>{
+          SchemaField::MakeRequired(2, "id", int32()),
+          SchemaField::MakeRequired(5, "part_col", string()),
+          SchemaField::MakeRequired(8, "sort_col", timestamp()),
+      },
+      std::make_optional(1), std::vector<int32_t>{1});
+}
+
+// Builds a three-column schema whose field ids (2, 5, 8) are not consecutive;
+// the identifier field id list is {2}, i.e. the "id" column.
+std::shared_ptr<Schema> CreateDisorderedSchema() {
+  return std::make_shared<Schema>(
+      std::vector<SchemaField>{
+          SchemaField::MakeRequired(2, "id", int32()),
+          SchemaField::MakeRequired(5, "part_col", string()),
+          SchemaField::MakeRequired(8, "sort_col", timestamp()),
+      },
+      std::make_optional(1), std::vector<int32_t>{2});
+}
+
// Helper function to create base metadata for tests
std::unique_ptr<TableMetadata> CreateBaseMetadata() {
auto metadata = std::make_unique<TableMetadata>();
@@ -73,6 +91,131 @@ std::unique_ptr<TableMetadata> CreateBaseMetadata() {
} // namespace
+// Tests for TableMetadata
+//
+// Make: builds metadata from a schema with field ids 2/5/8 and checks the ids
+// observed on the resulting schema, partition spec and sort order.
+TEST(TableMetadataTest, Make) {
+  auto schema = CreateDisorderedSchema();
+  ICEBERG_UNWRAP_OR_FAIL(
+      auto spec, PartitionSpec::Make(1, std::vector<PartitionField>{PartitionField(
+                                            5, 1, "part_name", Transform::Identity())}));
+  ICEBERG_UNWRAP_OR_FAIL(
+      auto order, SortOrder::Make(1, std::vector<SortField>{SortField(
+                                         8, Transform::Identity(),
+                                         SortDirection::kAscending, NullOrder::kLast)}));
+
+  ICEBERG_UNWRAP_OR_FAIL(
+      auto metadata, TableMetadata::Make(*schema, *spec, *order, "s3://bucket/test", {}));
+  // Check schema fields: input ids 2, 5, 8 are expected to come back as 1, 2, 3
+  ASSERT_EQ(1, metadata->schemas.size());
+  auto fields = metadata->schemas[0]->fields() | std::ranges::to<std::vector>();
+  ASSERT_EQ(3, fields.size());
+  EXPECT_EQ(1, fields[0].field_id());
+  EXPECT_EQ("id", fields[0].name());
+  EXPECT_EQ(2, fields[1].field_id());
+  EXPECT_EQ("part_col", fields[1].name());
+  EXPECT_EQ(3, fields[2].field_id());
+  EXPECT_EQ("sort_col", fields[2].name());
+  // The identifier field (input id 2, column "id") is expected to follow to id 1
+  const auto& identifier_field_ids = metadata->schemas[0]->IdentifierFieldIds();
+  ASSERT_EQ(1, identifier_field_ids.size());
+  EXPECT_EQ(1, identifier_field_ids[0]);
+
+  // Check partition spec: source id 5 ("part_col") is expected to become 2
+  ASSERT_EQ(1, metadata->partition_specs.size());
+  EXPECT_EQ(PartitionSpec::kInitialSpecId, metadata->partition_specs[0]->spec_id());
+  auto spec_fields =
+      metadata->partition_specs[0]->fields() | std::ranges::to<std::vector>();
+  ASSERT_EQ(1, spec_fields.size());
+  EXPECT_EQ(PartitionSpec::kInvalidPartitionFieldId + 1, spec_fields[0].field_id());
+  EXPECT_EQ(2, spec_fields[0].source_id());
+  EXPECT_EQ("part_name", spec_fields[0].name());
+
+  // Check sort order: source id 8 ("sort_col") is expected to become 3
+  ASSERT_EQ(1, metadata->sort_orders.size());
+  EXPECT_EQ(SortOrder::kInitialSortOrderId, metadata->sort_orders[0]->order_id());
+  auto order_fields = metadata->sort_orders[0]->fields() | std::ranges::to<std::vector>();
+  ASSERT_EQ(1, order_fields.size());
+  EXPECT_EQ(3, order_fields[0].source_id());
+  EXPECT_EQ(SortDirection::kAscending, order_fields[0].direction());
+  EXPECT_EQ(NullOrder::kLast, order_fields[0].null_order());
+}
+
+// TableMetadata::Make must reject a schema whose identifier field id (1) does
+// not match any of its columns (ids 2, 5, 8).
+TEST(TableMetadataTest, MakeWithInvalidSchema) {
+  auto bad_schema = CreateInvalidSchema();
+  ICEBERG_UNWRAP_OR_FAIL(
+      auto partition_spec,
+      PartitionSpec::Make(1, std::vector<PartitionField>{PartitionField(
+                                 5, 1, "part_name", Transform::Identity())}));
+  ICEBERG_UNWRAP_OR_FAIL(
+      auto sort_order,
+      SortOrder::Make(1, std::vector<SortField>{
+                             SortField(5, Transform::Identity(),
+                                       SortDirection::kAscending, NullOrder::kLast)}));
+
+  auto result = TableMetadata::Make(*bad_schema, *partition_spec, *sort_order,
+                                    "s3://bucket/test", {});
+  EXPECT_THAT(result, IsError(ErrorKind::kInvalidSchema));
+  EXPECT_THAT(result, HasErrorMessage("Cannot find identifier field id"));
+}
+
+// TableMetadata::Make must reject a partition spec whose source id (6) is not a
+// column of the schema (ids 2, 5, 8).
+TEST(TableMetadataTest, MakeWithInvalidPartitionSpec) {
+  auto table_schema = CreateDisorderedSchema();
+  ICEBERG_UNWRAP_OR_FAIL(
+      auto partition_spec,
+      PartitionSpec::Make(1, std::vector<PartitionField>{PartitionField(
+                                 6, 1, "part_name", Transform::Identity())}));
+  ICEBERG_UNWRAP_OR_FAIL(
+      auto sort_order,
+      SortOrder::Make(1, std::vector<SortField>{
+                             SortField(8, Transform::Identity(),
+                                       SortDirection::kAscending, NullOrder::kLast)}));
+
+  auto result = TableMetadata::Make(*table_schema, *partition_spec, *sort_order,
+                                    "s3://bucket/test", {});
+  EXPECT_THAT(result, IsError(ErrorKind::kInvalidSchema));
+  EXPECT_THAT(result, HasErrorMessage("Cannot find source partition field"));
+}
+
+// TableMetadata::Make must reject a sort order whose source id (9) is not a
+// column of the schema (ids 2, 5, 8).
+TEST(TableMetadataTest, MakeWithInvalidSortOrder) {
+  auto table_schema = CreateDisorderedSchema();
+  ICEBERG_UNWRAP_OR_FAIL(
+      auto partition_spec,
+      PartitionSpec::Make(1, std::vector<PartitionField>{PartitionField(
+                                 5, 1, "part_name", Transform::Identity())}));
+  ICEBERG_UNWRAP_OR_FAIL(
+      auto sort_order,
+      SortOrder::Make(1, std::vector<SortField>{
+                             SortField(9, Transform::Identity(),
+                                       SortDirection::kAscending, NullOrder::kLast)}));
+
+  auto result = TableMetadata::Make(*table_schema, *partition_spec, *sort_order,
+                                    "s3://bucket/test", {});
+  EXPECT_THAT(result, IsError(ErrorKind::kInvalidSchema));
+  EXPECT_THAT(result, HasErrorMessage("Cannot find source sort field"));
+}
+
+// TableMetadata::Make must fail validation when the supplied table properties
+// are invalid: a per-column metrics mode for a column that is not in the
+// schema, and a negative commit retry count.
+TEST(TableMetadataTest, InvalidProperties) {
+  auto spec = PartitionSpec::Unpartitioned();
+  auto order = SortOrder::Unsorted();
+
+  {
+    // Invalid metrics config: "unknown_col" is not a column of the schema
+    auto schema = CreateDisorderedSchema();
+    std::unordered_map<std::string, std::string> invalid_metric_config = {
+        {std::string(TableProperties::kMetricModeColumnConfPrefix) + "unknown_col",
+         "value"}};
+
+    auto res = TableMetadata::Make(*schema, *spec, *order, "s3://bucket/test",
+                                   invalid_metric_config);
+    EXPECT_THAT(res, IsError(ErrorKind::kValidationFailed));
+    EXPECT_THAT(res, HasErrorMessage("Invalid metrics config"));
+  }
+
+  {
+    // Invalid commit properties: the retry count is negative
+    auto schema = CreateDisorderedSchema();
+    std::unordered_map<std::string, std::string> invalid_commit_properties = {
+        {TableProperties::kCommitNumRetries.key(), "-1"}};
+
+    auto res = TableMetadata::Make(*schema, *spec, *order, "s3://bucket/test",
+                                   invalid_commit_properties);
+    EXPECT_THAT(res, IsError(ErrorKind::kValidationFailed));
+    EXPECT_THAT(res,
+                HasErrorMessage(std::format(
+                    "Table property {} must have non negative integer value, but got {}",
+                    TableProperties::kCommitNumRetries.key(), -1)));
+  }
+}
+
// test construction of TableMetadataBuilder
TEST(TableMetadataBuilderTest, BuildFromEmpty) {
auto builder = TableMetadataBuilder::BuildFromEmpty(2);
diff --git a/src/iceberg/test/table_requirement_test.cc
b/src/iceberg/test/table_requirement_test.cc
index 8b67561f..07055f18 100644
--- a/src/iceberg/test/table_requirement_test.cc
+++ b/src/iceberg/test/table_requirement_test.cc
@@ -36,7 +36,7 @@ TEST(TableRequirementTest, AssertUUID) {
// Success - UUID matches
table::AssertUUID requirement("test-uuid-1234");
- EXPECT_EQ(TableRequirement::Kind::AssertUUID, requirement.kind());
+ EXPECT_EQ(TableRequirement::Kind::kAssertUUID, requirement.kind());
ASSERT_THAT(requirement.Validate(base.get()), IsOk());
// UUID mismatch
@@ -63,7 +63,7 @@ TEST(TableRequirementTest, AssertCurrentSchemaID) {
// Success - schema ID matches
table::AssertCurrentSchemaID requirement(5);
- EXPECT_EQ(TableRequirement::Kind::AssertCurrentSchemaID, requirement.kind());
+ EXPECT_EQ(TableRequirement::Kind::kAssertCurrentSchemaID,
requirement.kind());
ASSERT_THAT(requirement.Validate(base.get()), IsOk());
// Schema ID mismatch
@@ -89,7 +89,7 @@ TEST(TableRequirementTest, AssertCurrentSchemaID) {
TEST(TableRequirementTest, AssertDoesNotExist) {
// Success - table does not exist (null metadata)
table::AssertDoesNotExist requirement;
- EXPECT_EQ(TableRequirement::Kind::AssertDoesNotExist, requirement.kind());
+ EXPECT_EQ(TableRequirement::Kind::kAssertDoesNotExist, requirement.kind());
ASSERT_THAT(requirement.Validate(nullptr), IsOk());
// Table already exists
@@ -108,7 +108,7 @@ TEST(TableRequirementTest, AssertRefSnapshotID) {
// Success - ref snapshot ID matches
table::AssertRefSnapshotID requirement("main", 100);
- EXPECT_EQ(TableRequirement::Kind::AssertRefSnapshotID, requirement.kind());
+ EXPECT_EQ(TableRequirement::Kind::kAssertRefSnapshotID, requirement.kind());
ASSERT_THAT(requirement.Validate(base.get()), IsOk());
// Snapshot ID mismatch
@@ -140,7 +140,7 @@ TEST(TableRequirementTest, AssertLastAssignedFieldId) {
// Success - field ID matches
table::AssertLastAssignedFieldId requirement(10);
- EXPECT_EQ(TableRequirement::Kind::AssertLastAssignedFieldId,
requirement.kind());
+ EXPECT_EQ(TableRequirement::Kind::kAssertLastAssignedFieldId,
requirement.kind());
ASSERT_THAT(requirement.Validate(base.get()), IsOk());
// Field ID mismatch
@@ -160,7 +160,7 @@ TEST(TableRequirementTest, AssertLastAssignedPartitionId) {
// Success - partition ID matches
table::AssertLastAssignedPartitionId requirement(5);
- EXPECT_EQ(TableRequirement::Kind::AssertLastAssignedPartitionId,
requirement.kind());
+ EXPECT_EQ(TableRequirement::Kind::kAssertLastAssignedPartitionId,
requirement.kind());
ASSERT_THAT(requirement.Validate(base.get()), IsOk());
// Partition ID mismatch
@@ -182,7 +182,7 @@ TEST(TableRequirementTest, AssertDefaultSpecID) {
// Success - spec ID matches
table::AssertDefaultSpecID requirement(3);
- EXPECT_EQ(TableRequirement::Kind::AssertDefaultSpecID, requirement.kind());
+ EXPECT_EQ(TableRequirement::Kind::kAssertDefaultSpecID, requirement.kind());
ASSERT_THAT(requirement.Validate(base.get()), IsOk());
// Spec ID mismatch
@@ -198,7 +198,7 @@ TEST(TableRequirementTest, AssertDefaultSortOrderID) {
// Success - sort order ID matches
table::AssertDefaultSortOrderID requirement(2);
- EXPECT_EQ(TableRequirement::Kind::AssertDefaultSortOrderID,
requirement.kind());
+ EXPECT_EQ(TableRequirement::Kind::kAssertDefaultSortOrderID,
requirement.kind());
ASSERT_THAT(requirement.Validate(base.get()), IsOk());
// Sort order ID mismatch
diff --git a/src/iceberg/test/table_requirements_test.cc
b/src/iceberg/test/table_requirements_test.cc
index bbbcc681..c05e505f 100644
--- a/src/iceberg/test/table_requirements_test.cc
+++ b/src/iceberg/test/table_requirements_test.cc
@@ -102,6 +102,20 @@ TEST(TableRequirementsTest, EmptyUpdatesForCreateTable) {
EXPECT_NE(assert_does_not_exist, nullptr);
}
+// IsCreate yields true when AssertDoesNotExist is the only requirement and an
+// InvalidArgument error when it is combined with any other requirement.
+TEST(TableRequirementsTest, IsCreate) {
+  // Should have only AssertDoesNotExist requirement
+  std::vector<std::unique_ptr<TableRequirement>> requirements;
+  requirements.push_back(std::make_unique<table::AssertDoesNotExist>());
+  auto is_create = TableRequirements::IsCreate(requirements);
+  // Check the contained value explicitly: passing the result object itself to
+  // EXPECT_TRUE would only test that a value is present, not that it is true.
+  ASSERT_TRUE(is_create.has_value());
+  EXPECT_TRUE(is_create.value());
+
+  // Not only have AssertDoesNotExist requirement
+  requirements.push_back(std::make_unique<table::AssertCurrentSchemaID>(0));
+  auto res = TableRequirements::IsCreate(requirements);
+  EXPECT_THAT(res, IsError(ErrorKind::kInvalidArgument));
+  EXPECT_THAT(res,
+              HasErrorMessage("Cannot have other requirements than AssertDoesNotExist"));
+}
+
TEST(TableRequirementsTest, EmptyUpdatesForUpdateTable) {
auto metadata = CreateBaseMetadata();
std::vector<std::unique_ptr<TableUpdate>> updates;
diff --git a/src/iceberg/transaction.cc b/src/iceberg/transaction.cc
index 9404fe2e..33864ca9 100644
--- a/src/iceberg/transaction.cc
+++ b/src/iceberg/transaction.cc
@@ -36,11 +36,12 @@
namespace iceberg {
-Transaction::Transaction(std::shared_ptr<Table> table, Kind kind, bool
auto_commit)
+Transaction::Transaction(std::shared_ptr<Table> table, Kind kind, bool
auto_commit,
+ std::unique_ptr<TableMetadataBuilder>
metadata_builder)
: table_(std::move(table)),
kind_(kind),
auto_commit_(auto_commit),
-
metadata_builder_(TableMetadataBuilder::BuildFrom(table_->metadata().get())) {}
+ metadata_builder_(std::move(metadata_builder)) {}
Transaction::~Transaction() = default;
@@ -49,8 +50,17 @@ Result<std::shared_ptr<Transaction>>
Transaction::Make(std::shared_ptr<Table> ta
if (!table || !table->catalog()) [[unlikely]] {
return InvalidArgument("Table and catalog cannot be null");
}
+
+ std::unique_ptr<TableMetadataBuilder> metadata_builder;
+ if (kind == Kind::kCreate) {
+ metadata_builder = TableMetadataBuilder::BuildFromEmpty();
+ std::ignore = metadata_builder->ApplyChangesForCreate(*table->metadata());
+ } else {
+ metadata_builder =
TableMetadataBuilder::BuildFrom(table->metadata().get());
+ }
+
return std::shared_ptr<Transaction>(
- new Transaction(std::move(table), kind, auto_commit));
+ new Transaction(std::move(table), kind, auto_commit,
std::move(metadata_builder)));
}
const TableMetadata* Transaction::base() const { return
metadata_builder_->base(); }
diff --git a/src/iceberg/transaction.h b/src/iceberg/transaction.h
index 18143b8a..87a2139b 100644
--- a/src/iceberg/transaction.h
+++ b/src/iceberg/transaction.h
@@ -69,7 +69,8 @@ class ICEBERG_EXPORT Transaction : public
std::enable_shared_from_this<Transacti
Result<std::shared_ptr<UpdateSortOrder>> NewUpdateSortOrder();
private:
- Transaction(std::shared_ptr<Table> table, Kind kind, bool auto_commit);
+ Transaction(std::shared_ptr<Table> table, Kind kind, bool auto_commit,
+ std::unique_ptr<TableMetadataBuilder> metadata_builder);
Status AddUpdate(const std::shared_ptr<PendingUpdate>& update);
diff --git a/src/iceberg/util/meson.build b/src/iceberg/util/meson.build
index 48477925..880f6340 100644
--- a/src/iceberg/util/meson.build
+++ b/src/iceberg/util/meson.build
@@ -32,6 +32,7 @@ install_headers(
'location_util.h',
'macros.h',
'partition_value_util.h',
+ 'property_util.h',
'string_util.h',
'temporal_util.h',
'timepoint.h',
diff --git a/src/iceberg/util/property_util.cc
b/src/iceberg/util/property_util.cc
new file mode 100644
index 00000000..636083fd
--- /dev/null
+++ b/src/iceberg/util/property_util.cc
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "iceberg/util/property_util.h"
+
+#include <charconv>
+
+#include "iceberg/table_properties.h"
+
+namespace iceberg {
+
+/// \brief Validate the commit-related entries of a property map.
+///
+/// For every key listed by TableProperties::commit_properties() that is present
+/// in `properties`, the value must be a complete base-10 int32 literal and must
+/// not be negative. Keys that are absent are skipped.
+///
+/// \param properties Property map to validate.
+/// \return An empty Status when every present commit property is valid,
+/// otherwise a ValidationFailed error naming the offending property.
+Status PropertyUtil::ValidateCommitProperties(
+    const std::unordered_map<std::string, std::string>& properties) {
+  for (const auto& property : TableProperties::commit_properties()) {
+    auto it = properties.find(property);
+    if (it == properties.end()) {
+      continue;
+    }
+
+    const std::string& value = it->second;
+    const char* const begin = value.data();
+    const char* const end = begin + value.size();
+    int32_t parsed = 0;
+    auto [ptr, ec] = std::from_chars(begin, end, parsed);
+    if (ec == std::errc::result_out_of_range) {
+      return ValidationFailed("Table property {} value out of range {}", property,
+                              value);
+    }
+    // std::from_chars reports success as soon as a numeric prefix is parsed, so
+    // input such as "3abc" has to be rejected by checking for unconsumed
+    // characters as well as for a parse error.
+    if (ec != std::errc{} || ptr != end) {
+      return ValidationFailed("Table property {} must have integer value, but got {}",
+                              property, value);
+    }
+    if (parsed < 0) {
+      return ValidationFailed(
+          "Table property {} must have non negative integer value, but got {}",
+          property, parsed);
+    }
+  }
+  return {};
+}
+
+} // namespace iceberg
diff --git a/src/iceberg/table_identifier.h b/src/iceberg/util/property_util.h
similarity index 56%
copy from src/iceberg/table_identifier.h
copy to src/iceberg/util/property_util.h
index bef9b81d..4e3e9b12 100644
--- a/src/iceberg/table_identifier.h
+++ b/src/iceberg/util/property_util.h
@@ -19,40 +19,18 @@
#pragma once
-/// \file iceberg/table_identifier.h
-/// A TableIdentifier is a unique identifier for a table
-
#include <string>
-#include <vector>
+#include <unordered_map>
#include "iceberg/iceberg_export.h"
#include "iceberg/result.h"
namespace iceberg {
-/// \brief A namespace in a catalog.
-struct ICEBERG_EXPORT Namespace {
- std::vector<std::string> levels;
-
- bool operator==(const Namespace& other) const { return levels ==
other.levels; }
-};
-
-/// \brief Identifies a table in iceberg catalog.
-struct ICEBERG_EXPORT TableIdentifier {
- Namespace ns;
- std::string name;
-
- bool operator==(const TableIdentifier& other) const {
- return ns == other.ns && name == other.name;
- }
-
- /// \brief Validates the TableIdentifier.
- Status Validate() const {
- if (name.empty()) {
- return Invalid("Invalid table identifier: missing table name");
- }
- return {};
- }
+/// \brief Static helpers for validating property maps.
+class ICEBERG_EXPORT PropertyUtil {
+ public:
+ /// \brief Validate the commit-related entries of a property map.
+ ///
+ /// Each key listed by TableProperties::commit_properties() that is present in
+ /// `properties` must hold a value that parses as a 32-bit integer and is not
+ /// negative; keys that are absent are skipped.
+ ///
+ /// \param properties Property map to validate.
+ /// \return An empty Status on success, otherwise a ValidationFailed error.
+ static Status ValidateCommitProperties(
+ const std::unordered_map<std::string, std::string>& properties);
};
} // namespace iceberg