This is an automated email from the ASF dual-hosted git repository.

gangwu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-cpp.git


The following commit(s) were added to refs/heads/main by this push:
     new 3d36a9eb feat(rest): implement stage-create table (#485)
3d36a9eb is described below

commit 3d36a9eb88f91f14597928fb3b27d0e5f9465ae8
Author: Feiyang Li <[email protected]>
AuthorDate: Tue Jan 6 18:01:12 2026 +0800

    feat(rest): implement stage-create table (#485)
---
 src/iceberg/catalog/rest/rest_catalog.cc | 47 ++++++++++++++++++++-----------
 src/iceberg/catalog/rest/rest_catalog.h  |  6 ++++
 src/iceberg/catalog/rest/type_fwd.h      |  1 +
 src/iceberg/test/rest_catalog_test.cc    | 48 ++++++++++++++++++++++++++++++++
 4 files changed, 85 insertions(+), 17 deletions(-)

diff --git a/src/iceberg/catalog/rest/rest_catalog.cc b/src/iceberg/catalog/rest/rest_catalog.cc
index a799d69a..5ab6e591 100644
--- a/src/iceberg/catalog/rest/rest_catalog.cc
+++ b/src/iceberg/catalog/rest/rest_catalog.cc
@@ -44,6 +44,7 @@
 #include "iceberg/table.h"
 #include "iceberg/table_requirement.h"
 #include "iceberg/table_update.h"
+#include "iceberg/transaction.h"
 #include "iceberg/util/macros.h"
 
 namespace iceberg::rest {
@@ -274,11 +275,11 @@ Result<std::vector<TableIdentifier>> RestCatalog::ListTables(const Namespace& ns
   return result;
 }
 
-Result<std::shared_ptr<Table>> RestCatalog::CreateTable(
+Result<LoadTableResult> RestCatalog::CreateTableInternal(
     const TableIdentifier& identifier, const std::shared_ptr<Schema>& schema,
     const std::shared_ptr<PartitionSpec>& spec, const std::shared_ptr<SortOrder>& order,
     const std::string& location,
-    const std::unordered_map<std::string, std::string>& properties) {
+    const std::unordered_map<std::string, std::string>& properties, bool stage_create) {
   ICEBERG_ENDPOINT_CHECK(supported_endpoints_, Endpoint::CreateTable());
   ICEBERG_ASSIGN_OR_RAISE(auto path, paths_->Tables(identifier.ns));
 
@@ -288,7 +289,7 @@ Result<std::shared_ptr<Table>> RestCatalog::CreateTable(
       .schema = schema,
       .partition_spec = spec,
       .write_order = order,
-      .stage_create = false,
+      .stage_create = stage_create,
       .properties = properties,
   };
 
@@ -298,10 +299,19 @@ Result<std::shared_ptr<Table>> RestCatalog::CreateTable(
       client_->Post(path, json_request, /*headers=*/{}, *TableErrorHandler::Instance()));
 
   ICEBERG_ASSIGN_OR_RAISE(auto json, FromJsonString(response.body()));
-  ICEBERG_ASSIGN_OR_RAISE(auto load_result, LoadTableResultFromJson(json));
-  return Table::Make(identifier, load_result.metadata,
-                     std::move(load_result.metadata_location), file_io_,
-                     shared_from_this());
+  return LoadTableResultFromJson(json);
+}
+
+Result<std::shared_ptr<Table>> RestCatalog::CreateTable(
+    const TableIdentifier& identifier, const std::shared_ptr<Schema>& schema,
+    const std::shared_ptr<PartitionSpec>& spec, const std::shared_ptr<SortOrder>& order,
+    const std::string& location,
+    const std::unordered_map<std::string, std::string>& properties) {
+  ICEBERG_ASSIGN_OR_RAISE(auto result,
+                          CreateTableInternal(identifier, schema, spec, order, location,
+                                              properties, /*stage_create=*/false));
+  return Table::Make(identifier, std::move(result.metadata),
+                     std::move(result.metadata_location), file_io_, shared_from_this());
 }
 
 Result<std::shared_ptr<Table>> RestCatalog::UpdateTable(
@@ -335,13 +345,19 @@ Result<std::shared_ptr<Table>> RestCatalog::UpdateTable(
 }
 
 Result<std::shared_ptr<Transaction>> RestCatalog::StageCreateTable(
-    [[maybe_unused]] const TableIdentifier& identifier,
-    [[maybe_unused]] const std::shared_ptr<Schema>& schema,
-    [[maybe_unused]] const std::shared_ptr<PartitionSpec>& spec,
-    [[maybe_unused]] const std::shared_ptr<SortOrder>& order,
-    [[maybe_unused]] const std::string& location,
-    [[maybe_unused]] const std::unordered_map<std::string, std::string>& properties) {
-  return NotImplemented("Not implemented");
+    const TableIdentifier& identifier, const std::shared_ptr<Schema>& schema,
+    const std::shared_ptr<PartitionSpec>& spec, const std::shared_ptr<SortOrder>& order,
+    const std::string& location,
+    const std::unordered_map<std::string, std::string>& properties) {
+  ICEBERG_ASSIGN_OR_RAISE(auto result,
+                          CreateTableInternal(identifier, schema, spec, order, location,
+                                              properties, /*stage_create=*/true));
+  ICEBERG_ASSIGN_OR_RAISE(auto staged_table,
+                          StagedTable::Make(identifier, std::move(result.metadata),
+                                            std::move(result.metadata_location), file_io_,
+                                            shared_from_this()));
+  return Transaction::Make(std::move(staged_table), Transaction::Kind::kCreate,
+                           /*auto_commit=*/false);
 }
 
 Status RestCatalog::DropTable(const TableIdentifier& identifier, bool purge) {
@@ -393,9 +409,6 @@ Result<std::string> RestCatalog::LoadTableInternal(
 }
 
 Result<std::shared_ptr<Table>> RestCatalog::LoadTable(const TableIdentifier& identifier) {
-  ICEBERG_ENDPOINT_CHECK(supported_endpoints_, Endpoint::LoadTable());
-  ICEBERG_ASSIGN_OR_RAISE(auto path, paths_->Table(identifier));
-
   ICEBERG_ASSIGN_OR_RAISE(const auto body, LoadTableInternal(identifier));
   ICEBERG_ASSIGN_OR_RAISE(auto json, FromJsonString(body));
   ICEBERG_ASSIGN_OR_RAISE(auto load_result, LoadTableResultFromJson(json));
diff --git a/src/iceberg/catalog/rest/rest_catalog.h b/src/iceberg/catalog/rest/rest_catalog.h
index 41928cf7..721df29d 100644
--- a/src/iceberg/catalog/rest/rest_catalog.h
+++ b/src/iceberg/catalog/rest/rest_catalog.h
@@ -110,6 +110,12 @@ class ICEBERG_REST_EXPORT RestCatalog : public Catalog,
 
   Result<std::string> LoadTableInternal(const TableIdentifier& identifier) const;
 
+  Result<LoadTableResult> CreateTableInternal(
+      const TableIdentifier& identifier, const std::shared_ptr<Schema>& schema,
+      const std::shared_ptr<PartitionSpec>& spec, const std::shared_ptr<SortOrder>& order,
+      const std::string& location,
+      const std::unordered_map<std::string, std::string>& properties, bool stage_create);
+
   std::unique_ptr<RestCatalogProperties> config_;
   std::shared_ptr<FileIO> file_io_;
   std::unique_ptr<HttpClient> client_;
diff --git a/src/iceberg/catalog/rest/type_fwd.h b/src/iceberg/catalog/rest/type_fwd.h
index e7fddb91..9a57c11b 100644
--- a/src/iceberg/catalog/rest/type_fwd.h
+++ b/src/iceberg/catalog/rest/type_fwd.h
@@ -25,6 +25,7 @@
 namespace iceberg::rest {
 
 struct ErrorResponse;
+struct LoadTableResult;
 
 class Endpoint;
 class ErrorHandler;
diff --git a/src/iceberg/test/rest_catalog_test.cc b/src/iceberg/test/rest_catalog_test.cc
index 7f04de0a..f17b9391 100644
--- a/src/iceberg/test/rest_catalog_test.cc
+++ b/src/iceberg/test/rest_catalog_test.cc
@@ -52,6 +52,7 @@
 #include "iceberg/test/std_io.h"
 #include "iceberg/test/test_resource.h"
 #include "iceberg/test/util/docker_compose_util.h"
+#include "iceberg/transaction.h"
 
 namespace iceberg::rest {
 
@@ -639,4 +640,51 @@ TEST_F(RestCatalogIntegrationTest, RegisterTable) {
   EXPECT_NE(table->name(), registered_table->name());
 }
 
+TEST_F(RestCatalogIntegrationTest, StageCreateTable) {
+  auto catalog_result = CreateCatalog();
+  ASSERT_THAT(catalog_result, IsOk());
+  auto& catalog = catalog_result.value();
+
+  // Create namespace
+  Namespace ns{.levels = {"test_stage_create"}};
+  auto status = catalog->CreateNamespace(ns, {});
+  EXPECT_THAT(status, IsOk());
+
+  // Stage create table
+  auto schema = CreateDefaultSchema();
+  auto partition_spec = PartitionSpec::Unpartitioned();
+  auto sort_order = SortOrder::Unsorted();
+
+  TableIdentifier table_id{.ns = ns, .name = "staged_table"};
+  std::unordered_map<std::string, std::string> table_properties{{"key1", "value1"}};
+  auto txn_result = catalog->StageCreateTable(table_id, schema, partition_spec,
+                                              sort_order, "", table_properties);
+  ASSERT_THAT(txn_result, IsOk());
+  auto& txn = txn_result.value();
+
+  // Verify the staged table in transaction
+  EXPECT_NE(txn->table(), nullptr);
+  EXPECT_EQ(txn->table()->name(), table_id);
+
+  // Table should NOT exist in catalog yet (staged but not committed)
+  auto exists_result = catalog->TableExists(table_id);
+  ASSERT_THAT(exists_result, IsOk());
+  EXPECT_FALSE(exists_result.value());
+
+  // Commit the transaction
+  auto commit_result = txn->Commit();
+  ASSERT_THAT(commit_result, IsOk());
+  auto& committed_table = commit_result.value();
+
+  // Verify table now exists
+  exists_result = catalog->TableExists(table_id);
+  ASSERT_THAT(exists_result, IsOk());
+  EXPECT_TRUE(exists_result.value());
+
+  // Verify table properties
+  EXPECT_EQ(committed_table->name(), table_id);
+  auto& props = committed_table->metadata()->properties.configs();
+  EXPECT_EQ(props.at("key1"), "value1");
+}
+
 }  // namespace iceberg::rest

Reply via email to