This is an automated email from the ASF dual-hosted git repository.
gangwu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-cpp.git
The following commit(s) were added to refs/heads/main by this push:
new 4db7c67e feat(rest): support snapshot loading mode (#543)
4db7c67e is described below
commit 4db7c67ecb624bdbf18a71d24ee5947bab70d792
Author: Feiyang Li <[email protected]>
AuthorDate: Thu Mar 19 15:56:49 2026 +0800
feat(rest): support snapshot loading mode (#543)
---
src/iceberg/catalog/rest/catalog_properties.cc | 14 +
src/iceberg/catalog/rest/catalog_properties.h | 11 +
src/iceberg/catalog/rest/rest_catalog.cc | 27 +-
src/iceberg/catalog/rest/rest_catalog.h | 4 +-
src/iceberg/test/CMakeLists.txt | 2 +-
src/iceberg/test/meson.build | 2 +-
src/iceberg/test/rest_catalog_integration_test.cc | 479 +++++++++++++++
src/iceberg/test/rest_catalog_test.cc | 693 ----------------------
src/iceberg/test/rest_util_test.cc | 49 ++
9 files changed, 578 insertions(+), 703 deletions(-)
diff --git a/src/iceberg/catalog/rest/catalog_properties.cc
b/src/iceberg/catalog/rest/catalog_properties.cc
index 9f12c492..0e417e6c 100644
--- a/src/iceberg/catalog/rest/catalog_properties.cc
+++ b/src/iceberg/catalog/rest/catalog_properties.cc
@@ -19,6 +19,8 @@
#include "iceberg/catalog/rest/catalog_properties.h"
+#include <algorithm>
+#include <string>
#include <string_view>
namespace iceberg::rest {
@@ -47,4 +49,16 @@ Result<std::string_view> RestCatalogProperties::Uri() const {
return it->second;
}
+/// Resolves the configured `snapshot-loading-mode` entry to a SnapshotMode.
+///
+/// The stored text is upper-cased before it is compared, so the match does
+/// not depend on the case used in the configuration. Any value other than
+/// ALL or REFS produces an InvalidArgument error that echoes the value as it
+/// was configured.
+Result<SnapshotMode> RestCatalogProperties::SnapshotLoadingMode() const {
+  const std::string normalized = StringUtils::ToUpper(Get(kSnapshotLoadingMode));
+  if (normalized == "REFS") {
+    return SnapshotMode::kRefs;
+  }
+  if (normalized == "ALL") {
+    return SnapshotMode::kAll;
+  }
+  return InvalidArgument("Invalid snapshot loading mode: '{}'.",
+                         Get(kSnapshotLoadingMode));
+}
+
} // namespace iceberg::rest
diff --git a/src/iceberg/catalog/rest/catalog_properties.h
b/src/iceberg/catalog/rest/catalog_properties.h
index be054dfa..a00aa87d 100644
--- a/src/iceberg/catalog/rest/catalog_properties.h
+++ b/src/iceberg/catalog/rest/catalog_properties.h
@@ -31,6 +31,9 @@
namespace iceberg::rest {
+/// \brief Snapshot loading mode for REST catalog.
+///
+/// Selects the value of the `snapshots` query parameter that RestCatalog
+/// attaches to its load-table request: kAll sends "all", kRefs sends "refs".
+enum class SnapshotMode : uint8_t { kAll, kRefs };
+
/// \brief Configuration class for a REST Catalog.
class ICEBERG_REST_EXPORT RestCatalogProperties
: public ConfigBase<RestCatalogProperties> {
@@ -46,6 +49,8 @@ class ICEBERG_REST_EXPORT RestCatalogProperties
inline static Entry<std::string> kWarehouse{"warehouse", ""};
/// \brief The optional prefix for REST API paths.
inline static Entry<std::string> kPrefix{"prefix", ""};
+ /// \brief The snapshot loading mode (ALL or REFS).
+ inline static Entry<std::string>
kSnapshotLoadingMode{"snapshot-loading-mode", "ALL"};
/// \brief The prefix for HTTP headers.
inline static constexpr std::string_view kHeaderPrefix = "header.";
@@ -62,6 +67,12 @@ class ICEBERG_REST_EXPORT RestCatalogProperties
/// \brief Get the URI of the REST catalog server.
/// \return The URI if configured, or an error if not set or empty.
Result<std::string_view> Uri() const;
+
+ /// \brief Get the snapshot loading mode.
+ /// \return SnapshotMode::kAll if configured as "ALL", SnapshotMode::kRefs if
+ /// "REFS", or an error if the value is invalid. Parsing is
+ /// case-insensitive to match Java behavior.
+ Result<SnapshotMode> SnapshotLoadingMode() const;
};
} // namespace iceberg::rest
diff --git a/src/iceberg/catalog/rest/rest_catalog.cc
b/src/iceberg/catalog/rest/rest_catalog.cc
index caef5041..40e112db 100644
--- a/src/iceberg/catalog/rest/rest_catalog.cc
+++ b/src/iceberg/catalog/rest/rest_catalog.cc
@@ -161,13 +161,17 @@ Result<std::shared_ptr<RestCatalog>> RestCatalog::Make(
paths, ResourcePaths::Make(std::string(TrimTrailingSlash(final_uri)),
final_config.Get(RestCatalogProperties::kPrefix)));
+ // Get snapshot loading mode
+ ICEBERG_ASSIGN_OR_RAISE(auto snapshot_mode,
final_config.SnapshotLoadingMode());
+
auto client = std::make_unique<HttpClient>(final_config.ExtractHeaders());
ICEBERG_ASSIGN_OR_RAISE(auto catalog_session,
auth_manager->CatalogSession(*client,
final_config.configs()));
- return std::shared_ptr<RestCatalog>(new RestCatalog(
- std::move(final_config), std::move(file_io), std::move(client),
std::move(paths),
- std::move(endpoints), std::move(auth_manager),
std::move(catalog_session)));
+ return std::shared_ptr<RestCatalog>(
+ new RestCatalog(std::move(final_config), std::move(file_io),
std::move(client),
+ std::move(paths), std::move(endpoints),
std::move(auth_manager),
+ std::move(catalog_session), snapshot_mode));
}
RestCatalog::RestCatalog(RestCatalogProperties config, std::shared_ptr<FileIO>
file_io,
@@ -175,7 +179,8 @@ RestCatalog::RestCatalog(RestCatalogProperties config,
std::shared_ptr<FileIO> f
std::unique_ptr<ResourcePaths> paths,
std::unordered_set<Endpoint> endpoints,
std::unique_ptr<auth::AuthManager> auth_manager,
- std::shared_ptr<auth::AuthSession> catalog_session)
+ std::shared_ptr<auth::AuthSession> catalog_session,
+ SnapshotMode snapshot_mode)
: config_(std::move(config)),
file_io_(std::move(file_io)),
client_(std::move(client)),
@@ -183,7 +188,8 @@ RestCatalog::RestCatalog(RestCatalogProperties config,
std::shared_ptr<FileIO> f
name_(config_.Get(RestCatalogProperties::kName)),
supported_endpoints_(std::move(endpoints)),
auth_manager_(std::move(auth_manager)),
- catalog_session_(std::move(catalog_session)) {
+ catalog_session_(std::move(catalog_session)),
+ snapshot_mode_(snapshot_mode) {
ICEBERG_DCHECK(catalog_session_ != nullptr, "catalog_session must not be
null");
}
@@ -442,9 +448,17 @@ Result<std::string> RestCatalog::LoadTableInternal(
const TableIdentifier& identifier) const {
ICEBERG_ENDPOINT_CHECK(supported_endpoints_, Endpoint::LoadTable());
ICEBERG_ASSIGN_OR_RAISE(auto path, paths_->Table(identifier));
+
+ std::unordered_map<std::string, std::string> params;
+ if (snapshot_mode_ == SnapshotMode::kRefs) {
+ params["snapshots"] = "refs";
+ } else {
+ params["snapshots"] = "all";
+ }
+
ICEBERG_ASSIGN_OR_RAISE(
const auto response,
- client_->Get(path, /*params=*/{}, /*headers=*/{},
*TableErrorHandler::Instance(),
+ client_->Get(path, params, /*headers=*/{},
*TableErrorHandler::Instance(),
*catalog_session_));
return response.body();
}
@@ -453,7 +467,6 @@ Result<std::shared_ptr<Table>> RestCatalog::LoadTable(const
TableIdentifier& ide
ICEBERG_ASSIGN_OR_RAISE(const auto body, LoadTableInternal(identifier));
ICEBERG_ASSIGN_OR_RAISE(auto json, FromJsonString(body));
ICEBERG_ASSIGN_OR_RAISE(auto load_result, LoadTableResultFromJson(json));
-
return Table::Make(identifier, std::move(load_result.metadata),
std::move(load_result.metadata_location), file_io_,
shared_from_this());
diff --git a/src/iceberg/catalog/rest/rest_catalog.h
b/src/iceberg/catalog/rest/rest_catalog.h
index 5cc61eae..38230a5e 100644
--- a/src/iceberg/catalog/rest/rest_catalog.h
+++ b/src/iceberg/catalog/rest/rest_catalog.h
@@ -109,7 +109,8 @@ class ICEBERG_REST_EXPORT RestCatalog : public Catalog,
std::unique_ptr<HttpClient> client,
std::unique_ptr<ResourcePaths> paths,
std::unordered_set<Endpoint> endpoints,
std::unique_ptr<auth::AuthManager> auth_manager,
- std::shared_ptr<auth::AuthSession> catalog_session);
+ std::shared_ptr<auth::AuthSession> catalog_session,
+ SnapshotMode snapshot_mode);
Result<std::string> LoadTableInternal(const TableIdentifier& identifier)
const;
@@ -127,6 +128,7 @@ class ICEBERG_REST_EXPORT RestCatalog : public Catalog,
std::unordered_set<Endpoint> supported_endpoints_;
std::unique_ptr<auth::AuthManager> auth_manager_;
std::shared_ptr<auth::AuthSession> catalog_session_;
+ SnapshotMode snapshot_mode_;
};
} // namespace iceberg::rest
diff --git a/src/iceberg/test/CMakeLists.txt b/src/iceberg/test/CMakeLists.txt
index c4ec29c4..768e0507 100644
--- a/src/iceberg/test/CMakeLists.txt
+++ b/src/iceberg/test/CMakeLists.txt
@@ -225,7 +225,7 @@ if(ICEBERG_BUILD_REST)
if(ICEBERG_BUILD_REST_INTEGRATION_TESTS)
add_rest_iceberg_test(rest_catalog_integration_test
SOURCES
- rest_catalog_test.cc
+ rest_catalog_integration_test.cc
util/cmd_util.cc
util/docker_compose_util.cc)
endif()
diff --git a/src/iceberg/test/meson.build b/src/iceberg/test/meson.build
index 71ab6942..df2d5db8 100644
--- a/src/iceberg/test/meson.build
+++ b/src/iceberg/test/meson.build
@@ -120,7 +120,7 @@ if get_option('rest').enabled()
iceberg_tests += {
'rest_integration_test': {
'sources': files(
- 'rest_catalog_test.cc',
+ 'rest_catalog_integration_test.cc',
'util/cmd_util.cc',
'util/docker_compose_util.cc',
),
diff --git a/src/iceberg/test/rest_catalog_integration_test.cc
b/src/iceberg/test/rest_catalog_integration_test.cc
new file mode 100644
index 00000000..b364ffd3
--- /dev/null
+++ b/src/iceberg/test/rest_catalog_integration_test.cc
@@ -0,0 +1,479 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <unistd.h>
+
+#include <chrono>
+#include <memory>
+#include <print>
+#include <string>
+#include <thread>
+#include <unordered_map>
+
+#include <arpa/inet.h>
+#include <gmock/gmock.h>
+#include <gtest/gtest.h>
+#include <netinet/in.h>
+#include <nlohmann/json.hpp>
+#include <sys/socket.h>
+
+#include "iceberg/catalog/rest/auth/auth_session.h"
+#include "iceberg/catalog/rest/catalog_properties.h"
+#include "iceberg/catalog/rest/error_handlers.h"
+#include "iceberg/catalog/rest/http_client.h"
+#include "iceberg/catalog/rest/json_serde_internal.h"
+#include "iceberg/catalog/rest/rest_catalog.h"
+#include "iceberg/partition_spec.h"
+#include "iceberg/result.h"
+#include "iceberg/schema.h"
+#include "iceberg/sort_order.h"
+#include "iceberg/table.h"
+#include "iceberg/table_identifier.h"
+#include "iceberg/table_requirement.h"
+#include "iceberg/table_update.h"
+#include "iceberg/test/matchers.h"
+#include "iceberg/test/std_io.h"
+#include "iceberg/test/test_resource.h"
+#include "iceberg/test/util/docker_compose_util.h"
+#include "iceberg/transaction.h"
+
+namespace iceberg::rest {
+
+namespace {
+
+// Port on localhost where the suite expects the REST catalog to listen.
+constexpr uint16_t kRestCatalogPort = 8181;
+// Readiness polling budget used by SetUpTestSuite: at most kMaxRetries probes,
+// sleeping kRetryDelayMs milliseconds after each failed probe.
+constexpr int kMaxRetries = 60;
+constexpr int kRetryDelayMs = 1000;
+
+// Project name handed to DockerCompose when the fixture is brought up.
+constexpr std::string_view kDockerProjectName = "iceberg-rest-catalog-service";
+// Values the fixture stores under RestCatalogProperties::kName / kWarehouse.
+constexpr std::string_view kCatalogName = "test_catalog";
+constexpr std::string_view kWarehouseName = "default";
+// Scheme and host; CatalogUri() appends ":" and kRestCatalogPort to it.
+constexpr std::string_view kLocalhostUri = "http://localhost";
+
+/// \brief Check if a localhost port is ready to accept connections.
+///
+/// Opens a TCP socket, attempts to connect to the IPv4 loopback address on
+/// \p port, and closes the socket again before returning.
+///
+/// \param port TCP port in host byte order.
+/// \return true if connect() succeeded; false if the socket could not be
+///         created or the connection attempt failed.
+bool CheckServiceReady(uint16_t port) {
+  const int fd = socket(AF_INET, SOCK_STREAM, 0);
+  if (fd < 0) return false;
+
+  // Bound the probe to about one second. On Linux connect() honours the send
+  // timeout, not the receive timeout, so both are set. This is best effort:
+  // if setsockopt() fails the probe merely falls back to the system default
+  // timeout, hence the results are not checked.
+  struct timeval timeout{
+      .tv_sec = 1,
+      .tv_usec = 0,
+  };
+  setsockopt(fd, SOL_SOCKET, SO_RCVTIMEO, &timeout, sizeof(timeout));
+  setsockopt(fd, SOL_SOCKET, SO_SNDTIMEO, &timeout, sizeof(timeout));
+
+  sockaddr_in addr{
+      .sin_family = AF_INET,
+      .sin_port = htons(port),
+      .sin_addr = {.s_addr = htonl(INADDR_LOOPBACK)},
+  };
+  const bool connected =
+      connect(fd, reinterpret_cast<struct sockaddr*>(&addr), sizeof(addr)) == 0;
+  close(fd);
+  return connected;
+}
+
+/// Builds the base URI of the catalog under test: kLocalhostUri, a colon, and
+/// kRestCatalogPort written in decimal.
+std::string CatalogUri() {
+  std::string uri{kLocalhostUri};
+  uri += ':';
+  uri += std::to_string(kRestCatalogPort);
+  return uri;
+}
+
+} // namespace
+
+/// \brief Integration test fixture for REST catalog with Docker Compose.
+///
+/// One DockerCompose instance is created for the whole suite and released in
+/// TearDownTestSuite; each test builds its own RestCatalog through the helpers
+/// below.
+class RestCatalogIntegrationTest : public ::testing::Test {
+ protected:
+  /// Brings the compose project up, then probes the catalog port until it
+  /// accepts a connection. Throws std::runtime_error when every probe fails.
+  static void SetUpTestSuite() {
+    docker_compose_ = std::make_unique<DockerCompose>(
+        std::string{kDockerProjectName}, GetResourcePath("iceberg-rest-fixture"));
+    docker_compose_->Up();
+
+    std::println("[INFO] Waiting for REST catalog at localhost:{}...",
+                 kRestCatalogPort);
+    for (int attempt = 1; attempt <= kMaxRetries; ++attempt) {
+      if (CheckServiceReady(kRestCatalogPort)) {
+        std::println("[INFO] REST catalog is ready!");
+        return;
+      }
+      std::println("[INFO] Retrying... (attempt {}/{})", attempt, kMaxRetries);
+      std::this_thread::sleep_for(std::chrono::milliseconds(kRetryDelayMs));
+    }
+    throw std::runtime_error("REST catalog failed to start within timeout");
+  }
+
+  /// Releases the suite-wide DockerCompose instance.
+  static void TearDownTestSuite() { docker_compose_ = nullptr; }
+
+  /// Builds a catalog from the fixture defaults only.
+  Result<std::shared_ptr<RestCatalog>> CreateCatalog() {
+    return CreateCatalogWithProperties({});
+  }
+
+  /// Builds a catalog from the fixture defaults (URI, name, warehouse) and then
+  /// writes every entry of \p extra over them, so \p extra wins on conflicts.
+  Result<std::shared_ptr<RestCatalog>> CreateCatalogWithProperties(
+      const std::unordered_map<std::string, std::string>& extra) {
+    auto config = RestCatalogProperties::default_properties();
+    config.Set(RestCatalogProperties::kUri, CatalogUri())
+        .Set(RestCatalogProperties::kName, std::string(kCatalogName))
+        .Set(RestCatalogProperties::kWarehouse, std::string(kWarehouseName));
+    for (const auto& [key, value] : extra) {
+      config.mutable_configs()[key] = value;
+    }
+    return RestCatalog::Make(config, std::make_shared<test::StdFileIO>());
+  }
+
+  /// Builds a catalog whose snapshot-loading-mode property is set to \p mode.
+  Result<std::shared_ptr<RestCatalog>> CreateCatalogWithSnapshotMode(
+      const std::string& mode) {
+    return CreateCatalogWithProperties(
+        {{RestCatalogProperties::kSnapshotLoadingMode.key(), mode}});
+  }
+
+  /// Builds a default catalog and creates \p ns in it. Returns the catalog, or
+  /// the error from whichever of the two steps failed.
+  Result<std::shared_ptr<RestCatalog>> CreateCatalogAndNamespace(const Namespace& ns) {
+    ICEBERG_ASSIGN_OR_RAISE(auto catalog, CreateCatalog());
+    if (auto created = catalog->CreateNamespace(ns, {}); !created.has_value()) {
+      return std::unexpected(created.error());
+    }
+    return catalog;
+  }
+
+  /// Schema shared by the table tests: required int32 "id" (field 1) and
+  /// optional string "data" (field 2), with schema id 1.
+  static std::shared_ptr<Schema> DefaultSchema() {
+    std::vector<SchemaField> columns{SchemaField::MakeRequired(1, "id", int32()),
+                                     SchemaField::MakeOptional(2, "data", string())};
+    return std::make_shared<Schema>(std::move(columns), /*schema_id=*/1);
+  }
+
+  /// Creates \p table_id in \p catalog with DefaultSchema(), an unpartitioned
+  /// spec, an unsorted order, an empty string argument, and \p props.
+  Result<std::shared_ptr<Table>> CreateDefaultTable(
+      const std::shared_ptr<RestCatalog>& catalog, const TableIdentifier& table_id,
+      const std::unordered_map<std::string, std::string>& props = {}) {
+    return catalog->CreateTable(table_id, DefaultSchema(),
+                                PartitionSpec::Unpartitioned(), SortOrder::Unsorted(),
+                                "", props);
+  }
+
+  static inline std::unique_ptr<DockerCompose> docker_compose_;
+};
+
+// A catalog built from the fixture defaults reports the configured name.
+TEST_F(RestCatalogIntegrationTest, MakeCatalogSuccess) {
+ ICEBERG_UNWRAP_OR_FAIL(auto catalog, CreateCatalog());
+ EXPECT_EQ(catalog->name(), kCatalogName);
+}
+
+// Fetches <CatalogUri()>/v1/config with a bare HttpClient (no RestCatalog
+// involved) and checks the shape of the JSON body: "defaults" and "overrides"
+// must be present; "endpoints" is optional, but when present it must be an
+// array that CatalogConfigFromJson parses into at least one endpoint.
+TEST_F(RestCatalogIntegrationTest, FetchServerConfigDirect) {
+ HttpClient client({});
+ auto noop_session = auth::AuthSession::MakeDefault({});
+ std::string config_url = std::format("{}/v1/config", CatalogUri());
+
+ ICEBERG_UNWRAP_OR_FAIL(const auto response,
+ client.Get(config_url, {}, /*headers=*/{},
+ *DefaultErrorHandler::Instance(),
*noop_session));
+ ICEBERG_UNWRAP_OR_FAIL(auto json, FromJsonString(response.body()));
+
+ EXPECT_TRUE(json.contains("defaults"));
+ EXPECT_TRUE(json.contains("overrides"));
+
+ if (json.contains("endpoints")) {
+ EXPECT_TRUE(json["endpoints"].is_array());
+ ICEBERG_UNWRAP_OR_FAIL(auto config, CatalogConfigFromJson(json));
+ std::println("[INFO] Server provided {} endpoints",
config.endpoints.size());
+ EXPECT_GT(config.endpoints.size(), 0);
+ }
+}
+
+// -- Namespace operations --
+
+// Lists namespaces under the root (empty `levels`) and expects none.
+// NOTE(review): every test talks to the same catalog service started in
+// SetUpTestSuite, so this presumably passes only while no earlier test has
+// created a namespace -- confirm before enabling test shuffling or filters.
+TEST_F(RestCatalogIntegrationTest, ListNamespaces) {
+ ICEBERG_UNWRAP_OR_FAIL(auto catalog, CreateCatalog());
+ ICEBERG_UNWRAP_OR_FAIL(auto result,
catalog->ListNamespaces(Namespace{.levels = {}}));
+ EXPECT_TRUE(result.empty());
+}
+
+// Creates "test_ns" and expects it to be the only namespace under the root.
+// NOTE(review): the exact-size assertion presumably depends on this being the
+// first test to create a namespace in the shared catalog -- confirm before
+// reordering tests.
+TEST_F(RestCatalogIntegrationTest, CreateNamespace) {
+ ICEBERG_UNWRAP_OR_FAIL(auto catalog, CreateCatalog());
+
+ Namespace ns{.levels = {"test_ns"}};
+ ASSERT_THAT(catalog->CreateNamespace(ns, {}), IsOk());
+
+ ICEBERG_UNWRAP_OR_FAIL(auto namespaces,
+ catalog->ListNamespaces(Namespace{.levels = {}}));
+ EXPECT_EQ(namespaces.size(), 1);
+ EXPECT_EQ(namespaces[0].levels, std::vector<std::string>{"test_ns"});
+}
+
+// Properties passed to CreateNamespace are returned by GetNamespaceProperties.
+TEST_F(RestCatalogIntegrationTest, CreateNamespaceWithProperties) {
+ ICEBERG_UNWRAP_OR_FAIL(auto catalog, CreateCatalog());
+
+ Namespace ns{.levels = {"test_ns_props"}};
+ std::unordered_map<std::string, std::string> properties{
+ {"owner", "test_user"}, {"description", "Test namespace with
properties"}};
+ ASSERT_THAT(catalog->CreateNamespace(ns, properties), IsOk());
+
+ ICEBERG_UNWRAP_OR_FAIL(auto props, catalog->GetNamespaceProperties(ns));
+ EXPECT_EQ(props.at("owner"), "test_user");
+ EXPECT_EQ(props.at("description"), "Test namespace with properties");
+}
+
+// Creates "parent" and then "parent.child"; listing under the parent must
+// return exactly the child, identified by its full two-level path.
+TEST_F(RestCatalogIntegrationTest, CreateNestedNamespace) {
+ ICEBERG_UNWRAP_OR_FAIL(auto catalog, CreateCatalog());
+
+ Namespace parent{.levels = {"parent"}};
+ Namespace child{.levels = {"parent", "child"}};
+ ASSERT_THAT(catalog->CreateNamespace(parent, {}), IsOk());
+ ASSERT_THAT(catalog->CreateNamespace(child, {}), IsOk());
+
+ ICEBERG_UNWRAP_OR_FAIL(auto children, catalog->ListNamespaces(parent));
+ EXPECT_EQ(children.size(), 1);
+ EXPECT_EQ(children[0].levels, (std::vector<std::string>{"parent", "child"}));
+}
+
+// Reads back the two properties a namespace was created with.
+TEST_F(RestCatalogIntegrationTest, GetNamespaceProperties) {
+ ICEBERG_UNWRAP_OR_FAIL(auto catalog, CreateCatalog());
+
+ Namespace ns{.levels = {"test_get_props"}};
+ ASSERT_THAT(catalog->CreateNamespace(ns, {{"key1", "value1"}, {"key2",
"value2"}}),
+ IsOk());
+
+ ICEBERG_UNWRAP_OR_FAIL(auto props, catalog->GetNamespaceProperties(ns));
+ EXPECT_EQ(props.at("key1"), "value1");
+ EXPECT_EQ(props.at("key2"), "value2");
+}
+
+// NamespaceExists reports false before the namespace is created and true
+// afterwards.
+TEST_F(RestCatalogIntegrationTest, NamespaceExists) {
+ ICEBERG_UNWRAP_OR_FAIL(auto catalog, CreateCatalog());
+
+ Namespace ns{.levels = {"non_existent"}};
+ ICEBERG_UNWRAP_OR_FAIL(auto before, catalog->NamespaceExists(ns));
+ EXPECT_FALSE(before);
+
+ ASSERT_THAT(catalog->CreateNamespace(ns, {}), IsOk());
+
+ ICEBERG_UNWRAP_OR_FAIL(auto after, catalog->NamespaceExists(ns));
+ EXPECT_TRUE(after);
+}
+
+// A single UpdateNamespaceProperties call overwrites key1, adds key3 and
+// removes key2; the follow-up read must reflect all three changes.
+TEST_F(RestCatalogIntegrationTest, UpdateNamespaceProperties) {
+ ICEBERG_UNWRAP_OR_FAIL(auto catalog, CreateCatalog());
+
+ Namespace ns{.levels = {"test_update"}};
+ ASSERT_THAT(catalog->CreateNamespace(ns, {{"key1", "value1"}, {"key2",
"value2"}}),
+ IsOk());
+
+ ASSERT_THAT(catalog->UpdateNamespaceProperties(
+ ns, {{"key1", "updated_value1"}, {"key3", "value3"}},
{"key2"}),
+ IsOk());
+
+ ICEBERG_UNWRAP_OR_FAIL(auto props, catalog->GetNamespaceProperties(ns));
+ EXPECT_EQ(props.at("key1"), "updated_value1");
+ EXPECT_EQ(props.at("key3"), "value3");
+ EXPECT_EQ(props.count("key2"), 0);
+}
+
+// A namespace that was created and then dropped no longer exists.
+TEST_F(RestCatalogIntegrationTest, DropNamespace) {
+ ICEBERG_UNWRAP_OR_FAIL(auto catalog, CreateCatalog());
+
+ Namespace ns{.levels = {"test_drop"}};
+ ASSERT_THAT(catalog->CreateNamespace(ns, {}), IsOk());
+ ASSERT_THAT(catalog->DropNamespace(ns), IsOk());
+
+ ICEBERG_UNWRAP_OR_FAIL(auto exists, catalog->NamespaceExists(ns));
+ EXPECT_FALSE(exists);
+}
+
+// Dropping a namespace that was never created fails with kNoSuchNamespace.
+TEST_F(RestCatalogIntegrationTest, DropNonExistentNamespace) {
+ ICEBERG_UNWRAP_OR_FAIL(auto catalog, CreateCatalog());
+ EXPECT_THAT(catalog->DropNamespace(Namespace{.levels = {"nonexistent"}}),
+ IsError(ErrorKind::kNoSuchNamespace));
+}
+
+// -- Table operations --
+
+// Creates table "t1" inside a three-level namespace (each level is created
+// explicitly, outermost first), checks the identifier on the returned table,
+// and verifies that a second create of the same identifier is rejected with
+// kAlreadyExists.
+TEST_F(RestCatalogIntegrationTest, CreateTable) {
+ ICEBERG_UNWRAP_OR_FAIL(auto catalog, CreateCatalog());
+
+ // Build nested namespace hierarchy
+ Namespace ns{.levels = {"test_create_table", "apple", "ios"}};
+ ASSERT_THAT(catalog->CreateNamespace(Namespace{.levels =
{"test_create_table"}}, {}),
+ IsOk());
+ ASSERT_THAT(
+ catalog->CreateNamespace(Namespace{.levels = {"test_create_table",
"apple"}}, {}),
+ IsOk());
+ ASSERT_THAT(catalog->CreateNamespace(ns, {{"owner", "ray"}, {"community",
"apache"}}),
+ IsOk());
+
+ TableIdentifier table_id{.ns = ns, .name = "t1"};
+ ICEBERG_UNWRAP_OR_FAIL(auto table, CreateDefaultTable(catalog, table_id));
+
+ EXPECT_EQ(table->name().ns.levels,
+ (std::vector<std::string>{"test_create_table", "apple", "ios"}));
+ EXPECT_EQ(table->name().name, "t1");
+
+ // Duplicate creation should fail
+ EXPECT_THAT(CreateDefaultTable(catalog, table_id),
IsError(ErrorKind::kAlreadyExists));
+}
+
+// A fresh namespace lists no tables; after two creates it lists exactly those
+// two, in any order (only the table names are compared).
+TEST_F(RestCatalogIntegrationTest, ListTables) {
+ Namespace ns{.levels = {"test_list_tables"}};
+ ICEBERG_UNWRAP_OR_FAIL(auto catalog, CreateCatalogAndNamespace(ns));
+
+ ICEBERG_UNWRAP_OR_FAIL(auto empty_list, catalog->ListTables(ns));
+ EXPECT_TRUE(empty_list.empty());
+
+ TableIdentifier t1{.ns = ns, .name = "table1"};
+ TableIdentifier t2{.ns = ns, .name = "table2"};
+ ASSERT_THAT(CreateDefaultTable(catalog, t1), IsOk());
+ ASSERT_THAT(CreateDefaultTable(catalog, t2), IsOk());
+
+ ICEBERG_UNWRAP_OR_FAIL(auto tables, catalog->ListTables(ns));
+ EXPECT_THAT(tables, testing::UnorderedElementsAre(
+ testing::Field(&TableIdentifier::name, "table1"),
+ testing::Field(&TableIdentifier::name, "table2")));
+}
+
+// Loads a table created with the default schema and checks its identifier,
+// that metadata is present, and that the schema has the "id" and "data"
+// fields in that order.
+TEST_F(RestCatalogIntegrationTest, LoadTable) {
+ Namespace ns{.levels = {"test_load_table"}};
+ ICEBERG_UNWRAP_OR_FAIL(auto catalog, CreateCatalogAndNamespace(ns));
+
+ TableIdentifier table_id{.ns = ns, .name = "test_table"};
+ ASSERT_THAT(CreateDefaultTable(catalog, table_id, {{"key1", "value1"}}),
IsOk());
+
+ ICEBERG_UNWRAP_OR_FAIL(auto loaded, catalog->LoadTable(table_id));
+ EXPECT_EQ(loaded->name().ns.levels,
std::vector<std::string>{"test_load_table"});
+ EXPECT_EQ(loaded->name().name, "test_table");
+ EXPECT_NE(loaded->metadata(), nullptr);
+
+ ICEBERG_UNWRAP_OR_FAIL(auto schema, loaded->schema());
+ ASSERT_EQ(schema->fields().size(), 2);
+ EXPECT_EQ(schema->fields()[0].name(), "id");
+ EXPECT_EQ(schema->fields()[1].name(), "data");
+}
+
+// After DropTable (purge disabled) the table no longer exists in the catalog.
+TEST_F(RestCatalogIntegrationTest, DropTable) {
+ Namespace ns{.levels = {"test_drop_table"}};
+ ICEBERG_UNWRAP_OR_FAIL(auto catalog, CreateCatalogAndNamespace(ns));
+
+ TableIdentifier table_id{.ns = ns, .name = "table_to_drop"};
+ ASSERT_THAT(CreateDefaultTable(catalog, table_id), IsOk());
+
+ ASSERT_THAT(catalog->DropTable(table_id, /*purge=*/false), IsOk());
+
+ ICEBERG_UNWRAP_OR_FAIL(auto exists, catalog->TableExists(table_id));
+ EXPECT_FALSE(exists);
+}
+
+// Renaming within one namespace makes the old identifier disappear and the
+// new identifier appear.
+TEST_F(RestCatalogIntegrationTest, RenameTable) {
+ Namespace ns{.levels = {"test_rename_table"}};
+ ICEBERG_UNWRAP_OR_FAIL(auto catalog, CreateCatalogAndNamespace(ns));
+
+ TableIdentifier old_id{.ns = ns, .name = "old_table"};
+ TableIdentifier new_id{.ns = ns, .name = "new_table"};
+ ASSERT_THAT(CreateDefaultTable(catalog, old_id), IsOk());
+ ASSERT_THAT(catalog->RenameTable(old_id, new_id), IsOk());
+
+ ICEBERG_UNWRAP_OR_FAIL(auto old_exists, catalog->TableExists(old_id));
+ EXPECT_FALSE(old_exists);
+ ICEBERG_UNWRAP_OR_FAIL(auto new_exists, catalog->TableExists(new_id));
+ EXPECT_TRUE(new_exists);
+}
+
+// Commits one SetProperties update guarded by an AssertUUID requirement built
+// from the created table's UUID, then checks the property on the table that
+// UpdateTable returns.
+TEST_F(RestCatalogIntegrationTest, UpdateTable) {
+ Namespace ns{.levels = {"test_update_table"}};
+ ICEBERG_UNWRAP_OR_FAIL(auto catalog, CreateCatalogAndNamespace(ns));
+
+ TableIdentifier table_id{.ns = ns, .name = "t1"};
+ ICEBERG_UNWRAP_OR_FAIL(auto table, CreateDefaultTable(catalog, table_id));
+
+ std::vector<std::unique_ptr<TableRequirement>> requirements;
+ requirements.push_back(std::make_unique<table::AssertUUID>(table->uuid()));
+
+ std::vector<std::unique_ptr<TableUpdate>> updates;
+ updates.push_back(std::make_unique<table::SetProperties>(
+ std::unordered_map<std::string, std::string>{{"key1", "value1"}}));
+
+ ICEBERG_UNWRAP_OR_FAIL(auto updated,
+ catalog->UpdateTable(table_id, requirements,
updates));
+ EXPECT_EQ(updated->metadata()->properties.configs().at("key1"), "value1");
+}
+
+// Creates "t1", remembers its metadata file location, drops it without purge,
+// and registers that same metadata file as "t2". The registered table must
+// report the same metadata location under a different identifier.
+TEST_F(RestCatalogIntegrationTest, RegisterTable) {
+ Namespace ns{.levels = {"test_register_table"}};
+ ICEBERG_UNWRAP_OR_FAIL(auto catalog, CreateCatalogAndNamespace(ns));
+
+ TableIdentifier table_id{.ns = ns, .name = "t1"};
+ ICEBERG_UNWRAP_OR_FAIL(auto table, CreateDefaultTable(catalog, table_id));
+ std::string metadata_location(table->metadata_file_location());
+
+ ASSERT_THAT(catalog->DropTable(table_id, /*purge=*/false), IsOk());
+
+ TableIdentifier new_id{.ns = ns, .name = "t2"};
+ ICEBERG_UNWRAP_OR_FAIL(auto registered,
+ catalog->RegisterTable(new_id, metadata_location));
+ EXPECT_EQ(table->metadata_file_location(),
registered->metadata_file_location());
+ EXPECT_NE(table->name(), registered->name());
+}
+
+// StageCreateTable returns a transaction whose table carries the requested
+// identifier, but TableExists stays false until the transaction is committed.
+// The committed table must carry the property given at staging time.
+TEST_F(RestCatalogIntegrationTest, StageCreateTable) {
+ Namespace ns{.levels = {"test_stage_create"}};
+ ICEBERG_UNWRAP_OR_FAIL(auto catalog, CreateCatalogAndNamespace(ns));
+
+ TableIdentifier table_id{.ns = ns, .name = "staged_table"};
+ ICEBERG_UNWRAP_OR_FAIL(
+ auto txn,
+ catalog->StageCreateTable(table_id, DefaultSchema(),
PartitionSpec::Unpartitioned(),
+ SortOrder::Unsorted(), "", {{"key1",
"value1"}}));
+
+ EXPECT_EQ(txn->table()->name(), table_id);
+
+ // Not yet visible in catalog
+ ICEBERG_UNWRAP_OR_FAIL(auto before, catalog->TableExists(table_id));
+ EXPECT_FALSE(before);
+
+ ICEBERG_UNWRAP_OR_FAIL(auto committed, txn->Commit());
+
+ ICEBERG_UNWRAP_OR_FAIL(auto after, catalog->TableExists(table_id));
+ EXPECT_TRUE(after);
+ EXPECT_EQ(committed->metadata()->properties.configs().at("key1"), "value1");
+}
+
+// -- Snapshot loading mode --
+
+// Loads a freshly created table through a catalog configured with snapshot
+// loading mode "ALL".
+// NOTE(review): the assertions only cover that metadata and schemas come
+// back, so this shows the request succeeds with the mode set; it does not
+// check which snapshots the server returned.
+TEST_F(RestCatalogIntegrationTest, LoadTableWithSnapshotModeAll) {
+ Namespace ns{.levels = {"test_snapshot_all"}};
+ ICEBERG_UNWRAP_OR_FAIL(auto catalog, CreateCatalogWithSnapshotMode("ALL"));
+ ASSERT_THAT(catalog->CreateNamespace(ns, {}), IsOk());
+
+ TableIdentifier table_id{.ns = ns, .name = "all_mode_table"};
+ ASSERT_THAT(CreateDefaultTable(catalog, table_id), IsOk());
+
+ ICEBERG_UNWRAP_OR_FAIL(auto loaded, catalog->LoadTable(table_id));
+ EXPECT_NE(loaded->metadata(), nullptr);
+ EXPECT_FALSE(loaded->metadata()->schemas.empty());
+}
+
+// Loads a freshly created table through a catalog configured with snapshot
+// loading mode "REFS".
+// NOTE(review): the assertions only cover that metadata and schemas come
+// back, so this shows the request succeeds with the mode set; it does not
+// check which snapshots the server returned.
+TEST_F(RestCatalogIntegrationTest, LoadTableWithSnapshotModeRefs) {
+ Namespace ns{.levels = {"test_snapshot_refs"}};
+ ICEBERG_UNWRAP_OR_FAIL(auto catalog, CreateCatalogWithSnapshotMode("REFS"));
+ ASSERT_THAT(catalog->CreateNamespace(ns, {}), IsOk());
+
+ TableIdentifier table_id{.ns = ns, .name = "refs_mode_table"};
+ ASSERT_THAT(CreateDefaultTable(catalog, table_id), IsOk());
+
+ ICEBERG_UNWRAP_OR_FAIL(auto loaded, catalog->LoadTable(table_id));
+ EXPECT_NE(loaded->metadata(), nullptr);
+ EXPECT_FALSE(loaded->metadata()->schemas.empty());
+}
+
+} // namespace iceberg::rest
diff --git a/src/iceberg/test/rest_catalog_test.cc
b/src/iceberg/test/rest_catalog_test.cc
deleted file mode 100644
index d4a9477b..00000000
--- a/src/iceberg/test/rest_catalog_test.cc
+++ /dev/null
@@ -1,693 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-#include "iceberg/catalog/rest/rest_catalog.h"
-
-#include <unistd.h>
-
-#include <algorithm>
-#include <chrono>
-#include <memory>
-#include <print>
-#include <string>
-#include <thread>
-#include <unordered_map>
-
-#include <arpa/inet.h>
-#include <gmock/gmock.h>
-#include <gtest/gtest.h>
-#include <netinet/in.h>
-#include <nlohmann/json.hpp>
-#include <sys/socket.h>
-
-#include "iceberg/catalog/rest/auth/auth_session.h"
-#include "iceberg/catalog/rest/catalog_properties.h"
-#include "iceberg/catalog/rest/error_handlers.h"
-#include "iceberg/catalog/rest/http_client.h"
-#include "iceberg/catalog/rest/json_serde_internal.h"
-#include "iceberg/partition_spec.h"
-#include "iceberg/result.h"
-#include "iceberg/schema.h"
-#include "iceberg/sort_order.h"
-#include "iceberg/table.h"
-#include "iceberg/table_identifier.h"
-#include "iceberg/table_requirement.h"
-#include "iceberg/table_update.h"
-#include "iceberg/test/matchers.h"
-#include "iceberg/test/std_io.h"
-#include "iceberg/test/test_resource.h"
-#include "iceberg/test/util/docker_compose_util.h"
-#include "iceberg/transaction.h"
-
-namespace iceberg::rest {
-
-namespace {
-
-constexpr uint16_t kRestCatalogPort = 8181;
-constexpr int kMaxRetries = 60; // Wait up to 60 seconds
-constexpr int kRetryDelayMs = 1000;
-
-constexpr std::string_view kDockerProjectName = "iceberg-rest-catalog-service";
-constexpr std::string_view kCatalogName = "test_catalog";
-constexpr std::string_view kWarehouseName = "default";
-constexpr std::string_view kLocalhostUri = "http://localhost";
-
-/// \brief Check if a localhost port is ready to accept connections
-/// \param port Port number to check
-/// \return true if the port is accessible on localhost, false otherwise
-bool CheckServiceReady(uint16_t port) {
- int sock = socket(AF_INET, SOCK_STREAM, 0);
- if (sock < 0) {
- return false;
- }
-
- struct timeval timeout{
- .tv_sec = 1,
- .tv_usec = 0,
- };
- setsockopt(sock, SOL_SOCKET, SO_RCVTIMEO, &timeout, sizeof(timeout));
-
- sockaddr_in addr{
- .sin_family = AF_INET,
- .sin_port = htons(port),
- .sin_addr = {.s_addr = htonl(INADDR_LOOPBACK)} // 127.0.0.1
- };
- bool result =
- (connect(sock, reinterpret_cast<struct sockaddr*>(&addr), sizeof(addr)) == 0);
- close(sock);
- return result;
-}
-
-} // namespace
-
-/// \brief Integration test fixture for REST catalog with automatic Docker Compose setup。
-class RestCatalogIntegrationTest : public ::testing::Test {
- protected:
- static void SetUpTestSuite() {
- std::string project_name{kDockerProjectName};
- std::filesystem::path resources_dir = GetResourcePath("iceberg-rest-fixture");
-
- // Create and start DockerCompose
- docker_compose_ = std::make_unique<DockerCompose>(project_name, resources_dir);
- docker_compose_->Up();
-
- // Wait for REST catalog to be ready on localhost
- std::println("[INFO] Waiting for REST catalog to be ready at localhost:{}...",
- kRestCatalogPort);
- for (int i = 0; i < kMaxRetries; ++i) {
- if (CheckServiceReady(kRestCatalogPort)) {
- std::println("[INFO] REST catalog is ready!");
- return;
- }
- std::println(
- "[INFO] Waiting for 1s for REST catalog to be ready... (attempt {}/{})", i + 1,
- kMaxRetries);
- std::this_thread::sleep_for(std::chrono::milliseconds(kRetryDelayMs));
- }
- throw std::runtime_error("REST catalog failed to start within {} seconds");
- }
-
- static void TearDownTestSuite() { docker_compose_.reset(); }
-
- void SetUp() override {}
-
- void TearDown() override {}
-
- // Helper function to create a REST catalog instance
- Result<std::shared_ptr<RestCatalog>> CreateCatalog() {
- auto config = RestCatalogProperties::default_properties();
- config
- .Set(RestCatalogProperties::kUri,
- std::format("{}:{}", kLocalhostUri, kRestCatalogPort))
- .Set(RestCatalogProperties::kName, std::string(kCatalogName))
- .Set(RestCatalogProperties::kWarehouse, std::string(kWarehouseName));
- auto file_io = std::make_shared<test::StdFileIO>();
- return RestCatalog::Make(config, std::make_shared<test::StdFileIO>());
- }
-
- // Helper function to create a default schema for testing
- std::shared_ptr<Schema> CreateDefaultSchema() {
- return std::make_shared<Schema>(
- std::vector<SchemaField>{SchemaField::MakeRequired(1, "id", int32()),
- SchemaField::MakeOptional(2, "data", string())},
- /*schema_id=*/1);
- }
-
- static inline std::unique_ptr<DockerCompose> docker_compose_;
-};
-
-TEST_F(RestCatalogIntegrationTest, MakeCatalogSuccess) {
- auto catalog_result = CreateCatalog();
- ASSERT_THAT(catalog_result, IsOk());
-
- auto& catalog = catalog_result.value();
- EXPECT_EQ(catalog->name(), kCatalogName);
-}
-
-/// Verifies that the server's /v1/config endpoint returns a valid response
-/// and that the endpoints field (if present) can be parsed correctly.
-TEST_F(RestCatalogIntegrationTest, FetchServerConfigDirect) {
- // Create HTTP client and fetch config directly
- HttpClient client({});
- auto noop_session = auth::AuthSession::MakeDefault({});
- std::string config_url =
- std::format("{}:{}/v1/config", kLocalhostUri, kRestCatalogPort);
-
- auto response_result = client.Get(config_url, {}, /*headers=*/{},
- *DefaultErrorHandler::Instance(), *noop_session);
- ASSERT_THAT(response_result, IsOk());
- auto json_result = FromJsonString(response_result->body());
- ASSERT_THAT(json_result, IsOk());
- auto& json = json_result.value();
-
- EXPECT_TRUE(json.contains("defaults"));
- EXPECT_TRUE(json.contains("overrides"));
-
- if (json.contains("endpoints")) {
- EXPECT_TRUE(json["endpoints"].is_array());
-
- // Parse the config to ensure all endpoints are valid
- auto config_result = CatalogConfigFromJson(json);
- ASSERT_THAT(config_result, IsOk());
- auto& config = config_result.value();
- std::println("[INFO] Server provided {} endpoints", config.endpoints.size());
- EXPECT_GT(config.endpoints.size(), 0)
- << "Server should provide at least one endpoint";
- } else {
- std::println(
- "[INFO] Server did not provide endpoints field, client will use default "
- "endpoints");
- }
-}
-
-TEST_F(RestCatalogIntegrationTest, ListNamespaces) {
- auto catalog_result = CreateCatalog();
- ASSERT_THAT(catalog_result, IsOk());
- auto& catalog = catalog_result.value();
-
- Namespace root{.levels = {}};
- auto result = catalog->ListNamespaces(root);
- EXPECT_THAT(result, IsOk());
- EXPECT_TRUE(result->empty());
-}
-
-TEST_F(RestCatalogIntegrationTest, CreateNamespace) {
- auto catalog_result = CreateCatalog();
- ASSERT_THAT(catalog_result, IsOk());
- auto& catalog = catalog_result.value();
-
- // Create a simple namespace
- Namespace ns{.levels = {"test_ns"}};
- auto status = catalog->CreateNamespace(ns, {});
- EXPECT_THAT(status, IsOk());
-
- // Verify it was created by listing
- Namespace root{.levels = {}};
- auto list_result = catalog->ListNamespaces(root);
- ASSERT_THAT(list_result, IsOk());
- EXPECT_EQ(list_result->size(), 1);
- EXPECT_EQ(list_result->at(0).levels, std::vector<std::string>{"test_ns"});
-}
-
-TEST_F(RestCatalogIntegrationTest, CreateNamespaceWithProperties) {
- auto catalog_result = CreateCatalog();
- ASSERT_THAT(catalog_result, IsOk());
- auto& catalog = catalog_result.value();
-
- // Create namespace with properties
- Namespace ns{.levels = {"test_ns_props"}};
- std::unordered_map<std::string, std::string> properties{
- {"owner", "test_user"}, {"description", "Test namespace with properties"}};
- auto status = catalog->CreateNamespace(ns, properties);
- EXPECT_THAT(status, IsOk());
-
- // Verify properties were set
- auto props_result = catalog->GetNamespaceProperties(ns);
- ASSERT_THAT(props_result, IsOk());
- EXPECT_EQ(props_result->at("owner"), "test_user");
- EXPECT_EQ(props_result->at("description"), "Test namespace with properties");
-}
-
-TEST_F(RestCatalogIntegrationTest, CreateNestedNamespace) {
- auto catalog_result = CreateCatalog();
- ASSERT_THAT(catalog_result, IsOk());
- auto& catalog = catalog_result.value();
-
- // Create parent namespace
- Namespace parent{.levels = {"parent"}};
- auto status = catalog->CreateNamespace(parent, {});
- EXPECT_THAT(status, IsOk());
-
- // Create nested namespace
- Namespace child{.levels = {"parent", "child"}};
- status = catalog->CreateNamespace(child, {});
- EXPECT_THAT(status, IsOk());
-
- // Verify nested namespace exists
- auto list_result = catalog->ListNamespaces(parent);
- ASSERT_THAT(list_result, IsOk());
- EXPECT_EQ(list_result->size(), 1);
- EXPECT_EQ(list_result->at(0).levels, (std::vector<std::string>{"parent", "child"}));
-}
-
-TEST_F(RestCatalogIntegrationTest, GetNamespaceProperties) {
- auto catalog_result = CreateCatalog();
- ASSERT_THAT(catalog_result, IsOk());
- auto& catalog = catalog_result.value();
-
- // Create namespace with properties
- Namespace ns{.levels = {"test_get_props"}};
- std::unordered_map<std::string, std::string> properties{{"key1", "value1"},
- {"key2", "value2"}};
- auto status = catalog->CreateNamespace(ns, properties);
- EXPECT_THAT(status, IsOk());
-
- // Get properties
- auto props_result = catalog->GetNamespaceProperties(ns);
- ASSERT_THAT(props_result, IsOk());
- EXPECT_EQ(props_result->at("key1"), "value1");
- EXPECT_EQ(props_result->at("key2"), "value2");
-}
-
-TEST_F(RestCatalogIntegrationTest, NamespaceExists) {
- auto catalog_result = CreateCatalog();
- ASSERT_THAT(catalog_result, IsOk());
- auto& catalog = catalog_result.value();
-
- // Check non-existent namespace
- Namespace ns{.levels = {"non_existent"}};
- auto exists_result = catalog->NamespaceExists(ns);
- ASSERT_THAT(exists_result, IsOk());
- EXPECT_FALSE(*exists_result);
-
- // Create namespace
- auto status = catalog->CreateNamespace(ns, {});
- EXPECT_THAT(status, IsOk());
-
- // Check it now exists
- exists_result = catalog->NamespaceExists(ns);
- ASSERT_THAT(exists_result, IsOk());
- EXPECT_TRUE(exists_result.value());
-}
-
-TEST_F(RestCatalogIntegrationTest, UpdateNamespaceProperties) {
- auto catalog_result = CreateCatalog();
- ASSERT_THAT(catalog_result, IsOk());
- auto& catalog = catalog_result.value();
-
- // Create namespace with initial properties
- Namespace ns{.levels = {"test_update"}};
- std::unordered_map<std::string, std::string> initial_props{{"key1", "value1"},
- {"key2", "value2"}};
- auto status = catalog->CreateNamespace(ns, initial_props);
- EXPECT_THAT(status, IsOk());
-
- // Update properties: modify key1, add key3, remove key2
- std::unordered_map<std::string, std::string> updates{{"key1", "updated_value1"},
- {"key3", "value3"}};
- std::unordered_set<std::string> removals{"key2"};
- status = catalog->UpdateNamespaceProperties(ns, updates, removals);
- EXPECT_THAT(status, IsOk());
-
- // Verify updated properties
- auto props_result = catalog->GetNamespaceProperties(ns);
- ASSERT_THAT(props_result, IsOk());
- EXPECT_EQ(props_result->at("key1"), "updated_value1");
- EXPECT_EQ(props_result->at("key3"), "value3");
- EXPECT_EQ(props_result->count("key2"), 0); // Should be removed
-}
-
-TEST_F(RestCatalogIntegrationTest, DropNamespace) {
- auto catalog_result = CreateCatalog();
- ASSERT_THAT(catalog_result, IsOk());
- auto& catalog = catalog_result.value();
-
- // Create namespace
- Namespace ns{.levels = {"test_drop"}};
- auto status = catalog->CreateNamespace(ns, {});
- EXPECT_THAT(status, IsOk());
-
- // Verify it exists
- auto exists_result = catalog->NamespaceExists(ns);
- ASSERT_THAT(exists_result, IsOk());
- EXPECT_TRUE(*exists_result);
-
- // Drop namespace
- status = catalog->DropNamespace(ns);
- EXPECT_THAT(status, IsOk());
-
- // Verify it no longer exists
- exists_result = catalog->NamespaceExists(ns);
- ASSERT_THAT(exists_result, IsOk());
- EXPECT_FALSE(exists_result.value());
-}
-
-TEST_F(RestCatalogIntegrationTest, DropNonExistentNamespace) {
- auto catalog_result = CreateCatalog();
- ASSERT_THAT(catalog_result, IsOk());
- auto& catalog = catalog_result.value();
-
- Namespace ns{.levels = {"nonexistent_namespace"}};
- auto status = catalog->DropNamespace(ns);
-
- // Should return NoSuchNamespace error
- EXPECT_THAT(status, IsError(ErrorKind::kNoSuchNamespace));
-}
-
-TEST_F(RestCatalogIntegrationTest, CreateTable) {
- auto catalog_result = CreateCatalog();
- ASSERT_THAT(catalog_result, IsOk());
- auto& catalog = catalog_result.value();
-
- // Create nested namespace with properties
- Namespace ns{.levels = {"test_create_table", "apple", "ios"}};
- std::unordered_map<std::string, std::string> ns_properties{{"owner", "ray"},
- {"community", "apache"}};
-
- // Create parent namespaces first
- auto status = catalog->CreateNamespace(Namespace{.levels = {"test_create_table"}}, {});
- EXPECT_THAT(status, IsOk());
- status =
- catalog->CreateNamespace(Namespace{.levels = {"test_create_table", "apple"}}, {});
- EXPECT_THAT(status, IsOk());
- status = catalog->CreateNamespace(ns, ns_properties);
- EXPECT_THAT(status, IsOk());
-
- auto schema = CreateDefaultSchema();
- auto partition_spec = PartitionSpec::Unpartitioned();
- auto sort_order = SortOrder::Unsorted();
-
- // Create table
- TableIdentifier table_id{.ns = ns, .name = "t1"};
- std::unordered_map<std::string, std::string> table_properties;
- auto table_result = catalog->CreateTable(table_id, schema, partition_spec, sort_order,
- "", table_properties);
- ASSERT_THAT(table_result, IsOk());
- auto& table = table_result.value();
-
- // Verify table
- EXPECT_EQ(table->name().ns.levels,
- (std::vector<std::string>{"test_create_table", "apple", "ios"}));
- EXPECT_EQ(table->name().name, "t1");
-
- // Verify that creating the same table again fails
- auto duplicate_result = catalog->CreateTable(table_id, schema, partition_spec,
- sort_order, "", table_properties);
- EXPECT_THAT(duplicate_result, IsError(ErrorKind::kAlreadyExists));
- EXPECT_THAT(duplicate_result,
- HasErrorMessage("Table already exists: test_create_table.apple.ios.t1"));
-}
-
-TEST_F(RestCatalogIntegrationTest, ListTables) {
- auto catalog_result = CreateCatalog();
- ASSERT_THAT(catalog_result, IsOk());
- auto& catalog = catalog_result.value();
-
- // Create namespace
- Namespace ns{.levels = {"test_list_tables"}};
- auto status = catalog->CreateNamespace(ns, {});
- EXPECT_THAT(status, IsOk());
-
- // Initially no tables
- auto list_result = catalog->ListTables(ns);
- ASSERT_THAT(list_result, IsOk());
- EXPECT_TRUE(list_result.value().empty());
-
- // Create tables
- auto schema = CreateDefaultSchema();
- auto partition_spec = PartitionSpec::Unpartitioned();
- auto sort_order = SortOrder::Unsorted();
- std::unordered_map<std::string, std::string> table_properties;
-
- TableIdentifier table1{.ns = ns, .name = "table1"};
- auto create_result = catalog->CreateTable(table1, schema, partition_spec, sort_order,
- "", table_properties);
- ASSERT_THAT(create_result, IsOk());
-
- TableIdentifier table2{.ns = ns, .name = "table2"};
- create_result = catalog->CreateTable(table2, schema, partition_spec, sort_order, "",
- table_properties);
- ASSERT_THAT(create_result, IsOk());
-
- // List and varify tables
- list_result = catalog->ListTables(ns);
- ASSERT_THAT(list_result, IsOk());
- EXPECT_THAT(list_result.value(), testing::UnorderedElementsAre(
- testing::Field(&TableIdentifier::name, "table1"),
- testing::Field(&TableIdentifier::name, "table2")));
-}
-
-TEST_F(RestCatalogIntegrationTest, LoadTable) {
- auto catalog_result = CreateCatalog();
- ASSERT_THAT(catalog_result, IsOk());
- auto& catalog = catalog_result.value();
-
- // Create namespace and table first
- Namespace ns{.levels = {"test_load_table"}};
- auto status = catalog->CreateNamespace(ns, {});
- EXPECT_THAT(status, IsOk());
-
- // Create schema, partition spec, and sort order using helper functions
- auto schema = CreateDefaultSchema();
- auto partition_spec = PartitionSpec::Unpartitioned();
- auto sort_order = SortOrder::Unsorted();
-
- // Create table
- TableIdentifier table_id{.ns = ns, .name = "test_table"};
- std::unordered_map<std::string, std::string> table_properties{{"key1", "value1"}};
- auto create_result = catalog->CreateTable(table_id, schema, partition_spec, sort_order,
- "", table_properties);
- ASSERT_THAT(create_result, IsOk());
-
- // Load the table
- auto load_result = catalog->LoadTable(table_id);
- ASSERT_THAT(load_result, IsOk());
- auto& loaded_table = load_result.value();
-
- // Verify loaded table properties
- EXPECT_EQ(loaded_table->name().ns.levels, std::vector<std::string>{"test_load_table"});
- EXPECT_EQ(loaded_table->name().name, "test_table");
- EXPECT_NE(loaded_table->metadata(), nullptr);
-
- // Verify schema
- auto loaded_schema_result = loaded_table->schema();
- ASSERT_THAT(loaded_schema_result, IsOk());
- auto loaded_schema = loaded_schema_result.value();
- EXPECT_EQ(loaded_schema->fields().size(), 2);
- EXPECT_EQ(loaded_schema->fields()[0].name(), "id");
- EXPECT_EQ(loaded_schema->fields()[1].name(), "data");
-}
-
-TEST_F(RestCatalogIntegrationTest, DropTable) {
- auto catalog_result = CreateCatalog();
- ASSERT_THAT(catalog_result, IsOk());
- auto& catalog = catalog_result.value();
-
- // Create namespace and table first
- Namespace ns{.levels = {"test_drop_table"}};
- auto status = catalog->CreateNamespace(ns, {});
- EXPECT_THAT(status, IsOk());
-
- // Create table
- auto schema = CreateDefaultSchema();
- auto partition_spec = PartitionSpec::Unpartitioned();
- auto sort_order = SortOrder::Unsorted();
-
- TableIdentifier table_id{.ns = ns, .name = "table_to_drop"};
- std::unordered_map<std::string, std::string> table_properties;
- auto create_result = catalog->CreateTable(table_id, schema, partition_spec, sort_order,
- "", table_properties);
- ASSERT_THAT(create_result, IsOk());
-
- // Verify table exists
- auto load_result = catalog->TableExists(table_id);
- ASSERT_THAT(load_result, IsOk());
- EXPECT_TRUE(load_result.value());
-
- // Drop the table
- status = catalog->DropTable(table_id, /*purge=*/false);
- ASSERT_THAT(status, IsOk());
-
- // Verify table no longer exists
- load_result = catalog->TableExists(table_id);
- ASSERT_THAT(load_result, IsOk());
- EXPECT_FALSE(load_result.value());
-}
-
-TEST_F(RestCatalogIntegrationTest, RenameTable) {
- auto catalog_result = CreateCatalog();
- ASSERT_THAT(catalog_result, IsOk());
- auto& catalog = catalog_result.value();
-
- // Create namespace
- Namespace ns{.levels = {"test_rename_table"}};
- auto status = catalog->CreateNamespace(ns, {});
- EXPECT_THAT(status, IsOk());
-
- // Create table
- auto schema = CreateDefaultSchema();
- auto partition_spec = PartitionSpec::Unpartitioned();
- auto sort_order = SortOrder::Unsorted();
-
- TableIdentifier old_table_id{.ns = ns, .name = "old_table"};
- std::unordered_map<std::string, std::string> table_properties;
- auto table_result = catalog->CreateTable(old_table_id, schema, partition_spec,
- sort_order, "", table_properties);
- ASSERT_THAT(table_result, IsOk());
-
- // Rename table
- TableIdentifier new_table_id{.ns = ns, .name = "new_table"};
- status = catalog->RenameTable(old_table_id, new_table_id);
- ASSERT_THAT(status, IsOk());
-
- // Verify old table no longer exists
- auto exists_result = catalog->TableExists(old_table_id);
- ASSERT_THAT(exists_result, IsOk());
- EXPECT_FALSE(exists_result.value());
-
- // Verify new table exists
- exists_result = catalog->TableExists(new_table_id);
- ASSERT_THAT(exists_result, IsOk());
- EXPECT_TRUE(exists_result.value());
-}
-
-TEST_F(RestCatalogIntegrationTest, UpdateTable) {
- auto catalog_result = CreateCatalog();
- ASSERT_THAT(catalog_result, IsOk());
- auto& catalog = catalog_result.value();
-
- // Create namespace
- Namespace ns{.levels = {"test_update_table"}};
- auto status = catalog->CreateNamespace(ns, {});
- EXPECT_THAT(status, IsOk());
-
- // Create table
- auto schema = CreateDefaultSchema();
- auto partition_spec = PartitionSpec::Unpartitioned();
- auto sort_order = SortOrder::Unsorted();
-
- TableIdentifier table_id{.ns = ns, .name = "t1"};
- std::unordered_map<std::string, std::string> table_properties;
- auto table_result = catalog->CreateTable(table_id, schema, partition_spec, sort_order,
- "", table_properties);
- ASSERT_THAT(table_result, IsOk());
- auto& table = table_result.value();
-
- // Update table properties
- std::vector<std::unique_ptr<TableRequirement>> requirements;
- requirements.push_back(std::make_unique<table::AssertUUID>(table->uuid()));
-
- std::vector<std::unique_ptr<TableUpdate>> updates;
- updates.push_back(std::make_unique<table::SetProperties>(
- std::unordered_map<std::string, std::string>{{"key1", "value1"}}));
-
- auto update_result = catalog->UpdateTable(table_id, requirements, updates);
- ASSERT_THAT(update_result, IsOk());
- auto& updated_table = update_result.value();
-
- // Verify the property was set
- auto& props = updated_table->metadata()->properties.configs();
- EXPECT_EQ(props.at("key1"), "value1");
-}
-
-TEST_F(RestCatalogIntegrationTest, RegisterTable) {
- auto catalog_result = CreateCatalog();
- ASSERT_THAT(catalog_result, IsOk());
- auto& catalog = catalog_result.value();
-
- // Create namespace
- Namespace ns{.levels = {"test_register_table"}};
- auto status = catalog->CreateNamespace(ns, {});
- EXPECT_THAT(status, IsOk());
-
- // Create table
- auto schema = CreateDefaultSchema();
- auto partition_spec = PartitionSpec::Unpartitioned();
- auto sort_order = SortOrder::Unsorted();
-
- TableIdentifier table_id{.ns = ns, .name = "t1"};
- std::unordered_map<std::string, std::string> table_properties;
- auto table_result = catalog->CreateTable(table_id, schema, partition_spec, sort_order,
- "", table_properties);
- ASSERT_THAT(table_result, IsOk());
- auto& table = table_result.value();
- std::string metadata_location(table->metadata_file_location());
-
- // Drop table (without purge, to keep metadata file)
- status = catalog->DropTable(table_id, /*purge=*/false);
- ASSERT_THAT(status, IsOk());
-
- // Register table with new name
- TableIdentifier new_table_id{.ns = ns, .name = "t2"};
- auto register_result = catalog->RegisterTable(new_table_id, metadata_location);
- ASSERT_THAT(register_result, IsOk());
- auto& registered_table = register_result.value();
-
- EXPECT_EQ(table->metadata_file_location(), registered_table->metadata_file_location());
- EXPECT_NE(table->name(), registered_table->name());
-}
-
-TEST_F(RestCatalogIntegrationTest, StageCreateTable) {
- auto catalog_result = CreateCatalog();
- ASSERT_THAT(catalog_result, IsOk());
- auto& catalog = catalog_result.value();
-
- // Create namespace
- Namespace ns{.levels = {"test_stage_create"}};
- auto status = catalog->CreateNamespace(ns, {});
- EXPECT_THAT(status, IsOk());
-
- // Stage create table
- auto schema = CreateDefaultSchema();
- auto partition_spec = PartitionSpec::Unpartitioned();
- auto sort_order = SortOrder::Unsorted();
-
- TableIdentifier table_id{.ns = ns, .name = "staged_table"};
- std::unordered_map<std::string, std::string> table_properties{{"key1", "value1"}};
- auto txn_result = catalog->StageCreateTable(table_id, schema, partition_spec,
- sort_order, "", table_properties);
- ASSERT_THAT(txn_result, IsOk());
- auto& txn = txn_result.value();
-
- // Verify the staged table in transaction
- EXPECT_NE(txn->table(), nullptr);
- EXPECT_EQ(txn->table()->name(), table_id);
-
- // Table should NOT exist in catalog yet (staged but not committed)
- auto exists_result = catalog->TableExists(table_id);
- ASSERT_THAT(exists_result, IsOk());
- EXPECT_FALSE(exists_result.value());
-
- // Commit the transaction
- auto commit_result = txn->Commit();
- ASSERT_THAT(commit_result, IsOk());
- auto& committed_table = commit_result.value();
-
- // Verify table now exists
- exists_result = catalog->TableExists(table_id);
- ASSERT_THAT(exists_result, IsOk());
- EXPECT_TRUE(exists_result.value());
-
- // Verify table properties
- EXPECT_EQ(committed_table->name(), table_id);
- auto& props = committed_table->metadata()->properties.configs();
- EXPECT_EQ(props.at("key1"), "value1");
-}
-
-} // namespace iceberg::rest
diff --git a/src/iceberg/test/rest_util_test.cc b/src/iceberg/test/rest_util_test.cc
index e11f0015..c4b6aa85 100644
--- a/src/iceberg/test/rest_util_test.cc
+++ b/src/iceberg/test/rest_util_test.cc
@@ -19,8 +19,11 @@
#include "iceberg/catalog/rest/rest_util.h"
+#include <string>
+
#include <gtest/gtest.h>
+#include "iceberg/catalog/rest/catalog_properties.h"
#include "iceberg/catalog/rest/endpoint.h"
#include "iceberg/table_identifier.h"
#include "iceberg/test/matchers.h"
@@ -154,4 +157,50 @@ TEST(RestUtilTest, MergeConfigs) {
EXPECT_EQ(merged_empty["key"], "value");
}
+struct SnapshotModeValidCase {
+ std::string input;
+ SnapshotMode expected;
+};
+
+class SnapshotLoadingModeValidTest
+ : public ::testing::TestWithParam<SnapshotModeValidCase> {};
+
+TEST_P(SnapshotLoadingModeValidTest, ParsesCorrectly) {
+ auto config = RestCatalogProperties::default_properties();
+ config.Set(RestCatalogProperties::kSnapshotLoadingMode, GetParam().input);
+ auto result = config.SnapshotLoadingMode();
+ ASSERT_THAT(result, IsOk());
+ EXPECT_EQ(result.value(), GetParam().expected);
+}
+
+INSTANTIATE_TEST_SUITE_P(RestCatalogProperties, SnapshotLoadingModeValidTest,
+ ::testing::Values(
+ // Exact uppercase
+ SnapshotModeValidCase{"ALL", SnapshotMode::kAll},
+ SnapshotModeValidCase{"REFS", SnapshotMode::kRefs},
+ // Lowercase (Java parity: toUpperCase before parsing)
+ SnapshotModeValidCase{"all", SnapshotMode::kAll},
+ SnapshotModeValidCase{"refs", SnapshotMode::kRefs},
+ // Mixed case
+ SnapshotModeValidCase{"All", SnapshotMode::kAll},
+ SnapshotModeValidCase{"Refs", SnapshotMode::kRefs}));
+
+class SnapshotLoadingModeInvalidTest : public ::testing::TestWithParam<std::string> {};
+
+TEST_P(SnapshotLoadingModeInvalidTest, ReturnsError) {
+ auto config = RestCatalogProperties::default_properties();
+ config.Set(RestCatalogProperties::kSnapshotLoadingMode, GetParam());
+ EXPECT_THAT(config.SnapshotLoadingMode(), IsError(ErrorKind::kInvalidArgument));
+}
+
+INSTANTIATE_TEST_SUITE_P(RestCatalogProperties, SnapshotLoadingModeInvalidTest,
+ ::testing::Values("INVALID", "none", ""));
+
+TEST(RestCatalogPropertiesTest, SnapshotLoadingModeDefaultIsAll) {
+ auto config = RestCatalogProperties::default_properties();
+ auto result = config.SnapshotLoadingMode();
+ ASSERT_THAT(result, IsOk());
+ EXPECT_EQ(result.value(), SnapshotMode::kAll);
+}
+
} // namespace iceberg::rest