This is an automated email from the ASF dual-hosted git repository.

gangwu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-cpp.git


The following commit(s) were added to refs/heads/main by this push:
     new fea93b1a feat: implement create (stage) table for InMemoryCatalog 
(#416)
fea93b1a is described below

commit fea93b1ac7eeaafbc0a2e2e52f9679c65323424b
Author: wzhuo <[email protected]>
AuthorDate: Tue Dec 30 13:43:21 2025 +0800

    feat: implement create (stage) table for InMemoryCatalog (#416)
---
 src/iceberg/CMakeLists.txt                         |   2 +
 src/iceberg/catalog/memory/in_memory_catalog.cc    |  75 +++++++--
 src/iceberg/meson.build                            |   2 +
 src/iceberg/schema.cc                              |   2 +-
 .../{table_properties.cc => table_identifier.cc}   |  27 ++-
 src/iceberg/table_identifier.h                     |  11 ++
 src/iceberg/table_metadata.cc                      | 181 ++++++++++++++++++++-
 src/iceberg/table_metadata.h                       |  15 +-
 src/iceberg/table_properties.cc                    |   7 +
 src/iceberg/table_properties.h                     |   3 +
 src/iceberg/table_requirement.h                    |  32 ++--
 src/iceberg/table_requirements.cc                  |  17 ++
 src/iceberg/table_requirements.h                   |   4 +
 src/iceberg/table_update.cc                        |   2 +-
 src/iceberg/test/formatter_test.cc                 |  16 ++
 src/iceberg/test/in_memory_catalog_test.cc         |  68 ++++++++
 src/iceberg/test/table_metadata_builder_test.cc    | 143 ++++++++++++++++
 src/iceberg/test/table_requirement_test.cc         |  16 +-
 src/iceberg/test/table_requirements_test.cc        |  14 ++
 src/iceberg/transaction.cc                         |  16 +-
 src/iceberg/transaction.h                          |   3 +-
 src/iceberg/util/meson.build                       |   1 +
 src/iceberg/util/property_util.cc                  |  52 ++++++
 .../{table_identifier.h => util/property_util.h}   |  32 +---
 24 files changed, 654 insertions(+), 87 deletions(-)

diff --git a/src/iceberg/CMakeLists.txt b/src/iceberg/CMakeLists.txt
index 27892935..ce339cfe 100644
--- a/src/iceberg/CMakeLists.txt
+++ b/src/iceberg/CMakeLists.txt
@@ -66,6 +66,7 @@ set(ICEBERG_SOURCES
     sort_order.cc
     statistics_file.cc
     table.cc
+    table_identifier.cc
     table_metadata.cc
     table_properties.cc
     table_requirement.cc
@@ -86,6 +87,7 @@ set(ICEBERG_SOURCES
     util/decimal.cc
     util/gzip_internal.cc
     util/murmurhash3_internal.cc
+    util/property_util.cc
     util/snapshot_util.cc
     util/temporal_util.cc
     util/timepoint.cc
diff --git a/src/iceberg/catalog/memory/in_memory_catalog.cc 
b/src/iceberg/catalog/memory/in_memory_catalog.cc
index b3fd0060..5e562765 100644
--- a/src/iceberg/catalog/memory/in_memory_catalog.cc
+++ b/src/iceberg/catalog/memory/in_memory_catalog.cc
@@ -26,7 +26,10 @@
 #include "iceberg/table_identifier.h"
 #include "iceberg/table_metadata.h"
 #include "iceberg/table_requirement.h"
+#include "iceberg/table_requirements.h"
 #include "iceberg/table_update.h"
+#include "iceberg/transaction.h"
+#include "iceberg/util/checked_cast.h"
 #include "iceberg/util/macros.h"
 
 namespace iceberg {
@@ -318,7 +321,7 @@ Result<std::string> 
InMemoryNamespace::GetTableMetadataLocation(
   ICEBERG_RETURN_UNEXPECTED(ns);
   const auto it = ns.value()->table_metadata_locations_.find(table_ident.name);
   if (it == ns.value()->table_metadata_locations_.end()) {
-    return NotFound("{} does not exist", table_ident.name);
+    return NotFound("Table does not exist: {}", table_ident);
   }
   return it->second;
 }
@@ -405,7 +408,23 @@ Result<std::shared_ptr<Table>> 
InMemoryCatalog::CreateTable(
     const std::string& location,
     const std::unordered_map<std::string, std::string>& properties) {
   std::unique_lock lock(mutex_);
-  return NotImplemented("create table");
+  if (root_namespace_->TableExists(identifier).value_or(false)) {
+    return AlreadyExists("Table already exists: {}", identifier);
+  }
+
+  std::string base_location =
+      location.empty() ? warehouse_location_ + "/" + identifier.ToString() : 
location;
+
+  ICEBERG_ASSIGN_OR_RAISE(auto table_metadata, TableMetadata::Make(*schema, 
*spec, *order,
+                                                                   location, 
properties));
+
+  ICEBERG_ASSIGN_OR_RAISE(
+      auto metadata_file_location,
+      TableMetadataUtil::Write(*file_io_, nullptr, "", *table_metadata));
+  ICEBERG_RETURN_UNEXPECTED(
+      root_namespace_->UpdateTableMetadataLocation(identifier, 
metadata_file_location));
+  return Table::Make(identifier, std::move(table_metadata),
+                     std::move(metadata_file_location), file_io_, 
shared_from_this());
 }
 
 Result<std::shared_ptr<Table>> InMemoryCatalog::UpdateTable(
@@ -413,24 +432,44 @@ Result<std::shared_ptr<Table>> 
InMemoryCatalog::UpdateTable(
     const std::vector<std::unique_ptr<TableRequirement>>& requirements,
     const std::vector<std::unique_ptr<TableUpdate>>& updates) {
   std::unique_lock lock(mutex_);
-  ICEBERG_ASSIGN_OR_RAISE(auto base_metadata_location,
-                          
root_namespace_->GetTableMetadataLocation(identifier));
-
-  ICEBERG_ASSIGN_OR_RAISE(auto base,
-                          TableMetadataUtil::Read(*file_io_, 
base_metadata_location));
+  auto base_metadata_location = 
root_namespace_->GetTableMetadataLocation(identifier);
+  std::unique_ptr<TableMetadata> base;
+  std::unique_ptr<TableMetadataBuilder> builder;
+  ICEBERG_ASSIGN_OR_RAISE(auto is_create, 
TableRequirements::IsCreate(requirements));
+  if (is_create) {
+    if (base_metadata_location.has_value()) {
+      return AlreadyExists("Table already exists: {}", identifier);
+    }
+    int8_t format_version = TableMetadata::kDefaultTableFormatVersion;
+    for (const auto& update : updates) {
+      if (update->kind() == TableUpdate::Kind::kUpgradeFormatVersion) {
+        format_version =
+            internal::checked_cast<const table::UpgradeFormatVersion&>(*update)
+                .format_version();
+      }
+    }
+    builder = TableMetadataBuilder::BuildFromEmpty(format_version);
+  } else {
+    ICEBERG_RETURN_UNEXPECTED(base_metadata_location);
+    ICEBERG_ASSIGN_OR_RAISE(
+        base, TableMetadataUtil::Read(*file_io_, 
base_metadata_location.value()));
+    builder = TableMetadataBuilder::BuildFrom(base.get());
+  }
 
   for (const auto& requirement : requirements) {
     ICEBERG_RETURN_UNEXPECTED(requirement->Validate(base.get()));
   }
 
-  auto builder = TableMetadataBuilder::BuildFrom(base.get());
   for (const auto& update : updates) {
     update->ApplyTo(*builder);
   }
   ICEBERG_ASSIGN_OR_RAISE(auto updated, builder->Build());
   ICEBERG_ASSIGN_OR_RAISE(
       auto new_metadata_location,
-      TableMetadataUtil::Write(*file_io_, base.get(), base_metadata_location, 
*updated));
+      TableMetadataUtil::Write(
+          *file_io_, base.get(),
+          base_metadata_location.has_value() ? base_metadata_location.value() 
: "",
+          *updated));
   ICEBERG_RETURN_UNEXPECTED(
       root_namespace_->UpdateTableMetadataLocation(identifier, 
new_metadata_location));
   TableMetadataUtil::DeleteRemovedMetadataFiles(*file_io_, base.get(), 
*updated);
@@ -445,7 +484,21 @@ Result<std::shared_ptr<Transaction>> 
InMemoryCatalog::StageCreateTable(
     const std::string& location,
     const std::unordered_map<std::string, std::string>& properties) {
   std::unique_lock lock(mutex_);
-  return NotImplemented("stage create table");
+  if (root_namespace_->TableExists(identifier).value_or(false)) {
+    return AlreadyExists("Table already exists: {}", identifier);
+  }
+
+  std::string base_location =
+      location.empty() ? warehouse_location_ + "/" + identifier.ToString() : 
location;
+
+  ICEBERG_ASSIGN_OR_RAISE(
+      auto table_metadata,
+      TableMetadata::Make(*schema, *spec, *order, base_location, properties));
+  ICEBERG_ASSIGN_OR_RAISE(
+      auto table, StagedTable::Make(identifier, std::move(table_metadata), "", 
file_io_,
+                                    shared_from_this()));
+  return Transaction::Make(std::move(table), Transaction::Kind::kCreate,
+                           /* auto_commit */ false);
 }
 
 Result<bool> InMemoryCatalog::TableExists(const TableIdentifier& identifier) 
const {
@@ -495,7 +548,7 @@ Result<std::shared_ptr<Table>> 
InMemoryCatalog::RegisterTable(
 
   std::unique_lock lock(mutex_);
   if (!root_namespace_->NamespaceExists(identifier.ns)) {
-    return NoSuchNamespace("table namespace does not exist.");
+    return NoSuchNamespace("Table namespace does not exist: {}", 
identifier.ns);
   }
   if (!root_namespace_->RegisterTable(identifier, metadata_file_location)) {
     return UnknownError("The registry failed.");
diff --git a/src/iceberg/meson.build b/src/iceberg/meson.build
index f8022b09..77d72638 100644
--- a/src/iceberg/meson.build
+++ b/src/iceberg/meson.build
@@ -88,6 +88,7 @@ iceberg_sources = files(
     'sort_order.cc',
     'statistics_file.cc',
     'table.cc',
+    'table_identifier.cc',
     'table_metadata.cc',
     'table_properties.cc',
     'table_requirement.cc',
@@ -108,6 +109,7 @@ iceberg_sources = files(
     'util/decimal.cc',
     'util/gzip_internal.cc',
     'util/murmurhash3_internal.cc',
+    'util/property_util.cc',
     'util/snapshot_util.cc',
     'util/temporal_util.cc',
     'util/timepoint.cc',
diff --git a/src/iceberg/schema.cc b/src/iceberg/schema.cc
index 0b7336b6..afe6be1e 100644
--- a/src/iceberg/schema.cc
+++ b/src/iceberg/schema.cc
@@ -235,7 +235,7 @@ Result<std::vector<std::string>> 
Schema::IdentifierFieldNames() const {
   for (auto id : identifier_field_ids_) {
     ICEBERG_ASSIGN_OR_RAISE(auto name, FindColumnNameById(id));
     if (!name.has_value()) {
-      return InvalidSchema("Cannot find the field of the specified field id: 
{}", id);
+      return InvalidSchema("Cannot find identifier field id: {}", id);
     }
     names.emplace_back(name.value());
   }
diff --git a/src/iceberg/table_properties.cc b/src/iceberg/table_identifier.cc
similarity index 52%
copy from src/iceberg/table_properties.cc
copy to src/iceberg/table_identifier.cc
index 96633bc2..9b92d549 100644
--- a/src/iceberg/table_properties.cc
+++ b/src/iceberg/table_identifier.cc
@@ -17,27 +17,20 @@
  * under the License.
  */
 
-#include "iceberg/table_properties.h"
+#include "iceberg/table_identifier.h"
 
-namespace iceberg {
+#include "iceberg/util/formatter_internal.h"
 
-const std::unordered_set<std::string>& TableProperties::reserved_properties() {
-  static const std::unordered_set<std::string> kReservedProperties = {
-      kFormatVersion.key(),          kUuid.key(),
-      kSnapshotCount.key(),          kCurrentSnapshotId.key(),
-      kCurrentSnapshotSummary.key(), kCurrentSnapshotTimestamp.key(),
-      kCurrentSchema.key(),          kDefaultPartitionSpec.key(),
-      kDefaultSortOrder.key()};
-  return kReservedProperties;
-}
+namespace iceberg {
 
-TableProperties TableProperties::default_properties() { return {}; }
+std::string Namespace::ToString() const { return FormatRange(levels, ".", "", 
""); }
 
-TableProperties TableProperties::FromMap(
-    std::unordered_map<std::string, std::string> properties) {
-  TableProperties table_properties;
-  table_properties.configs_ = std::move(properties);
-  return table_properties;
+std::string TableIdentifier::ToString() const {
+  if (!ns.levels.empty()) {
+    return std::format("{}.{}", ns.ToString(), name);
+  } else {
+    return name;
+  }
 }
 
 }  // namespace iceberg
diff --git a/src/iceberg/table_identifier.h b/src/iceberg/table_identifier.h
index bef9b81d..77a2ab9c 100644
--- a/src/iceberg/table_identifier.h
+++ b/src/iceberg/table_identifier.h
@@ -27,6 +27,7 @@
 
 #include "iceberg/iceberg_export.h"
 #include "iceberg/result.h"
+#include "iceberg/util/formatter.h"  // IWYU pragma: keep
 
 namespace iceberg {
 
@@ -35,8 +36,12 @@ struct ICEBERG_EXPORT Namespace {
   std::vector<std::string> levels;
 
   bool operator==(const Namespace& other) const { return levels == 
other.levels; }
+
+  std::string ToString() const;
 };
 
+ICEBERG_EXPORT inline std::string ToString(const Namespace& ns) { return 
ns.ToString(); }
+
 /// \brief Identifies a table in iceberg catalog.
 struct ICEBERG_EXPORT TableIdentifier {
   Namespace ns;
@@ -53,6 +58,12 @@ struct ICEBERG_EXPORT TableIdentifier {
     }
     return {};
   }
+
+  std::string ToString() const;
 };
 
+ICEBERG_EXPORT inline std::string ToString(const TableIdentifier& ident) {
+  return ident.ToString();
+}
+
 }  // namespace iceberg
diff --git a/src/iceberg/table_metadata.cc b/src/iceberg/table_metadata.cc
index b6cb5f63..095f91d1 100644
--- a/src/iceberg/table_metadata.cc
+++ b/src/iceberg/table_metadata.cc
@@ -37,6 +37,7 @@
 #include "iceberg/exception.h"
 #include "iceberg/file_io.h"
 #include "iceberg/json_internal.h"
+#include "iceberg/metrics_config.h"
 #include "iceberg/partition_field.h"
 #include "iceberg/partition_spec.h"
 #include "iceberg/result.h"
@@ -50,12 +51,130 @@
 #include "iceberg/util/gzip_internal.h"
 #include "iceberg/util/location_util.h"
 #include "iceberg/util/macros.h"
+#include "iceberg/util/property_util.h"
+#include "iceberg/util/type_util.h"
 #include "iceberg/util/uuid.h"
+
 namespace iceberg {
 namespace {
 const TimePointMs kInvalidLastUpdatedMs = TimePointMs::min();
 constexpr int32_t kLastAdded = -1;
 constexpr std::string_view kMetadataFolderName = "metadata";
+
+// TableMetadata private static methods
+Result<std::unique_ptr<PartitionSpec>> FreshPartitionSpec(int32_t spec_id,
+                                                          const PartitionSpec& 
spec,
+                                                          const Schema& 
base_schema,
+                                                          const Schema& 
fresh_schema) {
+  std::vector<PartitionField> partition_fields;
+  partition_fields.reserve(spec.fields().size());
+  int32_t last_partition_field_id = PartitionSpec::kInvalidPartitionFieldId;
+  for (auto& field : spec.fields()) {
+    ICEBERG_ASSIGN_OR_RAISE(auto source_name,
+                            base_schema.FindColumnNameById(field.source_id()));
+    if (!source_name.has_value()) [[unlikely]] {
+      return InvalidSchema(
+          "Cannot find source partition field with ID {} in the old schema",
+          field.source_id());
+    }
+    ICEBERG_ASSIGN_OR_RAISE(auto fresh_field,
+                            fresh_schema.FindFieldByName(source_name.value()));
+    if (!fresh_field.has_value()) [[unlikely]] {
+      return InvalidSchema("Partition field {} does not exist in the schema",
+                           source_name.value());
+    }
+    partition_fields.emplace_back(fresh_field.value().get().field_id(),
+                                  ++last_partition_field_id, 
std::string(field.name()),
+                                  field.transform());
+  }
+  return PartitionSpec::Make(fresh_schema, spec_id, 
std::move(partition_fields), false);
+}
+
+Result<std::shared_ptr<SortOrder>> FreshSortOrder(int32_t order_id,
+                                                  const SortOrder& order,
+                                                  const Schema& base_schema,
+                                                  const Schema& fresh_schema) {
+  if (order.is_unsorted()) {
+    return SortOrder::Unsorted();
+  }
+
+  std::vector<SortField> sort_fields;
+  sort_fields.reserve(order.fields().size());
+  for (const auto& field : order.fields()) {
+    ICEBERG_ASSIGN_OR_RAISE(auto source_name,
+                            base_schema.FindColumnNameById(field.source_id()));
+    if (!source_name.has_value()) {
+      return InvalidSchema("Cannot find source sort field with ID {} in the 
old schema",
+                           field.source_id());
+    }
+    ICEBERG_ASSIGN_OR_RAISE(auto fresh_field,
+                            fresh_schema.FindFieldByName(source_name.value()));
+    if (!fresh_field.has_value()) {
+      return InvalidSchema("Cannot find field '{}' in the new schema",
+                           source_name.value());
+    }
+    sort_fields.emplace_back(fresh_field.value().get().field_id(), 
field.transform(),
+                             field.direction(), field.null_order());
+  }
+  return SortOrder::Make(order_id, std::move(sort_fields));
+}
+
+std::vector<std::unique_ptr<TableUpdate>> ChangesForCreate(
+    const TableMetadata& metadata) {
+  std::vector<std::unique_ptr<TableUpdate>> changes;
+
+  // Add UUID assignment
+  changes.push_back(std::make_unique<table::AssignUUID>(metadata.table_uuid));
+
+  // Add format version upgrade
+  changes.push_back(
+      std::make_unique<table::UpgradeFormatVersion>(metadata.format_version));
+
+  // Add schema
+  if (auto current_schema_result = metadata.Schema(); 
current_schema_result.has_value()) {
+    auto current_schema = current_schema_result.value();
+    changes.push_back(
+        std::make_unique<table::AddSchema>(current_schema, 
metadata.last_column_id));
+    changes.push_back(std::make_unique<table::SetCurrentSchema>(kLastAdded));
+  }
+
+  // Add partition spec
+  if (auto partition_spec_result = metadata.PartitionSpec();
+      partition_spec_result.has_value()) {
+    auto spec = partition_spec_result.value();
+    if (spec && spec->spec_id() != PartitionSpec::kInitialSpecId) {
+      changes.push_back(std::make_unique<table::AddPartitionSpec>(spec));
+    } else {
+      changes.push_back(
+          
std::make_unique<table::AddPartitionSpec>(PartitionSpec::Unpartitioned()));
+    }
+    
changes.push_back(std::make_unique<table::SetDefaultPartitionSpec>(kLastAdded));
+  }
+
+  // Add sort order
+  if (auto sort_order_result = metadata.SortOrder(); 
sort_order_result.has_value()) {
+    auto order = sort_order_result.value();
+    if (order && order->is_sorted()) {
+      changes.push_back(std::make_unique<table::AddSortOrder>(order));
+    } else {
+      
changes.push_back(std::make_unique<table::AddSortOrder>(SortOrder::Unsorted()));
+    }
+    
changes.push_back(std::make_unique<table::SetDefaultSortOrder>(kLastAdded));
+  }
+
+  // Set location if not empty
+  if (!metadata.location.empty()) {
+    changes.push_back(std::make_unique<table::SetLocation>(metadata.location));
+  }
+
+  // Set properties if not empty
+  if (!metadata.properties.configs().empty()) {
+    changes.push_back(
+        std::make_unique<table::SetProperties>(metadata.properties.configs()));
+  }
+
+  return changes;
+}
 }  // namespace
 
 std::string ToString(const SnapshotLogEntry& entry) {
@@ -68,6 +187,48 @@ std::string ToString(const MetadataLogEntry& entry) {
                      entry.metadata_file);
 }
 
+Result<std::unique_ptr<TableMetadata>> TableMetadata::Make(
+    const iceberg::Schema& schema, const iceberg::PartitionSpec& spec,
+    const iceberg::SortOrder& sort_order, const std::string& location,
+    const std::unordered_map<std::string, std::string>& properties, int 
format_version) {
+  for (const auto& [key, _] : properties) {
+    if (TableProperties::reserved_properties().contains(key)) {
+      return InvalidArgument(
+          "Table properties should not contain reserved properties, but got 
{}", key);
+    }
+  }
+
+  // Reassign all column ids to ensure consistency
+  int32_t last_column_id = 0;
+  auto next_id = [&last_column_id]() -> int32_t { return ++last_column_id; };
+  ICEBERG_ASSIGN_OR_RAISE(auto fresh_schema,
+                          AssignFreshIds(Schema::kInitialSchemaId, schema, 
next_id));
+
+  // Rebuild the partition spec using the new column ids
+  ICEBERG_ASSIGN_OR_RAISE(
+      auto fresh_spec,
+      FreshPartitionSpec(PartitionSpec::kInitialSpecId, spec, schema, 
*fresh_schema));
+
+  // rebuild the sort order using the new column ids
+  ICEBERG_ASSIGN_OR_RAISE(
+      auto fresh_order,
+      FreshSortOrder(SortOrder::kInitialSortOrderId, sort_order, schema, 
*fresh_schema))
+
+  // Validata the metrics configuration.
+  ICEBERG_RETURN_UNEXPECTED(
+      MetricsConfig::VerifyReferencedColumns(properties, *fresh_schema));
+
+  
ICEBERG_RETURN_UNEXPECTED(PropertyUtil::ValidateCommitProperties(properties));
+
+  return TableMetadataBuilder::BuildFromEmpty(format_version)
+      ->SetLocation(location)
+      .SetCurrentSchema(std::move(fresh_schema), last_column_id)
+      .SetDefaultPartitionSpec(std::move(fresh_spec))
+      .SetDefaultSortOrder(std::move(fresh_order))
+      .SetProperties(properties)
+      .Build();
+}
+
 Result<std::shared_ptr<Schema>> TableMetadata::Schema() const {
   return SchemaById(current_schema_id);
 }
@@ -435,6 +596,7 @@ class TableMetadataBuilder::Impl {
   Status SetCurrentSchema(int32_t schema_id);
   Status RemoveSchemas(const std::unordered_set<int32_t>& schema_ids);
   Result<int32_t> AddSchema(const Schema& schema, int32_t new_last_column_id);
+  void SetLocation(std::string_view location);
 
   Result<std::unique_ptr<TableMetadata>> Build();
 
@@ -819,6 +981,14 @@ Result<int32_t> 
TableMetadataBuilder::Impl::AddSchema(const Schema& schema,
   return new_schema_id;
 }
 
+void TableMetadataBuilder::Impl::SetLocation(std::string_view location) {
+  if (location == metadata_.location) {
+    return;
+  }
+  metadata_.location = std::string(location);
+  
changes_.push_back(std::make_unique<table::SetLocation>(std::string(location)));
+}
+
 Result<std::unique_ptr<TableMetadata>> TableMetadataBuilder::Impl::Build() {
   // 1. Validate metadata consistency through TableMetadata#Validate
 
@@ -938,6 +1108,14 @@ std::unique_ptr<TableMetadataBuilder> 
TableMetadataBuilder::BuildFrom(
   return std::unique_ptr<TableMetadataBuilder>(new 
TableMetadataBuilder(base));  // NOLINT
 }
 
+TableMetadataBuilder& TableMetadataBuilder::ApplyChangesForCreate(
+    const TableMetadata& base) {
+  for (auto& change : ChangesForCreate(base)) {
+    change->ApplyTo(*this);
+  }
+  return *this;
+}
+
 TableMetadataBuilder& TableMetadataBuilder::SetMetadataLocation(
     std::string_view metadata_location) {
   impl_->SetMetadataLocation(metadata_location);
@@ -1099,7 +1277,8 @@ TableMetadataBuilder& 
TableMetadataBuilder::RemoveProperties(
 }
 
 TableMetadataBuilder& TableMetadataBuilder::SetLocation(std::string_view 
location) {
-  throw IcebergError(std::format("{} not implemented", __FUNCTION__));
+  impl_->SetLocation(location);
+  return *this;
 }
 
 TableMetadataBuilder& TableMetadataBuilder::AddEncryptionKey(
diff --git a/src/iceberg/table_metadata.h b/src/iceberg/table_metadata.h
index 3885a458..068ab319 100644
--- a/src/iceberg/table_metadata.h
+++ b/src/iceberg/table_metadata.h
@@ -127,6 +127,12 @@ struct ICEBERG_EXPORT TableMetadata {
   /// A `long` higher than all assigned row IDs
   int64_t next_row_id;
 
+  static Result<std::unique_ptr<TableMetadata>> Make(
+      const iceberg::Schema& schema, const iceberg::PartitionSpec& spec,
+      const iceberg::SortOrder& sort_order, const std::string& location,
+      const std::unordered_map<std::string, std::string>& properties,
+      int format_version = kDefaultTableFormatVersion);
+
   /// \brief Get the current schema, return NotFoundError if not found
   Result<std::shared_ptr<iceberg::Schema>> Schema() const;
   /// \brief Get the current schema by ID, return NotFoundError if not found
@@ -210,9 +216,16 @@ class ICEBERG_EXPORT TableMetadataBuilder : public 
ErrorCollector {
   /// \brief Create a builder from existing table metadata
   ///
   /// \param base The base table metadata to build from
-  /// \return A new TableMetadataBuilder instance initialized with base 
metadata
+  /// \return A new TableMetadataBuilder instance initialized
+  /// with base metadata
   static std::unique_ptr<TableMetadataBuilder> BuildFrom(const TableMetadata* 
base);
 
+  /// \brief Apply changes required to create this table metadata
+  ///
+  /// \param base The table metadata to build from
+  /// \return Reference to this builder for method chaining
+  TableMetadataBuilder& ApplyChangesForCreate(const TableMetadata& base);
+
   /// \brief Set the metadata location of the table
   ///
   /// \param metadata_location The new metadata location
diff --git a/src/iceberg/table_properties.cc b/src/iceberg/table_properties.cc
index 96633bc2..db6adedc 100644
--- a/src/iceberg/table_properties.cc
+++ b/src/iceberg/table_properties.cc
@@ -31,6 +31,13 @@ const std::unordered_set<std::string>& 
TableProperties::reserved_properties() {
   return kReservedProperties;
 }
 
+const std::unordered_set<std::string>& TableProperties::commit_properties() {
+  static const std::unordered_set<std::string> kCommitProperties = {
+      kCommitNumRetries.key(), kCommitMinRetryWaitMs.key(), 
kCommitMaxRetryWaitMs.key(),
+      kCommitTotalRetryTimeMs.key()};
+  return kCommitProperties;
+}
+
 TableProperties TableProperties::default_properties() { return {}; }
 
 TableProperties TableProperties::FromMap(
diff --git a/src/iceberg/table_properties.h b/src/iceberg/table_properties.h
index debe61da..feb4a200 100644
--- a/src/iceberg/table_properties.h
+++ b/src/iceberg/table_properties.h
@@ -286,6 +286,9 @@ class ICEBERG_EXPORT TableProperties : public 
ConfigBase<TableProperties> {
   /// \return The set of reserved property keys
   static const std::unordered_set<std::string>& reserved_properties();
 
+  /// \brief Get the set of commit table property keys.
+  static const std::unordered_set<std::string>& commit_properties();
+
   /// \brief Create a default TableProperties instance.
   ///
   /// \return A unique pointer to a TableProperties instance with default 
values
diff --git a/src/iceberg/table_requirement.h b/src/iceberg/table_requirement.h
index eb818106..82779aec 100644
--- a/src/iceberg/table_requirement.h
+++ b/src/iceberg/table_requirement.h
@@ -43,14 +43,14 @@ namespace iceberg {
 class ICEBERG_EXPORT TableRequirement {
  public:
   enum class Kind : uint8_t {
-    AssertDoesNotExist,
-    AssertUUID,
-    AssertRefSnapshotID,
-    AssertLastAssignedFieldId,
-    AssertCurrentSchemaID,
-    AssertLastAssignedPartitionId,
-    AssertDefaultSpecID,
-    AssertDefaultSortOrderID,
+    kAssertDoesNotExist,
+    kAssertUUID,
+    kAssertRefSnapshotID,
+    kAssertLastAssignedFieldId,
+    kAssertCurrentSchemaID,
+    kAssertLastAssignedPartitionId,
+    kAssertDefaultSpecID,
+    kAssertDefaultSortOrderID,
   };
 
   virtual ~TableRequirement() = default;
@@ -75,7 +75,7 @@ class ICEBERG_EXPORT AssertDoesNotExist : public 
TableRequirement {
  public:
   AssertDoesNotExist() = default;
 
-  Kind kind() const override { return Kind::AssertDoesNotExist; }
+  Kind kind() const override { return Kind::kAssertDoesNotExist; }
 
   Status Validate(const TableMetadata* base) const override;
 };
@@ -90,7 +90,7 @@ class ICEBERG_EXPORT AssertUUID : public TableRequirement {
 
   const std::string& uuid() const { return uuid_; }
 
-  Kind kind() const override { return Kind::AssertUUID; }
+  Kind kind() const override { return Kind::kAssertUUID; }
 
   Status Validate(const TableMetadata* base) const override;
 
@@ -112,7 +112,7 @@ class ICEBERG_EXPORT AssertRefSnapshotID : public 
TableRequirement {
 
   const std::optional<int64_t>& snapshot_id() const { return snapshot_id_; }
 
-  Kind kind() const override { return Kind::AssertRefSnapshotID; }
+  Kind kind() const override { return Kind::kAssertRefSnapshotID; }
 
   Status Validate(const TableMetadata* base) const override;
 
@@ -132,7 +132,7 @@ class ICEBERG_EXPORT AssertLastAssignedFieldId : public 
TableRequirement {
 
   int32_t last_assigned_field_id() const { return last_assigned_field_id_; }
 
-  Kind kind() const override { return Kind::AssertLastAssignedFieldId; }
+  Kind kind() const override { return Kind::kAssertLastAssignedFieldId; }
 
   Status Validate(const TableMetadata* base) const override;
 
@@ -150,7 +150,7 @@ class ICEBERG_EXPORT AssertCurrentSchemaID : public 
TableRequirement {
 
   int32_t schema_id() const { return schema_id_; }
 
-  Kind kind() const override { return Kind::AssertCurrentSchemaID; }
+  Kind kind() const override { return Kind::kAssertCurrentSchemaID; }
 
   Status Validate(const TableMetadata* base) const override;
 
@@ -169,7 +169,7 @@ class ICEBERG_EXPORT AssertLastAssignedPartitionId : public 
TableRequirement {
 
   int32_t last_assigned_partition_id() const { return 
last_assigned_partition_id_; }
 
-  Kind kind() const override { return Kind::AssertLastAssignedPartitionId; }
+  Kind kind() const override { return Kind::kAssertLastAssignedPartitionId; }
 
   Status Validate(const TableMetadata* base) const override;
 
@@ -187,7 +187,7 @@ class ICEBERG_EXPORT AssertDefaultSpecID : public 
TableRequirement {
 
   int32_t spec_id() const { return spec_id_; }
 
-  Kind kind() const override { return Kind::AssertDefaultSpecID; }
+  Kind kind() const override { return Kind::kAssertDefaultSpecID; }
 
   Status Validate(const TableMetadata* base) const override;
 
@@ -206,7 +206,7 @@ class ICEBERG_EXPORT AssertDefaultSortOrderID : public 
TableRequirement {
 
   int32_t sort_order_id() const { return sort_order_id_; }
 
-  Kind kind() const override { return Kind::AssertDefaultSortOrderID; }
+  Kind kind() const override { return Kind::kAssertDefaultSortOrderID; }
 
   Status Validate(const TableMetadata* base) const override;
 
diff --git a/src/iceberg/table_requirements.cc 
b/src/iceberg/table_requirements.cc
index 6de6c59e..8222ded3 100644
--- a/src/iceberg/table_requirements.cc
+++ b/src/iceberg/table_requirements.cc
@@ -19,7 +19,9 @@
 
 #include "iceberg/table_requirements.h"
 
+#include <algorithm>
 #include <memory>
+#include <ranges>
 
 #include "iceberg/snapshot.h"
 #include "iceberg/table_metadata.h"
@@ -134,4 +136,19 @@ Result<std::vector<std::unique_ptr<TableRequirement>>> 
TableRequirements::ForUpd
   return context.Build();
 }
 
+Result<bool> TableRequirements::IsCreate(
+    const std::vector<std::unique_ptr<TableRequirement>>& requirements) {
+  bool is_create = std::ranges::any_of(requirements, [](const auto& req) {
+    return req->kind() == TableRequirement::Kind::kAssertDoesNotExist;
+  });
+
+  if (is_create) {
+    ICEBERG_PRECHECK(
+        requirements.size() == 1,
+        "Cannot have other requirements than AssertDoesNotExist in a table 
creation");
+  }
+
+  return is_create;
+}
+
 }  // namespace iceberg
diff --git a/src/iceberg/table_requirements.h b/src/iceberg/table_requirements.h
index f79f0bea..6deefdf3 100644
--- a/src/iceberg/table_requirements.h
+++ b/src/iceberg/table_requirements.h
@@ -144,6 +144,10 @@ class ICEBERG_EXPORT TableRequirements {
   static Result<std::vector<std::unique_ptr<TableRequirement>>> ForUpdateTable(
       const TableMetadata& base,
       const std::vector<std::unique_ptr<TableUpdate>>& table_updates);
+
+  /// \brief Check if the requirements are for table creation
+  static Result<bool> IsCreate(
+      const std::vector<std::unique_ptr<TableRequirement>>& requirements);
 };
 
 }  // namespace iceberg
diff --git a/src/iceberg/table_update.cc b/src/iceberg/table_update.cc
index 9d92a16e..91578977 100644
--- a/src/iceberg/table_update.cc
+++ b/src/iceberg/table_update.cc
@@ -201,7 +201,7 @@ void 
RemoveProperties::GenerateRequirements(TableUpdateContext& context) const {
 // SetLocation
 
 void SetLocation::ApplyTo(TableMetadataBuilder& builder) const {
-  throw IcebergError(std::format("{} not implemented", __FUNCTION__));
+  builder.SetLocation(location_);
 }
 
 void SetLocation::GenerateRequirements(TableUpdateContext& context) const {
diff --git a/src/iceberg/test/formatter_test.cc 
b/src/iceberg/test/formatter_test.cc
index 767020b2..8878bb7c 100644
--- a/src/iceberg/test/formatter_test.cc
+++ b/src/iceberg/test/formatter_test.cc
@@ -29,6 +29,7 @@
 #include <gtest/gtest.h>
 
 #include "iceberg/statistics_file.h"
+#include "iceberg/table_identifier.h"
 #include "iceberg/util/formatter_internal.h"
 
 namespace iceberg {
@@ -160,4 +161,19 @@ TEST(FormatterTest, StatisticsFileFormat) {
   EXPECT_EQ(expected, std::format("{}", statistics_file));
 }
 
+// For Types that has a ToString function
+TEST(FormatterTest, TableIdentifierFormat) {
+  TableIdentifier empty_ns_table{
+      .ns = Namespace({}),
+      .name = "table_name",
+  };
+  EXPECT_EQ("table_name", std::format("{}", empty_ns_table));
+
+  TableIdentifier table{
+      .ns = Namespace({"ns1", "ns2"}),
+      .name = "table_name",
+  };
+  EXPECT_EQ("ns1.ns2.table_name", std::format("{}", table));
+}
+
 }  // namespace iceberg
diff --git a/src/iceberg/test/in_memory_catalog_test.cc 
b/src/iceberg/test/in_memory_catalog_test.cc
index 194d6da5..78f67dda 100644
--- a/src/iceberg/test/in_memory_catalog_test.cc
+++ b/src/iceberg/test/in_memory_catalog_test.cc
@@ -28,7 +28,9 @@
 #include <gtest/gtest.h>
 
 #include "iceberg/arrow/arrow_fs_file_io_internal.h"
+#include "iceberg/partition_spec.h"
 #include "iceberg/schema.h"
+#include "iceberg/sort_order.h"
 #include "iceberg/table.h"
 #include "iceberg/table_identifier.h"
 #include "iceberg/table_metadata.h"
@@ -38,6 +40,8 @@
 #include "iceberg/test/mock_catalog.h"
 #include "iceberg/test/mock_io.h"
 #include "iceberg/test/test_resource.h"
+#include "iceberg/transaction.h"
+#include "iceberg/update/update_properties.h"
 #include "iceberg/util/uuid.h"
 
 namespace iceberg {
@@ -106,6 +110,26 @@ TEST_F(InMemoryCatalogTest, TableExists) {
   EXPECT_THAT(result, HasValue(::testing::Eq(false)));
 }
 
+TEST_F(InMemoryCatalogTest, CreateTable) {
+  TableIdentifier table_ident{.ns = {}, .name = "t1"};
+  auto schema = std::make_shared<Schema>(
+      std::vector<SchemaField>{SchemaField::MakeRequired(1, "x", int64())},
+      /*schema_id=*/1);
+  auto spec = PartitionSpec::Unpartitioned();
+  auto sort_order = SortOrder::Unsorted();
+  auto metadata_location = GenerateTestTableLocation(table_ident.name);
+
+  // Create table successfully
+  auto table = catalog_->CreateTable(table_ident, schema, spec, sort_order,
+                                     metadata_location, {{"property1", 
"value1"}});
+  EXPECT_THAT(table, IsOk());
+
+  // Create table already exists
+  auto table2 = catalog_->CreateTable(table_ident, schema, spec, sort_order,
+                                      metadata_location, {{"property1", 
"value1"}});
+  EXPECT_THAT(table2, IsError(ErrorKind::kAlreadyExists));
+}
+
 TEST_F(InMemoryCatalogTest, RegisterTable) {
   TableIdentifier tableIdent{.ns = {}, .name = "t1"};
 
@@ -231,6 +255,50 @@ TEST_F(InMemoryCatalogTest, UpdateTable) {
               testing::HasSubstr("metadata/00002-"));
 }
 
+TEST_F(InMemoryCatalogTest, StageCreateTable) {
+  TableIdentifier table_ident{.ns = {}, .name = "t1"};
+  auto schema = std::make_shared<Schema>(
+      std::vector<SchemaField>{SchemaField::MakeRequired(1, "x", int64())},
+      /*schema_id=*/1);
+  auto spec = PartitionSpec::Unpartitioned();
+  auto sort_order = SortOrder::Unsorted();
+
+  // Stage table
+  ICEBERG_UNWRAP_OR_FAIL(
+      auto staged_table,
+      catalog_->StageCreateTable(table_ident, schema, spec, sort_order,
+                                 GenerateTestTableLocation(table_ident.name), 
{}));
+
+  // Perform the update
+  ICEBERG_UNWRAP_OR_FAIL(auto update_properties, 
staged_table->NewUpdateProperties());
+  EXPECT_THAT(update_properties->Set("property1", "value1").Commit(), IsOk());
+  auto res1 = staged_table->Commit();
+  EXPECT_THAT(res1, IsOk());
+  auto created_table = res1.value();
+  EXPECT_EQ("t1", created_table->name().name);
+  EXPECT_EQ("value1", created_table->metadata()->properties.Get(
+                          TableProperties::Entry<std::string>("property1", 
"")));
+
+  // Staged already exist table
+  auto res = catalog_->StageCreateTable(table_ident, schema, spec, sort_order,
+                                        
GenerateTestTableLocation(table_ident.name), {});
+  EXPECT_THAT(res, IsError(ErrorKind::kAlreadyExists));
+
+  // Stage create ok but commit already exist
+  TableIdentifier table_ident2{.ns = {}, .name = "t2"};
+  ICEBERG_UNWRAP_OR_FAIL(
+      auto staged_table2,
+      catalog_->StageCreateTable(table_ident2, schema, spec, sort_order,
+                                 GenerateTestTableLocation(table_ident2.name), 
{}));
+  ICEBERG_UNWRAP_OR_FAIL(
+      auto created_table2,
+      catalog_->CreateTable(table_ident2, schema, spec, sort_order,
+                            GenerateTestTableLocation(table_ident2.name), {}));
+
+  auto commit_res = staged_table2->Commit();
+  EXPECT_THAT(commit_res, IsError(ErrorKind::kAlreadyExists));
+}
+
 TEST_F(InMemoryCatalogTest, DropTable) {
   TableIdentifier tableIdent{.ns = {}, .name = "t1"};
   auto result = catalog_->DropTable(tableIdent, false);
diff --git a/src/iceberg/test/table_metadata_builder_test.cc 
b/src/iceberg/test/table_metadata_builder_test.cc
index 4c551fea..77380e89 100644
--- a/src/iceberg/test/table_metadata_builder_test.cc
+++ b/src/iceberg/test/table_metadata_builder_test.cc
@@ -49,6 +49,24 @@ std::shared_ptr<Schema> CreateTestSchema() {
   return std::make_shared<Schema>(std::vector<SchemaField>{field1, field2, 
field3}, 0);
 }
 
+// Helper function to create a simple schema with invalid identifier fields
+std::shared_ptr<Schema> CreateInvalidSchema() {
+  auto field1 = SchemaField::MakeRequired(2, "id", int32());
+  auto field2 = SchemaField::MakeRequired(5, "part_col", string());
+  auto field3 = SchemaField::MakeRequired(8, "sort_col", timestamp());
+  return std::make_shared<Schema>(std::vector<SchemaField>{field1, field2, 
field3},
+                                  std::make_optional(1), 
std::vector<int32_t>{1});
+}
+
+// Helper function to create a simple schema with disordered field_ids
+std::shared_ptr<Schema> CreateDisorderedSchema() {
+  auto field1 = SchemaField::MakeRequired(2, "id", int32());
+  auto field2 = SchemaField::MakeRequired(5, "part_col", string());
+  auto field3 = SchemaField::MakeRequired(8, "sort_col", timestamp());
+  return std::make_shared<Schema>(std::vector<SchemaField>{field1, field2, 
field3},
+                                  std::make_optional(1), 
std::vector<int32_t>{2});
+}
+
 // Helper function to create base metadata for tests
 std::unique_ptr<TableMetadata> CreateBaseMetadata() {
   auto metadata = std::make_unique<TableMetadata>();
@@ -73,6 +91,131 @@ std::unique_ptr<TableMetadata> CreateBaseMetadata() {
 
 }  // namespace
 
+// test for TableMetadata
+TEST(TableMetadataTest, Make) {
+  auto Schema = CreateDisorderedSchema();
+  ICEBERG_UNWRAP_OR_FAIL(
+      auto spec, PartitionSpec::Make(1, 
std::vector<PartitionField>{PartitionField(
+                                            5, 1, "part_name", 
Transform::Identity())}));
+  ICEBERG_UNWRAP_OR_FAIL(
+      auto order, SortOrder::Make(1, std::vector<SortField>{SortField(
+                                         8, Transform::Identity(),
+                                         SortDirection::kAscending, 
NullOrder::kLast)}));
+
+  ICEBERG_UNWRAP_OR_FAIL(
+      auto metadata, TableMetadata::Make(*Schema, *spec, *order, 
"s3://bucket/test", {}));
+  // Check schema fields
+  ASSERT_EQ(1, metadata->schemas.size());
+  auto fields = metadata->schemas[0]->fields() | 
std::ranges::to<std::vector>();
+  ASSERT_EQ(3, fields.size());
+  EXPECT_EQ(1, fields[0].field_id());
+  EXPECT_EQ("id", fields[0].name());
+  EXPECT_EQ(2, fields[1].field_id());
+  EXPECT_EQ("part_col", fields[1].name());
+  EXPECT_EQ(3, fields[2].field_id());
+  EXPECT_EQ("sort_col", fields[2].name());
+  const auto& identifier_field_ids = 
metadata->schemas[0]->IdentifierFieldIds();
+  ASSERT_EQ(1, identifier_field_ids.size());
+  EXPECT_EQ(1, identifier_field_ids[0]);
+
+  // Check partition spec
+  ASSERT_EQ(1, metadata->partition_specs.size());
+  EXPECT_EQ(PartitionSpec::kInitialSpecId, 
metadata->partition_specs[0]->spec_id());
+  auto spec_fields =
+      metadata->partition_specs[0]->fields() | std::ranges::to<std::vector>();
+  ASSERT_EQ(1, spec_fields.size());
+  EXPECT_EQ(PartitionSpec::kInvalidPartitionFieldId + 1, 
spec_fields[0].field_id());
+  EXPECT_EQ(2, spec_fields[0].source_id());
+  EXPECT_EQ("part_name", spec_fields[0].name());
+
+  // Check sort order
+  ASSERT_EQ(1, metadata->sort_orders.size());
+  EXPECT_EQ(SortOrder::kInitialSortOrderId, 
metadata->sort_orders[0]->order_id());
+  auto order_fields = metadata->sort_orders[0]->fields() | 
std::ranges::to<std::vector>();
+  ASSERT_EQ(1, order_fields.size());
+  EXPECT_EQ(3, order_fields[0].source_id());
+  EXPECT_EQ(SortDirection::kAscending, order_fields[0].direction());
+  EXPECT_EQ(NullOrder::kLast, order_fields[0].null_order());
+}
+
+TEST(TableMetadataTest, MakeWithInvalidSchema) {
+  auto schema = CreateInvalidSchema();
+  ICEBERG_UNWRAP_OR_FAIL(
+      auto spec, PartitionSpec::Make(1, 
std::vector<PartitionField>{PartitionField(
+                                            5, 1, "part_name", 
Transform::Identity())}));
+  ICEBERG_UNWRAP_OR_FAIL(
+      auto order, SortOrder::Make(1, std::vector<SortField>{SortField(
+                                         5, Transform::Identity(),
+                                         SortDirection::kAscending, 
NullOrder::kLast)}));
+
+  auto res = TableMetadata::Make(*schema, *spec, *order, "s3://bucket/test", 
{});
+  EXPECT_THAT(res, IsError(ErrorKind::kInvalidSchema));
+  EXPECT_THAT(res, HasErrorMessage("Cannot find identifier field id"));
+}
+
+TEST(TableMetadataTest, MakeWithInvalidPartitionSpec) {
+  auto schema = CreateDisorderedSchema();
+  ICEBERG_UNWRAP_OR_FAIL(
+      auto spec, PartitionSpec::Make(1, 
std::vector<PartitionField>{PartitionField(
+                                            6, 1, "part_name", 
Transform::Identity())}));
+  ICEBERG_UNWRAP_OR_FAIL(
+      auto order, SortOrder::Make(1, std::vector<SortField>{SortField(
+                                         8, Transform::Identity(),
+                                         SortDirection::kAscending, 
NullOrder::kLast)}));
+
+  auto res = TableMetadata::Make(*schema, *spec, *order, "s3://bucket/test", 
{});
+  EXPECT_THAT(res, IsError(ErrorKind::kInvalidSchema));
+  EXPECT_THAT(res, HasErrorMessage("Cannot find source partition field"));
+}
+
+TEST(TableMetadataTest, MakeWithInvalidSortOrder) {
+  auto schema = CreateDisorderedSchema();
+  ICEBERG_UNWRAP_OR_FAIL(
+      auto spec, PartitionSpec::Make(1, 
std::vector<PartitionField>{PartitionField(
+                                            5, 1, "part_name", 
Transform::Identity())}));
+  ICEBERG_UNWRAP_OR_FAIL(
+      auto order, SortOrder::Make(1, std::vector<SortField>{SortField(
+                                         9, Transform::Identity(),
+                                         SortDirection::kAscending, 
NullOrder::kLast)}));
+
+  auto res = TableMetadata::Make(*schema, *spec, *order, "s3://bucket/test", 
{});
+  EXPECT_THAT(res, IsError(ErrorKind::kInvalidSchema));
+  EXPECT_THAT(res, HasErrorMessage("Cannot find source sort field"));
+}
+
+TEST(TableMetadataTest, InvalidProperties) {
+  auto spec = PartitionSpec::Unpartitioned();
+  auto order = SortOrder::Unsorted();
+
+  {
+    // Invalid metrics config
+    auto schema = CreateDisorderedSchema();
+    std::unordered_map<std::string, std::string> invlaid_metric_config = {
+        {std::string(TableProperties::kMetricModeColumnConfPrefix) + 
"unknown_col",
+         "value"}};
+
+    auto res = TableMetadata::Make(*schema, *spec, *order, "s3://bucket/test",
+                                   invlaid_metric_config);
+    EXPECT_THAT(res, IsError(ErrorKind::kValidationFailed));
+    EXPECT_THAT(res, HasErrorMessage("Invalid metrics config"));
+  }
+
+  {
+    // Invaid commit properties
+    auto schema = CreateDisorderedSchema();
+    std::unordered_map<std::string, std::string> invlaid_commit_properties = {
+        {TableProperties::kCommitNumRetries.key(), "-1"}};
+
+    auto res = TableMetadata::Make(*schema, *spec, *order, "s3://bucket/test",
+                                   invlaid_commit_properties);
+    EXPECT_THAT(res, IsError(ErrorKind::kValidationFailed));
+    EXPECT_THAT(res,
+                HasErrorMessage(std::format(
+                    "Table property {} must have non negative integer value, 
but got {}",
+                    TableProperties::kCommitNumRetries.key(), -1)));
+  }
+}
+
 // test construction of TableMetadataBuilder
 TEST(TableMetadataBuilderTest, BuildFromEmpty) {
   auto builder = TableMetadataBuilder::BuildFromEmpty(2);
diff --git a/src/iceberg/test/table_requirement_test.cc 
b/src/iceberg/test/table_requirement_test.cc
index 8b67561f..07055f18 100644
--- a/src/iceberg/test/table_requirement_test.cc
+++ b/src/iceberg/test/table_requirement_test.cc
@@ -36,7 +36,7 @@ TEST(TableRequirementTest, AssertUUID) {
 
   // Success - UUID matches
   table::AssertUUID requirement("test-uuid-1234");
-  EXPECT_EQ(TableRequirement::Kind::AssertUUID, requirement.kind());
+  EXPECT_EQ(TableRequirement::Kind::kAssertUUID, requirement.kind());
   ASSERT_THAT(requirement.Validate(base.get()), IsOk());
 
   // UUID mismatch
@@ -63,7 +63,7 @@ TEST(TableRequirementTest, AssertCurrentSchemaID) {
 
   // Success - schema ID matches
   table::AssertCurrentSchemaID requirement(5);
-  EXPECT_EQ(TableRequirement::Kind::AssertCurrentSchemaID, requirement.kind());
+  EXPECT_EQ(TableRequirement::Kind::kAssertCurrentSchemaID, 
requirement.kind());
   ASSERT_THAT(requirement.Validate(base.get()), IsOk());
 
   // Schema ID mismatch
@@ -89,7 +89,7 @@ TEST(TableRequirementTest, AssertCurrentSchemaID) {
 TEST(TableRequirementTest, AssertDoesNotExist) {
   // Success - table does not exist (null metadata)
   table::AssertDoesNotExist requirement;
-  EXPECT_EQ(TableRequirement::Kind::AssertDoesNotExist, requirement.kind());
+  EXPECT_EQ(TableRequirement::Kind::kAssertDoesNotExist, requirement.kind());
   ASSERT_THAT(requirement.Validate(nullptr), IsOk());
 
   // Table already exists
@@ -108,7 +108,7 @@ TEST(TableRequirementTest, AssertRefSnapshotID) {
 
   // Success - ref snapshot ID matches
   table::AssertRefSnapshotID requirement("main", 100);
-  EXPECT_EQ(TableRequirement::Kind::AssertRefSnapshotID, requirement.kind());
+  EXPECT_EQ(TableRequirement::Kind::kAssertRefSnapshotID, requirement.kind());
   ASSERT_THAT(requirement.Validate(base.get()), IsOk());
 
   // Snapshot ID mismatch
@@ -140,7 +140,7 @@ TEST(TableRequirementTest, AssertLastAssignedFieldId) {
 
   // Success - field ID matches
   table::AssertLastAssignedFieldId requirement(10);
-  EXPECT_EQ(TableRequirement::Kind::AssertLastAssignedFieldId, 
requirement.kind());
+  EXPECT_EQ(TableRequirement::Kind::kAssertLastAssignedFieldId, 
requirement.kind());
   ASSERT_THAT(requirement.Validate(base.get()), IsOk());
 
   // Field ID mismatch
@@ -160,7 +160,7 @@ TEST(TableRequirementTest, AssertLastAssignedPartitionId) {
 
   // Success - partition ID matches
   table::AssertLastAssignedPartitionId requirement(5);
-  EXPECT_EQ(TableRequirement::Kind::AssertLastAssignedPartitionId, 
requirement.kind());
+  EXPECT_EQ(TableRequirement::Kind::kAssertLastAssignedPartitionId, 
requirement.kind());
   ASSERT_THAT(requirement.Validate(base.get()), IsOk());
 
   // Partition ID mismatch
@@ -182,7 +182,7 @@ TEST(TableRequirementTest, AssertDefaultSpecID) {
 
   // Success - spec ID matches
   table::AssertDefaultSpecID requirement(3);
-  EXPECT_EQ(TableRequirement::Kind::AssertDefaultSpecID, requirement.kind());
+  EXPECT_EQ(TableRequirement::Kind::kAssertDefaultSpecID, requirement.kind());
   ASSERT_THAT(requirement.Validate(base.get()), IsOk());
 
   // Spec ID mismatch
@@ -198,7 +198,7 @@ TEST(TableRequirementTest, AssertDefaultSortOrderID) {
 
   // Success - sort order ID matches
   table::AssertDefaultSortOrderID requirement(2);
-  EXPECT_EQ(TableRequirement::Kind::AssertDefaultSortOrderID, 
requirement.kind());
+  EXPECT_EQ(TableRequirement::Kind::kAssertDefaultSortOrderID, 
requirement.kind());
   ASSERT_THAT(requirement.Validate(base.get()), IsOk());
 
   // Sort order ID mismatch
diff --git a/src/iceberg/test/table_requirements_test.cc 
b/src/iceberg/test/table_requirements_test.cc
index bbbcc681..c05e505f 100644
--- a/src/iceberg/test/table_requirements_test.cc
+++ b/src/iceberg/test/table_requirements_test.cc
@@ -102,6 +102,20 @@ TEST(TableRequirementsTest, EmptyUpdatesForCreateTable) {
   EXPECT_NE(assert_does_not_exist, nullptr);
 }
 
+TEST(TableRequirementsTest, IsCreate) {
+  // Should have only AssertDoesNotExist requirement
+  std::vector<std::unique_ptr<TableRequirement>> requirements;
+  requirements.push_back(std::make_unique<table::AssertDoesNotExist>());
+  EXPECT_TRUE(TableRequirements::IsCreate(requirements));
+
+  // Not only have AssertDoesNotExist requirement
+  requirements.push_back(std::make_unique<table::AssertCurrentSchemaID>(0));
+  auto res = TableRequirements::IsCreate(requirements);
+  EXPECT_THAT(res, IsError(ErrorKind::kInvalidArgument));
+  EXPECT_THAT(res,
+              HasErrorMessage("Cannot have other requirements than 
AssertDoesNotExist"));
+}
+
 TEST(TableRequirementsTest, EmptyUpdatesForUpdateTable) {
   auto metadata = CreateBaseMetadata();
   std::vector<std::unique_ptr<TableUpdate>> updates;
diff --git a/src/iceberg/transaction.cc b/src/iceberg/transaction.cc
index 9404fe2e..33864ca9 100644
--- a/src/iceberg/transaction.cc
+++ b/src/iceberg/transaction.cc
@@ -36,11 +36,12 @@
 
 namespace iceberg {
 
-Transaction::Transaction(std::shared_ptr<Table> table, Kind kind, bool 
auto_commit)
+Transaction::Transaction(std::shared_ptr<Table> table, Kind kind, bool 
auto_commit,
+                         std::unique_ptr<TableMetadataBuilder> 
metadata_builder)
     : table_(std::move(table)),
       kind_(kind),
       auto_commit_(auto_commit),
-      
metadata_builder_(TableMetadataBuilder::BuildFrom(table_->metadata().get())) {}
+      metadata_builder_(std::move(metadata_builder)) {}
 
 Transaction::~Transaction() = default;
 
@@ -49,8 +50,17 @@ Result<std::shared_ptr<Transaction>> 
Transaction::Make(std::shared_ptr<Table> ta
   if (!table || !table->catalog()) [[unlikely]] {
     return InvalidArgument("Table and catalog cannot be null");
   }
+
+  std::unique_ptr<TableMetadataBuilder> metadata_builder;
+  if (kind == Kind::kCreate) {
+    metadata_builder = TableMetadataBuilder::BuildFromEmpty();
+    std::ignore = metadata_builder->ApplyChangesForCreate(*table->metadata());
+  } else {
+    metadata_builder = 
TableMetadataBuilder::BuildFrom(table->metadata().get());
+  }
+
   return std::shared_ptr<Transaction>(
-      new Transaction(std::move(table), kind, auto_commit));
+      new Transaction(std::move(table), kind, auto_commit, 
std::move(metadata_builder)));
 }
 
 const TableMetadata* Transaction::base() const { return 
metadata_builder_->base(); }
diff --git a/src/iceberg/transaction.h b/src/iceberg/transaction.h
index 18143b8a..87a2139b 100644
--- a/src/iceberg/transaction.h
+++ b/src/iceberg/transaction.h
@@ -69,7 +69,8 @@ class ICEBERG_EXPORT Transaction : public 
std::enable_shared_from_this<Transacti
   Result<std::shared_ptr<UpdateSortOrder>> NewUpdateSortOrder();
 
  private:
-  Transaction(std::shared_ptr<Table> table, Kind kind, bool auto_commit);
+  Transaction(std::shared_ptr<Table> table, Kind kind, bool auto_commit,
+              std::unique_ptr<TableMetadataBuilder> metadata_builder);
 
   Status AddUpdate(const std::shared_ptr<PendingUpdate>& update);
 
diff --git a/src/iceberg/util/meson.build b/src/iceberg/util/meson.build
index 48477925..880f6340 100644
--- a/src/iceberg/util/meson.build
+++ b/src/iceberg/util/meson.build
@@ -32,6 +32,7 @@ install_headers(
         'location_util.h',
         'macros.h',
         'partition_value_util.h',
+        'property_util.h',
         'string_util.h',
         'temporal_util.h',
         'timepoint.h',
diff --git a/src/iceberg/util/property_util.cc 
b/src/iceberg/util/property_util.cc
new file mode 100644
index 00000000..636083fd
--- /dev/null
+++ b/src/iceberg/util/property_util.cc
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "iceberg/util/property_util.h"
+
+#include <charconv>
+
+#include "iceberg/table_properties.h"
+
+namespace iceberg {
+
+Status PropertyUtil::ValidateCommitProperties(
+    const std::unordered_map<std::string, std::string>& properties) {
+  for (const auto& property : TableProperties::commit_properties()) {
+    if (auto it = properties.find(property); it != properties.end()) {
+      int32_t parsed;
+      auto [ptr, ec] = std::from_chars(it->second.data(),
+                                       it->second.data() + it->second.size(), 
parsed);
+      if (ec == std::errc::invalid_argument) {
+        return ValidationFailed("Table property {} must have integer value, 
but got {}",
+                                property, it->second);
+      } else if (ec == std::errc::result_out_of_range) {
+        return ValidationFailed("Table property {} value out of range {}", 
property,
+                                it->second);
+      }
+      if (parsed < 0) {
+        return ValidationFailed(
+            "Table property {} must have non negative integer value, but got 
{}",
+            property, parsed);
+      }
+    }
+  }
+  return {};
+}
+
+}  // namespace iceberg
diff --git a/src/iceberg/table_identifier.h b/src/iceberg/util/property_util.h
similarity index 56%
copy from src/iceberg/table_identifier.h
copy to src/iceberg/util/property_util.h
index bef9b81d..4e3e9b12 100644
--- a/src/iceberg/table_identifier.h
+++ b/src/iceberg/util/property_util.h
@@ -19,40 +19,18 @@
 
 #pragma once
 
-/// \file iceberg/table_identifier.h
-/// A TableIdentifier is a unique identifier for a table
-
 #include <string>
-#include <vector>
+#include <unordered_map>
 
 #include "iceberg/iceberg_export.h"
 #include "iceberg/result.h"
 
 namespace iceberg {
 
-/// \brief A namespace in a catalog.
-struct ICEBERG_EXPORT Namespace {
-  std::vector<std::string> levels;
-
-  bool operator==(const Namespace& other) const { return levels == 
other.levels; }
-};
-
-/// \brief Identifies a table in iceberg catalog.
-struct ICEBERG_EXPORT TableIdentifier {
-  Namespace ns;
-  std::string name;
-
-  bool operator==(const TableIdentifier& other) const {
-    return ns == other.ns && name == other.name;
-  }
-
-  /// \brief Validates the TableIdentifier.
-  Status Validate() const {
-    if (name.empty()) {
-      return Invalid("Invalid table identifier: missing table name");
-    }
-    return {};
-  }
+class ICEBERG_EXPORT PropertyUtil {
+ public:
+  static Status ValidateCommitProperties(
+      const std::unordered_map<std::string, std::string>& properties);
 };
 
 }  // namespace iceberg

Reply via email to