This is an automated email from the ASF dual-hosted git repository.
fokko pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-cpp.git
The following commit(s) were added to refs/heads/main by this push:
new cb4998c feat: lazy init for Schema and StructType (#227)
cb4998c is described below
commit cb4998c8fd0c3ef28dfcbe6107c248a21b7782c5
Author: chao liu <[email protected]>
AuthorDate: Mon Sep 22 15:49:44 2025 +0800
feat: lazy init for Schema and StructType (#227)
…Type
- Added move and copy constructors and assignment operators for Schema
and StructType to manage resource ownership and improve performance.
- Refactored field lookup methods to utilize lazy initialization with
thread safety, ensuring safe concurrent access.
- Introduced unit tests for thread safety in Schema and StructType,
validating concurrent operations and access patterns.
---------
Co-authored-by: nullccxsy <[email protected]>
---
src/iceberg/manifest_list.h | 4 +-
src/iceberg/schema.cc | 9 +++--
src/iceberg/schema.h | 7 +++-
src/iceberg/type.cc | 9 +++--
src/iceberg/type.h | 15 +++++++-
test/avro_data_test.cc | 34 ++++++++---------
test/schema_test.cc | 69 +++++++++++++++++++++++++++++++++
test/type_test.cc | 93 +++++++++++++++++++++++++++++++++++++++++++++
8 files changed, 212 insertions(+), 28 deletions(-)
diff --git a/src/iceberg/manifest_list.h b/src/iceberg/manifest_list.h
index 66433da..d6e7f1d 100644
--- a/src/iceberg/manifest_list.h
+++ b/src/iceberg/manifest_list.h
@@ -185,7 +185,9 @@ struct ICEBERG_EXPORT ManifestFile {
507, "partitions",
std::make_shared<ListType>(SchemaField::MakeRequired(
508, std::string(ListType::kElementName),
- std::make_shared<StructType>(PartitionFieldSummary::Type()))),
+ struct_(
+ {PartitionFieldSummary::kContainsNull,
PartitionFieldSummary::kContainsNaN,
+ PartitionFieldSummary::kLowerBound,
PartitionFieldSummary::kUpperBound}))),
"Summary for each partition");
inline static const SchemaField kKeyMetadata = SchemaField::MakeOptional(
519, "key_metadata", iceberg::binary(), "Encryption key metadata blob");
diff --git a/src/iceberg/schema.cc b/src/iceberg/schema.cc
index 0b67dfb..1df20c6 100644
--- a/src/iceberg/schema.cc
+++ b/src/iceberg/schema.cc
@@ -89,12 +89,14 @@ bool Schema::Equals(const Schema& other) const {
Result<std::optional<std::reference_wrapper<const SchemaField>>>
Schema::FindFieldByName(
std::string_view name, bool case_sensitive) const {
if (case_sensitive) {
- ICEBERG_RETURN_UNEXPECTED(InitNameToIdMap());
+ ICEBERG_RETURN_UNEXPECTED(
+ LazyInitWithCallOnce(name_to_id_flag_, [this]() { return
InitNameToIdMap(); }));
auto it = name_to_id_.find(name);
if (it == name_to_id_.end()) return std::nullopt;
return FindFieldById(it->second);
}
- ICEBERG_RETURN_UNEXPECTED(InitLowerCaseNameToIdMap());
+ ICEBERG_RETURN_UNEXPECTED(LazyInitWithCallOnce(
+ lowercase_name_to_id_flag_, [this]() { return
InitLowerCaseNameToIdMap(); }));
auto it = lowercase_name_to_id_.find(StringUtils::ToLower(name));
if (it == lowercase_name_to_id_.end()) return std::nullopt;
return FindFieldById(it->second);
@@ -133,7 +135,8 @@ Status Schema::InitLowerCaseNameToIdMap() const {
Result<std::optional<std::reference_wrapper<const SchemaField>>>
Schema::FindFieldById(
int32_t field_id) const {
- ICEBERG_RETURN_UNEXPECTED(InitIdToFieldMap());
+ ICEBERG_RETURN_UNEXPECTED(
+ LazyInitWithCallOnce(id_to_field_flag_, [this]() { return
InitIdToFieldMap(); }));
auto it = id_to_field_.find(field_id);
if (it == id_to_field_.end()) {
return std::nullopt;
diff --git a/src/iceberg/schema.h b/src/iceberg/schema.h
index 1de829c..260d9d3 100644
--- a/src/iceberg/schema.h
+++ b/src/iceberg/schema.h
@@ -24,6 +24,7 @@
/// and any utility functions. See iceberg/type.h and iceberg/field.h as well.
#include <cstdint>
+#include <mutex>
#include <optional>
#include <string>
#include <vector>
@@ -78,8 +79,6 @@ class ICEBERG_EXPORT Schema : public StructType {
/// \brief Compare two schemas for equality.
[[nodiscard]] bool Equals(const Schema& other) const;
- // TODO(nullccxsy): Address potential concurrency issues in lazy
initialization (e.g.,
- // use std::call_once)
Status InitIdToFieldMap() const;
Status InitNameToIdMap() const;
Status InitLowerCaseNameToIdMap() const;
@@ -94,6 +93,10 @@ class ICEBERG_EXPORT Schema : public StructType {
/// Mapping from lowercased field name to field id
mutable std::unordered_map<std::string, int32_t, StringHash, std::equal_to<>>
lowercase_name_to_id_;
+
+ mutable std::once_flag id_to_field_flag_;
+ mutable std::once_flag name_to_id_flag_;
+ mutable std::once_flag lowercase_name_to_id_flag_;
};
} // namespace iceberg
diff --git a/src/iceberg/type.cc b/src/iceberg/type.cc
index b435bb3..8d230d7 100644
--- a/src/iceberg/type.cc
+++ b/src/iceberg/type.cc
@@ -50,7 +50,8 @@ std::string StructType::ToString() const {
std::span<const SchemaField> StructType::fields() const { return fields_; }
Result<std::optional<NestedType::SchemaFieldConstRef>>
StructType::GetFieldById(
int32_t field_id) const {
- ICEBERG_RETURN_UNEXPECTED(InitFieldById());
+ ICEBERG_RETURN_UNEXPECTED(
+ LazyInitWithCallOnce(field_by_id_flag_, [this]() { return
InitFieldById(); }));
auto it = field_by_id_.find(field_id);
if (it == field_by_id_.end()) return std::nullopt;
return it->second;
@@ -65,14 +66,16 @@ Result<std::optional<NestedType::SchemaFieldConstRef>>
StructType::GetFieldByInd
Result<std::optional<NestedType::SchemaFieldConstRef>>
StructType::GetFieldByName(
std::string_view name, bool case_sensitive) const {
if (case_sensitive) {
- ICEBERG_RETURN_UNEXPECTED(InitFieldByName());
+ ICEBERG_RETURN_UNEXPECTED(LazyInitWithCallOnce(
+ field_by_name_flag_, [this]() { return InitFieldByName(); }));
auto it = field_by_name_.find(name);
if (it != field_by_name_.end()) {
return it->second;
}
return std::nullopt;
}
- ICEBERG_RETURN_UNEXPECTED(InitFieldByLowerCaseName());
+ ICEBERG_RETURN_UNEXPECTED(LazyInitWithCallOnce(
+ field_by_lowercase_name_flag_, [this]() { return
InitFieldByLowerCaseName(); }));
auto it = field_by_lowercase_name_.find(StringUtils::ToLower(name));
if (it != field_by_lowercase_name_.end()) {
return it->second;
diff --git a/src/iceberg/type.h b/src/iceberg/type.h
index d1d1934..01c911d 100644
--- a/src/iceberg/type.h
+++ b/src/iceberg/type.h
@@ -26,6 +26,7 @@
#include <array>
#include <cstdint>
#include <memory>
+#include <mutex>
#include <optional>
#include <span>
#include <string>
@@ -39,6 +40,13 @@
namespace iceberg {
+template <typename Func>
+Status LazyInitWithCallOnce(std::once_flag& flag, Func&& func) {  // Runs func through std::call_once on flag and returns the Status it produced.
+ Status status;  // Stays default-constructed when this call does not execute func.
+ std::call_once(flag, [&status, &func]() { status = func(); });  // Only the call that actually runs func captures its result here.
+ return status;  // NOTE(review): once flag is set, later callers get the default Status even if func reported an error - confirm this is intended.
+}
+
/// \brief Interface for a data type for a field.
class ICEBERG_EXPORT Type : public iceberg::util::Formattable {
public:
@@ -124,8 +132,7 @@ class ICEBERG_EXPORT StructType : public NestedType {
protected:
bool Equals(const Type& other) const override;
- // TODO(nullccxsy): Lazy initialization has concurrency issues, need to add
proper
- // synchronization mechanism
+
Status InitFieldById() const;
Status InitFieldByName() const;
Status InitFieldByLowerCaseName() const;
@@ -134,6 +141,10 @@ class ICEBERG_EXPORT StructType : public NestedType {
mutable std::unordered_map<int32_t, SchemaFieldConstRef> field_by_id_;
mutable std::unordered_map<std::string_view, SchemaFieldConstRef>
field_by_name_;
mutable std::unordered_map<std::string, SchemaFieldConstRef>
field_by_lowercase_name_;
+
+ mutable std::once_flag field_by_id_flag_;
+ mutable std::once_flag field_by_name_flag_;
+ mutable std::once_flag field_by_lowercase_name_flag_;
};
/// \brief A data type representing a list of values.
diff --git a/test/avro_data_test.cc b/test/avro_data_test.cc
index b5cc1c5..2797f9b 100644
--- a/test/avro_data_test.cc
+++ b/test/avro_data_test.cc
@@ -1195,16 +1195,16 @@ TEST(ExtractDatumFromArrayTest, NullHandling) {
struct RoundTripParam {
std::string name;
- Schema iceberg_schema;
+ std::shared_ptr<Schema> iceberg_schema;  // Held by pointer and dereferenced at each use; NOTE(review): presumably because the std::once_flag members added to Schema make it non-copyable - confirm.
std::string arrow_json;
};
void VerifyRoundTripConversion(const RoundTripParam& test_case) {
::avro::NodePtr avro_node;
- ASSERT_THAT(ToAvroNodeVisitor{}.Visit(test_case.iceberg_schema, &avro_node),
IsOk());
+ ASSERT_THAT(ToAvroNodeVisitor{}.Visit(*test_case.iceberg_schema,
&avro_node), IsOk());
ArrowSchema arrow_c_schema;
- ASSERT_THAT(ToArrowSchema(test_case.iceberg_schema, &arrow_c_schema),
IsOk());
+ ASSERT_THAT(ToArrowSchema(*test_case.iceberg_schema, &arrow_c_schema),
IsOk());
auto arrow_schema = ::arrow::ImportSchema(&arrow_c_schema).ValueOrDie();
auto arrow_struct_type =
std::make_shared<::arrow::StructType>(arrow_schema->fields());
@@ -1221,14 +1221,14 @@ void VerifyRoundTripConversion(const RoundTripParam&
test_case) {
}
auto projection_result =
- Project(test_case.iceberg_schema, avro_node, /*prune_source=*/false);
+ Project(*test_case.iceberg_schema, avro_node, /*prune_source=*/false);
ASSERT_THAT(projection_result, IsOk());
auto projection = std::move(projection_result.value());
auto builder = ::arrow::MakeBuilder(arrow_struct_type).ValueOrDie();
for (const auto& datum : extracted_data) {
ASSERT_THAT(AppendDatumToBuilder(avro_node, datum, projection,
- test_case.iceberg_schema, builder.get()),
+ *test_case.iceberg_schema, builder.get()),
IsOk());
}
@@ -1249,7 +1249,7 @@ TEST_P(AvroRoundTripConversionTest, ConvertTypes) {
const std::vector<RoundTripParam> kRoundTripTestCases = {
{
.name = "SimpleStruct",
- .iceberg_schema = Schema({
+ .iceberg_schema = std::make_shared<Schema>(std::vector<SchemaField>{
SchemaField::MakeRequired(1, "id", int32()),
SchemaField::MakeRequired(2, "name", string()),
SchemaField::MakeOptional(3, "age", int32()),
@@ -1262,7 +1262,7 @@ const std::vector<RoundTripParam> kRoundTripTestCases = {
},
{
.name = "PrimitiveTypes",
- .iceberg_schema = Schema({
+ .iceberg_schema = std::make_shared<Schema>(std::vector<SchemaField>{
SchemaField::MakeRequired(1, "bool_field", boolean()),
SchemaField::MakeRequired(2, "int_field", int32()),
SchemaField::MakeRequired(3, "long_field", int64()),
@@ -1277,7 +1277,7 @@ const std::vector<RoundTripParam> kRoundTripTestCases = {
},
{
.name = "NestedStruct",
- .iceberg_schema = Schema({
+ .iceberg_schema = std::make_shared<Schema>(std::vector<SchemaField>{
SchemaField::MakeRequired(1, "id", int32()),
SchemaField::MakeRequired(
2, "person",
@@ -1293,7 +1293,7 @@ const std::vector<RoundTripParam> kRoundTripTestCases = {
},
{
.name = "ListOfIntegers",
- .iceberg_schema = Schema({
+ .iceberg_schema = std::make_shared<Schema>(std::vector<SchemaField>{
SchemaField::MakeRequired(
1, "numbers",
std::make_shared<ListType>(
@@ -1307,7 +1307,7 @@ const std::vector<RoundTripParam> kRoundTripTestCases = {
},
{
.name = "MapStringToInt",
- .iceberg_schema = Schema({
+ .iceberg_schema = std::make_shared<Schema>(std::vector<SchemaField>{
SchemaField::MakeRequired(
1, "scores",
std::make_shared<MapType>(
@@ -1322,7 +1322,7 @@ const std::vector<RoundTripParam> kRoundTripTestCases = {
},
{
.name = "ComplexNested",
- .iceberg_schema = Schema({
+ .iceberg_schema = std::make_shared<Schema>(std::vector<SchemaField>{
SchemaField::MakeRequired(
1, "data",
std::make_shared<StructType>(std::vector<SchemaField>{
@@ -1345,7 +1345,7 @@ const std::vector<RoundTripParam> kRoundTripTestCases = {
},
{
.name = "NullablePrimitives",
- .iceberg_schema = Schema({
+ .iceberg_schema = std::make_shared<Schema>(std::vector<SchemaField>{
SchemaField::MakeOptional(1, "optional_bool", boolean()),
SchemaField::MakeOptional(2, "optional_int", int32()),
SchemaField::MakeOptional(3, "optional_long", int64()),
@@ -1361,7 +1361,7 @@ const std::vector<RoundTripParam> kRoundTripTestCases = {
},
{
.name = "NullableNestedStruct",
- .iceberg_schema = Schema({
+ .iceberg_schema = std::make_shared<Schema>(std::vector<SchemaField>{
SchemaField::MakeRequired(1, "id", int32()),
SchemaField::MakeOptional(
2, "person",
@@ -1381,7 +1381,7 @@ const std::vector<RoundTripParam> kRoundTripTestCases = {
},
{
.name = "NullableListElements",
- .iceberg_schema = Schema({
+ .iceberg_schema = std::make_shared<Schema>(std::vector<SchemaField>{
SchemaField::MakeRequired(1, "id", int32()),
SchemaField::MakeOptional(
2, "numbers",
@@ -1401,7 +1401,7 @@ const std::vector<RoundTripParam> kRoundTripTestCases = {
},
{
.name = "NullableMapValues",
- .iceberg_schema = Schema({
+ .iceberg_schema = std::make_shared<Schema>(std::vector<SchemaField>{
SchemaField::MakeRequired(1, "id", int32()),
SchemaField::MakeOptional(
2, "scores",
@@ -1423,7 +1423,7 @@ const std::vector<RoundTripParam> kRoundTripTestCases = {
},
{
.name = "DeeplyNestedWithNulls",
- .iceberg_schema = Schema({
+ .iceberg_schema = std::make_shared<Schema>(std::vector<SchemaField>{
SchemaField::MakeRequired(
1, "root",
std::make_shared<StructType>(std::vector<SchemaField>{
@@ -1452,7 +1452,7 @@ const std::vector<RoundTripParam> kRoundTripTestCases = {
},
{
.name = "AllNullsVariations",
- .iceberg_schema = Schema({
+ .iceberg_schema = std::make_shared<Schema>(std::vector<SchemaField>{
SchemaField::MakeOptional(1, "always_null", string()),
SchemaField::MakeOptional(2, "sometimes_null", int32()),
SchemaField::MakeOptional(
diff --git a/test/schema_test.cc b/test/schema_test.cc
index 272c6e7..b01ffe9 100644
--- a/test/schema_test.cc
+++ b/test/schema_test.cc
@@ -21,6 +21,7 @@
#include <format>
#include <memory>
+#include <thread>
#include <gmock/gmock.h>
#include <gtest/gtest.h>
@@ -490,3 +491,71 @@ TEST(SchemaTest, NestedDuplicateFieldIdError) {
EXPECT_THAT(result.error().message,
::testing::HasSubstr("Duplicate field id found: 1"));
}
+
+// Thread safety tests for Lazy Init
+class SchemaThreadSafetyTest : public ::testing::Test {  // Fixture: one three-field Schema that each test's worker threads query.
+ protected:
+ void SetUp() override {  // Rebuilds the fields and the schema for every test.
+ field1_ = std::make_unique<iceberg::SchemaField>(1, "id",
iceberg::int32(), true);
+ field2_ = std::make_unique<iceberg::SchemaField>(2, "name",
iceberg::string(), true);
+ field3_ = std::make_unique<iceberg::SchemaField>(3, "age",
iceberg::int32(), true);
+ schema_ = std::make_unique<iceberg::Schema>(
+ std::vector<iceberg::SchemaField>{*field1_, *field2_, *field3_}, 100);  // Schema receives copies of the fields; 100 is presumably the schema id - confirm.
+ }
+
+ std::unique_ptr<iceberg::Schema> schema_;  // Schema under test, accessed from the worker threads.
+ std::unique_ptr<iceberg::SchemaField> field1_;  // Expected value for id 1 / name "id".
+ std::unique_ptr<iceberg::SchemaField> field2_;  // Expected value for id 2 / name "name".
+ std::unique_ptr<iceberg::SchemaField> field3_;  // Expected value for id 3 / name "age".
+};
+
+TEST_F(SchemaThreadSafetyTest, ConcurrentFindFieldById) {  // 10 threads x 100 iterations of FindFieldById on the shared schema.
+ const int num_threads = 10;
+ const int iterations_per_thread = 100;
+ std::vector<std::thread> threads;
+
+ for (int i = 0; i < num_threads; ++i) {  // NOTE(review): each thread starts when constructed; no barrier forces them to reach the first lookup together.
+ threads.emplace_back([this, iterations_per_thread]() {
+ for (int j = 0; j < iterations_per_thread; ++j) {
+ ASSERT_THAT(schema_->FindFieldById(1), ::testing::Optional(*field1_));  // Known id: expects a field equal to field1_.
+ ASSERT_THAT(schema_->FindFieldById(999),
::testing::Optional(std::nullopt));  // Unknown id: expects a result that holds std::nullopt.
+ }
+ });
+ }
+
+ for (auto& thread : threads) {  // Join every worker before the test body returns.
+ thread.join();
+ }
+}
+
+TEST_F(SchemaThreadSafetyTest, MixedConcurrentOperations) {  // 8 threads exercise the id, name and lowercase-name lookups at the same time.
+ const int num_threads = 8;
+ const int iterations_per_thread = 50;
+ std::vector<std::thread> threads;
+
+ for (int i = 0; i < num_threads; ++i) {  // The thread index i selects which lookup path that thread runs.
+ threads.emplace_back([this, iterations_per_thread, i]() {
+ for (int j = 0; j < iterations_per_thread; ++j) {
+ if (i % 4 == 0) {  // Lookup by id only.
+ ASSERT_THAT(schema_->FindFieldById(1),
::testing::Optional(*field1_));
+ } else if (i % 4 == 1) {  // Case-sensitive name lookup only.
+ ASSERT_THAT(schema_->FindFieldByName("name", true),
+ ::testing::Optional(*field2_));
+ } else if (i % 4 == 2) {  // Case-insensitive name lookup only ("AGE" must resolve to field3_).
+ ASSERT_THAT(schema_->FindFieldByName("AGE", false),
+ ::testing::Optional(*field3_));
+ } else {  // All three lookup kinds from the same thread.
+ ASSERT_THAT(schema_->FindFieldById(2),
::testing::Optional(*field2_));
+ ASSERT_THAT(schema_->FindFieldByName("id", true),
+ ::testing::Optional(*field1_));
+ ASSERT_THAT(schema_->FindFieldByName("age", false),
+ ::testing::Optional(*field3_));
+ }
+ }
+ });
+ }
+
+ for (auto& thread : threads) {  // Join every worker before the test body returns.
+ thread.join();
+ }
+}
diff --git a/test/type_test.cc b/test/type_test.cc
index 9963ab3..4fd8c46 100644
--- a/test/type_test.cc
+++ b/test/type_test.cc
@@ -22,6 +22,7 @@
#include <format>
#include <memory>
#include <string>
+#include <thread>
#include <gmock/gmock.h>
#include <gtest/gtest.h>
@@ -511,3 +512,95 @@ TEST(TypeTest, StructDuplicateLowerCaseName) {
iceberg::HasErrorMessage(
"Duplicate lowercase field name found: foo (prev id: 1, curr
id: 2)"));
}
+
+// Thread safety tests for StructType Lazy Init
+class StructTypeThreadSafetyTest : public ::testing::Test {  // Fixture: one three-field StructType that each test's worker threads query.
+ protected:
+ void SetUp() override {  // Rebuilds the fields and the struct type for every test.
+ field1_ = std::make_unique<iceberg::SchemaField>(1, "id",
iceberg::int32(), true);
+ field2_ = std::make_unique<iceberg::SchemaField>(2, "name",
iceberg::string(), true);
+ field3_ = std::make_unique<iceberg::SchemaField>(3, "age",
iceberg::int32(), true);
+
+ struct_type_ = std::make_unique<iceberg::StructType>(
+ std::vector<iceberg::SchemaField>{*field1_, *field2_, *field3_});  // StructType receives copies of the fields.
+ }
+
+ std::unique_ptr<iceberg::StructType> struct_type_;  // Type under test, accessed from the worker threads.
+ std::unique_ptr<iceberg::SchemaField> field1_;  // Expected value for id 1 / name "id".
+ std::unique_ptr<iceberg::SchemaField> field2_;  // Expected value for id 2 / name "name".
+ std::unique_ptr<iceberg::SchemaField> field3_;  // Expected value for id 3 / name "age".
+};
+
+TEST_F(StructTypeThreadSafetyTest, ConcurrentGetFieldById) {  // 10 threads x 100 iterations of GetFieldById on the shared struct type.
+ const int num_threads = 10;
+ const int iterations_per_thread = 100;
+ std::vector<std::thread> threads;
+
+ for (int i = 0; i < num_threads; ++i) {  // NOTE(review): each thread starts when constructed; no barrier forces them to reach the first lookup together.
+ threads.emplace_back([this, iterations_per_thread]() {
+ for (int j = 0; j < iterations_per_thread; ++j) {
+ ASSERT_THAT(struct_type_->GetFieldById(1),
::testing::Optional(*field1_));  // Known id: expects a field equal to field1_.
+ ASSERT_THAT(struct_type_->GetFieldById(999),
::testing::Optional(std::nullopt));  // Unknown id: expects a result that holds std::nullopt.
+ }
+ });
+ }
+
+ for (auto& thread : threads) {  // Join every worker before the test body returns.
+ thread.join();
+ }
+}
+
+TEST_F(StructTypeThreadSafetyTest, ConcurrentGetFieldByName) {  // 10 threads x 100 iterations of both name-lookup modes on the shared struct type.
+ const int num_threads = 10;
+ const int iterations_per_thread = 100;
+ std::vector<std::thread> threads;
+
+ for (int i = 0; i < num_threads; ++i) {  // NOTE(review): each thread starts when constructed; no barrier forces them to reach the first lookup together.
+ threads.emplace_back([this, iterations_per_thread]() {
+ for (int j = 0; j < iterations_per_thread; ++j) {
+ ASSERT_THAT(struct_type_->GetFieldByName("id", true),
+ ::testing::Optional(*field1_));  // Case-sensitive hit.
+ ASSERT_THAT(struct_type_->GetFieldByName("NAME", false),
+ ::testing::Optional(*field2_));  // Case-insensitive hit: "NAME" must resolve to field2_.
+ ASSERT_THAT(struct_type_->GetFieldByName("noexist", false),
+ ::testing::Optional(std::nullopt));  // Case-insensitive miss: expects a result that holds std::nullopt.
+ }
+ });
+ }
+
+ for (auto& thread : threads) {  // Join every worker before the test body returns.
+ thread.join();
+ }
+}
+
+TEST_F(StructTypeThreadSafetyTest, MixedConcurrentOperations) {  // 8 threads exercise the id, name and lowercase-name lookups at the same time.
+ const int num_threads = 8;
+ const int iterations_per_thread = 50;
+ std::vector<std::thread> threads;
+
+ for (int i = 0; i < num_threads; ++i) {  // The thread index i selects which lookup path that thread runs.
+ threads.emplace_back([this, iterations_per_thread, i]() {
+ for (int j = 0; j < iterations_per_thread; ++j) {
+ if (i % 4 == 0) {  // Lookup by id only.
+ ASSERT_THAT(struct_type_->GetFieldById(1),
::testing::Optional(*field1_));
+ } else if (i % 4 == 1) {  // Case-sensitive name lookup only.
+ ASSERT_THAT(struct_type_->GetFieldByName("name", true),
+ ::testing::Optional(*field2_));
+ } else if (i % 4 == 2) {  // Case-insensitive name lookup only ("AGE" must resolve to field3_).
+ ASSERT_THAT(struct_type_->GetFieldByName("AGE", false),
+ ::testing::Optional(*field3_));
+ } else {  // All three lookup kinds from the same thread.
+ ASSERT_THAT(struct_type_->GetFieldById(2),
::testing::Optional(*field2_));
+ ASSERT_THAT(struct_type_->GetFieldByName("id", true),
+ ::testing::Optional(*field1_));
+ ASSERT_THAT(struct_type_->GetFieldByName("age", false),
+ ::testing::Optional(*field3_));
+ }
+ }
+ });
+ }
+
+ for (auto& thread : threads) {  // Join every worker before the test body returns.
+ thread.join();
+ }
+}