This is an automated email from the ASF dual-hosted git repository.
xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-cpp.git
The following commit(s) were added to refs/heads/main by this push:
new 0279fdc feat: RegisterTable support for InMemoryCatalog (#142)
0279fdc is described below
commit 0279fdc51325f840baed0c4f838ddf572ab46da0
Author: lishuxu <[email protected]>
AuthorDate: Mon Jul 21 12:02:39 2025 +0800
feat: RegisterTable support for InMemoryCatalog (#142)
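Note: Since the LoadTable interface needs to return a Table object that
holds a shared_from_this() pointer to the catalog, I removed the
InMemoryCatalogImpl pimpl class and now implement the Catalog interface
directly in InMemoryCatalog, which also inherits from
std::enable_shared_from_this<InMemoryCatalog>. As a consequence, an
InMemoryCatalog must be owned by a std::shared_ptr before LoadTable (or
RegisterTable) is called.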
---------
Co-authored-by: shuxu.li <[email protected]>
---
src/iceberg/catalog.h | 3 +-
src/iceberg/catalog/in_memory_catalog.cc | 227 ++++++-------------------------
src/iceberg/catalog/in_memory_catalog.h | 17 ++-
test/CMakeLists.txt | 12 +-
test/in_memory_catalog_test.cc | 63 ++++++++-
5 files changed, 126 insertions(+), 196 deletions(-)
diff --git a/src/iceberg/catalog.h b/src/iceberg/catalog.h
index a882f4d..1f33735 100644
--- a/src/iceberg/catalog.h
+++ b/src/iceberg/catalog.h
@@ -166,8 +166,7 @@ class ICEBERG_EXPORT Catalog {
/// \param identifier a table identifier
/// \return instance of Table implementation referred to by identifier or
/// ErrorKind::kNoSuchTable if the table does not exist
- virtual Result<std::shared_ptr<Table>> LoadTable(
- const TableIdentifier& identifier) const = 0;
+ virtual Result<std::unique_ptr<Table>> LoadTable(const TableIdentifier&
identifier) = 0;
/// \brief Register a table with the catalog if it does not exist
///
diff --git a/src/iceberg/catalog/in_memory_catalog.cc
b/src/iceberg/catalog/in_memory_catalog.cc
index 67e2b0c..af6aa14 100644
--- a/src/iceberg/catalog/in_memory_catalog.cc
+++ b/src/iceberg/catalog/in_memory_catalog.cc
@@ -21,17 +21,14 @@
#include <algorithm>
#include <iterator> // IWYU pragma: keep
-#include <mutex>
-#include <unordered_map>
#include "iceberg/exception.h"
#include "iceberg/table.h"
+#include "iceberg/table_metadata.h"
#include "iceberg/util/macros.h"
namespace iceberg {
-namespace {
-
/// \brief A hierarchical namespace that manages namespaces and table metadata
in-memory.
///
/// Each InMemoryNamespace represents a namespace level and can contain
properties,
@@ -317,117 +314,56 @@ Result<std::string>
InMemoryNamespace::GetTableMetadataLocation(
return it->second;
}
-} // namespace
-
-class ICEBERG_EXPORT InMemoryCatalogImpl {
- public:
- InMemoryCatalogImpl(std::string name, std::shared_ptr<FileIO> file_io,
- std::string warehouse_location,
- std::unordered_map<std::string, std::string> properties);
-
- std::string_view name() const;
-
- Status CreateNamespace(const Namespace& ns,
- const std::unordered_map<std::string, std::string>&
properties);
-
- Result<std::vector<Namespace>> ListNamespaces(const Namespace& ns) const;
-
- Status DropNamespace(const Namespace& ns);
-
- Result<bool> NamespaceExists(const Namespace& ns) const;
-
- Result<std::unordered_map<std::string, std::string>> GetNamespaceProperties(
- const Namespace& ns) const;
-
- Status UpdateNamespaceProperties(
- const Namespace& ns, const std::unordered_map<std::string, std::string>&
updates,
- const std::unordered_set<std::string>& removals);
-
- Result<std::vector<TableIdentifier>> ListTables(const Namespace& ns) const;
-
- Result<std::unique_ptr<Table>> CreateTable(
- const TableIdentifier& identifier, const Schema& schema, const
PartitionSpec& spec,
- const std::string& location,
- const std::unordered_map<std::string, std::string>& properties);
-
- Result<std::unique_ptr<Table>> UpdateTable(
- const TableIdentifier& identifier,
- const std::vector<std::unique_ptr<UpdateRequirement>>& requirements,
- const std::vector<std::unique_ptr<MetadataUpdate>>& updates);
-
- Result<std::shared_ptr<Transaction>> StageCreateTable(
- const TableIdentifier& identifier, const Schema& schema, const
PartitionSpec& spec,
- const std::string& location,
- const std::unordered_map<std::string, std::string>& properties);
-
- Result<bool> TableExists(const TableIdentifier& identifier) const;
-
- Status DropTable(const TableIdentifier& identifier, bool purge);
-
- Result<std::shared_ptr<Table>> LoadTable(const TableIdentifier& identifier)
const;
-
- Result<std::shared_ptr<Table>> RegisterTable(const TableIdentifier&
identifier,
- const std::string&
metadata_file_location);
-
- std::unique_ptr<TableBuilder> BuildTable(const TableIdentifier& identifier,
- const Schema& schema) const;
-
- private:
- std::string catalog_name_;
- std::unordered_map<std::string, std::string> properties_;
- std::shared_ptr<FileIO> file_io_;
- std::string warehouse_location_;
- std::unique_ptr<class InMemoryNamespace> root_namespace_;
- mutable std::recursive_mutex mutex_;
-};
-
-InMemoryCatalogImpl::InMemoryCatalogImpl(
- std::string name, std::shared_ptr<FileIO> file_io, std::string
warehouse_location,
- std::unordered_map<std::string, std::string> properties)
+InMemoryCatalog::InMemoryCatalog(
+ std::string const& name, std::shared_ptr<FileIO> const& file_io,
+ std::string const& warehouse_location,
+ std::unordered_map<std::string, std::string> const& properties)
: catalog_name_(std::move(name)),
properties_(std::move(properties)),
file_io_(std::move(file_io)),
warehouse_location_(std::move(warehouse_location)),
root_namespace_(std::make_unique<InMemoryNamespace>()) {}
-std::string_view InMemoryCatalogImpl::name() const { return catalog_name_; }
+InMemoryCatalog::~InMemoryCatalog() = default;
+
+std::string_view InMemoryCatalog::name() const { return catalog_name_; }
-Status InMemoryCatalogImpl::CreateNamespace(
+Status InMemoryCatalog::CreateNamespace(
const Namespace& ns, const std::unordered_map<std::string, std::string>&
properties) {
std::unique_lock lock(mutex_);
return root_namespace_->CreateNamespace(ns, properties);
}
-Result<std::vector<Namespace>> InMemoryCatalogImpl::ListNamespaces(
+Result<std::unordered_map<std::string, std::string>>
+InMemoryCatalog::GetNamespaceProperties(const Namespace& ns) const {
+ std::unique_lock lock(mutex_);
+ return root_namespace_->GetProperties(ns);
+}
+
+Result<std::vector<Namespace>> InMemoryCatalog::ListNamespaces(
const Namespace& ns) const {
std::unique_lock lock(mutex_);
return root_namespace_->ListNamespaces(ns);
}
-Status InMemoryCatalogImpl::DropNamespace(const Namespace& ns) {
+Status InMemoryCatalog::DropNamespace(const Namespace& ns) {
std::unique_lock lock(mutex_);
return root_namespace_->DropNamespace(ns);
}
-Result<bool> InMemoryCatalogImpl::NamespaceExists(const Namespace& ns) const {
+Result<bool> InMemoryCatalog::NamespaceExists(const Namespace& ns) const {
std::unique_lock lock(mutex_);
return root_namespace_->NamespaceExists(ns);
}
-Result<std::unordered_map<std::string, std::string>>
-InMemoryCatalogImpl::GetNamespaceProperties(const Namespace& ns) const {
- std::unique_lock lock(mutex_);
- return root_namespace_->GetProperties(ns);
-}
-
-Status InMemoryCatalogImpl::UpdateNamespaceProperties(
+Status InMemoryCatalog::UpdateNamespaceProperties(
const Namespace& ns, const std::unordered_map<std::string, std::string>&
updates,
const std::unordered_set<std::string>& removals) {
std::unique_lock lock(mutex_);
return root_namespace_->UpdateNamespaceProperties(ns, updates, removals);
}
-Result<std::vector<TableIdentifier>> InMemoryCatalogImpl::ListTables(
+Result<std::vector<TableIdentifier>> InMemoryCatalog::ListTables(
const Namespace& ns) const {
std::unique_lock lock(mutex_);
const auto& table_names = root_namespace_->ListTables(ns);
@@ -440,44 +376,60 @@ Result<std::vector<TableIdentifier>>
InMemoryCatalogImpl::ListTables(
return table_idents;
}
-Result<std::unique_ptr<Table>> InMemoryCatalogImpl::CreateTable(
+Result<std::unique_ptr<Table>> InMemoryCatalog::CreateTable(
const TableIdentifier& identifier, const Schema& schema, const
PartitionSpec& spec,
const std::string& location,
const std::unordered_map<std::string, std::string>& properties) {
return NotImplemented("create table");
}
-Result<std::unique_ptr<Table>> InMemoryCatalogImpl::UpdateTable(
+Result<std::unique_ptr<Table>> InMemoryCatalog::UpdateTable(
const TableIdentifier& identifier,
const std::vector<std::unique_ptr<UpdateRequirement>>& requirements,
const std::vector<std::unique_ptr<MetadataUpdate>>& updates) {
return NotImplemented("update table");
}
-Result<std::shared_ptr<Transaction>> InMemoryCatalogImpl::StageCreateTable(
+Result<std::shared_ptr<Transaction>> InMemoryCatalog::StageCreateTable(
const TableIdentifier& identifier, const Schema& schema, const
PartitionSpec& spec,
const std::string& location,
const std::unordered_map<std::string, std::string>& properties) {
return NotImplemented("stage create table");
}
-Result<bool> InMemoryCatalogImpl::TableExists(const TableIdentifier&
identifier) const {
+Result<bool> InMemoryCatalog::TableExists(const TableIdentifier& identifier)
const {
std::unique_lock lock(mutex_);
return root_namespace_->TableExists(identifier);
}
-Status InMemoryCatalogImpl::DropTable(const TableIdentifier& identifier, bool
purge) {
+Status InMemoryCatalog::DropTable(const TableIdentifier& identifier, bool
purge) {
std::unique_lock lock(mutex_);
// TODO(Guotao): Delete all metadata files if purge is true.
return root_namespace_->UnregisterTable(identifier);
}
-Result<std::shared_ptr<Table>> InMemoryCatalogImpl::LoadTable(
- const TableIdentifier& identifier) const {
- return NotImplemented("load table");
+Result<std::unique_ptr<Table>> InMemoryCatalog::LoadTable(
+ const TableIdentifier& identifier) {
+ if (!file_io_) [[unlikely]] {
+ return InvalidArgument("file_io is not set for catalog {}", catalog_name_);
+ }
+
+ Result<std::string> metadata_location;
+ {
+ std::unique_lock lock(mutex_);
+ ICEBERG_ASSIGN_OR_RAISE(metadata_location,
+
root_namespace_->GetTableMetadataLocation(identifier));
+ }
+
+ ICEBERG_ASSIGN_OR_RAISE(auto metadata,
+ TableMetadataUtil::Read(*file_io_,
metadata_location.value()));
+
+ return std::make_unique<Table>(identifier, std::move(metadata),
+ metadata_location.value(), file_io_,
+
std::static_pointer_cast<Catalog>(shared_from_this()));
}
-Result<std::shared_ptr<Table>> InMemoryCatalogImpl::RegisterTable(
+Result<std::shared_ptr<Table>> InMemoryCatalog::RegisterTable(
const TableIdentifier& identifier, const std::string&
metadata_file_location) {
std::unique_lock lock(mutex_);
if (!root_namespace_->NamespaceExists(identifier.ns)) {
@@ -489,95 +441,6 @@ Result<std::shared_ptr<Table>>
InMemoryCatalogImpl::RegisterTable(
return LoadTable(identifier);
}
-std::unique_ptr<TableBuilder> InMemoryCatalogImpl::BuildTable(
- const TableIdentifier& identifier, const Schema& schema) const {
- throw IcebergError("not implemented");
-}
-
-InMemoryCatalog::InMemoryCatalog(
- std::string const& name, std::shared_ptr<FileIO> const& file_io,
- std::string const& warehouse_location,
- std::unordered_map<std::string, std::string> const& properties)
- : impl_(std::make_unique<InMemoryCatalogImpl>(name, file_io,
warehouse_location,
- properties)) {}
-
-InMemoryCatalog::~InMemoryCatalog() = default;
-
-std::string_view InMemoryCatalog::name() const { return impl_->name(); }
-
-Status InMemoryCatalog::CreateNamespace(
- const Namespace& ns, const std::unordered_map<std::string, std::string>&
properties) {
- return impl_->CreateNamespace(ns, properties);
-}
-
-Result<std::unordered_map<std::string, std::string>>
-InMemoryCatalog::GetNamespaceProperties(const Namespace& ns) const {
- return impl_->GetNamespaceProperties(ns);
-}
-
-Result<std::vector<Namespace>> InMemoryCatalog::ListNamespaces(
- const Namespace& ns) const {
- return impl_->ListNamespaces(ns);
-}
-
-Status InMemoryCatalog::DropNamespace(const Namespace& ns) {
- return impl_->DropNamespace(ns);
-}
-
-Result<bool> InMemoryCatalog::NamespaceExists(const Namespace& ns) const {
- return impl_->NamespaceExists(ns);
-}
-
-Status InMemoryCatalog::UpdateNamespaceProperties(
- const Namespace& ns, const std::unordered_map<std::string, std::string>&
updates,
- const std::unordered_set<std::string>& removals) {
- return impl_->UpdateNamespaceProperties(ns, updates, removals);
-}
-
-Result<std::vector<TableIdentifier>> InMemoryCatalog::ListTables(
- const Namespace& ns) const {
- return impl_->ListTables(ns);
-}
-
-Result<std::unique_ptr<Table>> InMemoryCatalog::CreateTable(
- const TableIdentifier& identifier, const Schema& schema, const
PartitionSpec& spec,
- const std::string& location,
- const std::unordered_map<std::string, std::string>& properties) {
- return impl_->CreateTable(identifier, schema, spec, location, properties);
-}
-
-Result<std::unique_ptr<Table>> InMemoryCatalog::UpdateTable(
- const TableIdentifier& identifier,
- const std::vector<std::unique_ptr<UpdateRequirement>>& requirements,
- const std::vector<std::unique_ptr<MetadataUpdate>>& updates) {
- return impl_->UpdateTable(identifier, requirements, updates);
-}
-
-Result<std::shared_ptr<Transaction>> InMemoryCatalog::StageCreateTable(
- const TableIdentifier& identifier, const Schema& schema, const
PartitionSpec& spec,
- const std::string& location,
- const std::unordered_map<std::string, std::string>& properties) {
- return impl_->StageCreateTable(identifier, schema, spec, location,
properties);
-}
-
-Result<bool> InMemoryCatalog::TableExists(const TableIdentifier& identifier)
const {
- return impl_->TableExists(identifier);
-}
-
-Status InMemoryCatalog::DropTable(const TableIdentifier& identifier, bool
purge) {
- return impl_->DropTable(identifier, purge);
-}
-
-Result<std::shared_ptr<Table>> InMemoryCatalog::LoadTable(
- const TableIdentifier& identifier) const {
- return impl_->LoadTable(identifier);
-}
-
-Result<std::shared_ptr<Table>> InMemoryCatalog::RegisterTable(
- const TableIdentifier& identifier, const std::string&
metadata_file_location) {
- return impl_->RegisterTable(identifier, metadata_file_location);
-}
-
std::unique_ptr<TableBuilder> InMemoryCatalog::BuildTable(
const TableIdentifier& identifier, const Schema& schema) const {
throw IcebergError("not implemented");
diff --git a/src/iceberg/catalog/in_memory_catalog.h
b/src/iceberg/catalog/in_memory_catalog.h
index c8e24b5..e3da403 100644
--- a/src/iceberg/catalog/in_memory_catalog.h
+++ b/src/iceberg/catalog/in_memory_catalog.h
@@ -19,9 +19,12 @@
#pragma once
+#include <mutex>
+
#include "iceberg/catalog.h"
namespace iceberg {
+
/**
* @brief An in-memory implementation of the Iceberg Catalog interface.
*
@@ -32,7 +35,9 @@ namespace iceberg {
* @note This class is **not** suitable for production use.
* All data will be lost when the process exits.
*/
-class ICEBERG_EXPORT InMemoryCatalog : public Catalog {
+class ICEBERG_EXPORT InMemoryCatalog
+ : public Catalog,
+ public std::enable_shared_from_this<InMemoryCatalog> {
public:
InMemoryCatalog(std::string const& name, std::shared_ptr<FileIO> const&
file_io,
std::string const& warehouse_location,
@@ -79,8 +84,7 @@ class ICEBERG_EXPORT InMemoryCatalog : public Catalog {
Status DropTable(const TableIdentifier& identifier, bool purge) override;
- Result<std::shared_ptr<Table>> LoadTable(
- const TableIdentifier& identifier) const override;
+ Result<std::unique_ptr<Table>> LoadTable(const TableIdentifier& identifier)
override;
Result<std::shared_ptr<Table>> RegisterTable(
const TableIdentifier& identifier,
@@ -90,7 +94,12 @@ class ICEBERG_EXPORT InMemoryCatalog : public Catalog {
const Schema& schema)
const override;
private:
- std::unique_ptr<class InMemoryCatalogImpl> impl_;
+ std::string catalog_name_;
+ std::unordered_map<std::string, std::string> properties_;
+ std::shared_ptr<FileIO> file_io_;
+ std::string warehouse_location_;
+ std::unique_ptr<class InMemoryNamespace> root_namespace_;
+ mutable std::recursive_mutex mutex_;
};
} // namespace iceberg
diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt
index 8d73961..0597d76 100644
--- a/test/CMakeLists.txt
+++ b/test/CMakeLists.txt
@@ -44,11 +44,6 @@ target_sources(schema_test
target_link_libraries(schema_test PRIVATE iceberg_static GTest::gtest_main
GTest::gmock)
add_test(NAME schema_test COMMAND schema_test)
-add_executable(catalog_test)
-target_sources(catalog_test PRIVATE in_memory_catalog_test.cc)
-target_link_libraries(catalog_test PRIVATE iceberg_static GTest::gtest_main
GTest::gmock)
-add_test(NAME catalog_test COMMAND catalog_test)
-
add_executable(table_test)
target_include_directories(table_test PRIVATE "${CMAKE_BINARY_DIR}")
target_sources(table_test PRIVATE test_common.cc json_internal_test.cc
table_test.cc
@@ -95,4 +90,11 @@ if(ICEBERG_BUILD_BUNDLE)
target_link_libraries(arrow_test PRIVATE iceberg_bundle_static
GTest::gtest_main
GTest::gmock)
add_test(NAME arrow_test COMMAND arrow_test)
+
+ add_executable(catalog_test)
+ target_include_directories(catalog_test PRIVATE "${CMAKE_BINARY_DIR}")
+ target_sources(catalog_test PRIVATE test_common.cc in_memory_catalog_test.cc)
+ target_link_libraries(catalog_test PRIVATE iceberg_bundle_static
GTest::gtest_main
+ GTest::gmock)
+ add_test(NAME catalog_test COMMAND catalog_test)
endif()
diff --git a/test/in_memory_catalog_test.cc b/test/in_memory_catalog_test.cc
index c76d788..1fe598a 100644
--- a/test/in_memory_catalog_test.cc
+++ b/test/in_memory_catalog_test.cc
@@ -19,24 +19,64 @@
#include "iceberg/catalog/in_memory_catalog.h"
+#include <filesystem>
+
+#include <arrow/filesystem/localfs.h>
#include <gmock/gmock.h>
#include <gtest/gtest.h>
+#include "iceberg/arrow/arrow_fs_file_io.h"
+#include "iceberg/table.h"
+#include "iceberg/table_metadata.h"
#include "matchers.h"
+#include "test_common.h"
namespace iceberg {
class InMemoryCatalogTest : public ::testing::Test {
protected:
void SetUp() override {
- file_io_ = nullptr; // TODO(Guotao): A real FileIO instance needs to be
constructed.
+ file_io_ = std::make_shared<iceberg::arrow::ArrowFileSystemFileIO>(
+ std::make_shared<::arrow::fs::LocalFileSystem>());
std::unordered_map<std::string, std::string> properties = {{"prop1",
"val1"}};
- catalog_ = std::make_unique<InMemoryCatalog>("test_catalog", file_io_,
+ catalog_ = std::make_shared<InMemoryCatalog>("test_catalog", file_io_,
"/tmp/warehouse/",
properties);
}
+ void TearDown() override {
+ // Clean up the temporary files created for the table metadata
+ for (const auto& path : created_temp_paths_) {
+ std::error_code ec;
+ if (std::filesystem::is_directory(path, ec)) {
+ std::filesystem::remove_all(path, ec);
+ } else {
+ std::filesystem::remove(path, ec);
+ }
+ }
+ }
+
+ std::string GenerateTestTableLocation(std::string table_name) {
+ std::filesystem::path temp_dir = std::filesystem::temp_directory_path();
+ const auto info = ::testing::UnitTest::GetInstance()->current_test_info();
+ auto table_location = std::format("{}/iceberg_test_{}_{}/{}/",
temp_dir.string(),
+ info->test_suite_name(), info->name(),
table_name);
+ // generate a unique directory for the table
+ std::error_code ec;
+ std::filesystem::create_directories(table_location, ec);
+ if (ec) {
+ throw std::runtime_error(
+ std::format("Failed to create temporary directory: {}, error
message: {}",
+ table_location, ec.message()));
+ }
+
+ created_temp_paths_.push_back(table_location);
+ return table_location;
+ }
+
std::shared_ptr<FileIO> file_io_;
- std::unique_ptr<InMemoryCatalog> catalog_;
+ std::shared_ptr<InMemoryCatalog> catalog_;
+ // Used to store temporary paths created during the test
+ std::vector<std::string> created_temp_paths_;
};
TEST_F(InMemoryCatalogTest, CatalogName) {
@@ -58,6 +98,23 @@ TEST_F(InMemoryCatalogTest, TableExists) {
EXPECT_THAT(result, HasValue(::testing::Eq(false)));
}
+TEST_F(InMemoryCatalogTest, RegisterTable) {
+ TableIdentifier tableIdent{.ns = {}, .name = "t1"};
+
+ std::unique_ptr<TableMetadata> metadata;
+ ASSERT_NO_FATAL_FAILURE(ReadTableMetadata("TableMetadataV2Valid.json",
&metadata));
+
+ auto table_location = GenerateTestTableLocation(tableIdent.name);
+ auto metadata_location = std::format("{}v1.metadata.json", table_location);
+ auto status = TableMetadataUtil::Write(*file_io_, metadata_location,
*metadata);
+ EXPECT_THAT(status, IsOk());
+
+ auto table = catalog_->RegisterTable(tableIdent, metadata_location);
+ EXPECT_THAT(table, IsOk());
+ ASSERT_EQ(table.value()->name().name, "t1");
+ ASSERT_EQ(table.value()->location(), "s3://bucket/test/location");
+}
+
TEST_F(InMemoryCatalogTest, DropTable) {
TableIdentifier tableIdent{.ns = {}, .name = "t1"};
auto result = catalog_->DropTable(tableIdent, false);