This is an automated email from the ASF dual-hosted git repository.
gangwu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-cpp.git
The following commit(s) were added to refs/heads/main by this push:
new 6f1cdfd8 feat: add update partition stats (#538)
6f1cdfd8 is described below
commit 6f1cdfd800c540cb7d4892a8644e2cfde2729301
Author: Feiyang Li <[email protected]>
AuthorDate: Tue Jan 27 13:38:52 2026 +0800
feat: add update partition stats (#538)
---
src/iceberg/CMakeLists.txt | 1 +
src/iceberg/json_internal.cc | 33 ++++
src/iceberg/meson.build | 1 +
src/iceberg/table.cc | 8 +
src/iceberg/table.h | 5 +
src/iceberg/table_metadata.cc | 45 ++++-
src/iceberg/table_update.cc | 56 +++++++
src/iceberg/table_update.h | 50 ++++++
src/iceberg/test/CMakeLists.txt | 1 +
src/iceberg/test/json_internal_test.cc | 37 +++++
.../test/update_partition_statistics_test.cc | 181 +++++++++++++++++++++
src/iceberg/transaction.cc | 25 +++
src/iceberg/transaction.h | 5 +
src/iceberg/type_fwd.h | 1 +
src/iceberg/update/meson.build | 1 +
src/iceberg/update/pending_update.h | 1 +
src/iceberg/update/update_partition_statistics.cc | 78 +++++++++
src/iceberg/update/update_partition_statistics.h | 80 +++++++++
18 files changed, 607 insertions(+), 2 deletions(-)
diff --git a/src/iceberg/CMakeLists.txt b/src/iceberg/CMakeLists.txt
index 359b79e6..d82229ef 100644
--- a/src/iceberg/CMakeLists.txt
+++ b/src/iceberg/CMakeLists.txt
@@ -92,6 +92,7 @@ set(ICEBERG_SOURCES
update/snapshot_update.cc
update/update_location.cc
update/update_partition_spec.cc
+ update/update_partition_statistics.cc
update/update_properties.cc
update/update_schema.cc
update/update_snapshot_reference.cc
diff --git a/src/iceberg/json_internal.cc b/src/iceberg/json_internal.cc
index 056af671..95883a93 100644
--- a/src/iceberg/json_internal.cc
+++ b/src/iceberg/json_internal.cc
@@ -192,6 +192,9 @@ constexpr std::string_view kActionRemoveProperties = "remove-properties";
constexpr std::string_view kActionSetLocation = "set-location";
constexpr std::string_view kActionSetStatistics = "set-statistics";
constexpr std::string_view kActionRemoveStatistics = "remove-statistics";
+constexpr std::string_view kActionSetPartitionStatistics = "set-partition-statistics";
+constexpr std::string_view kActionRemovePartitionStatistics =
+ "remove-partition-statistics";
// TableUpdate field constants
constexpr std::string_view kUUID = "uuid";
@@ -1439,6 +1442,24 @@ nlohmann::json ToJson(const TableUpdate& update) {
json[kSnapshotId] = u.snapshot_id();
break;
}
+ case TableUpdate::Kind::kSetPartitionStatistics: {
+ const auto& u =
+ internal::checked_cast<const table::SetPartitionStatistics&>(update);
+ json[kAction] = kActionSetPartitionStatistics;
+ if (u.partition_statistics_file()) {
+ json[kPartitionStatistics] = ToJson(*u.partition_statistics_file());
+ } else {
+ json[kPartitionStatistics] = nlohmann::json::value_t::null;
+ }
+ break;
+ }
+ case TableUpdate::Kind::kRemovePartitionStatistics: {
+ const auto& u =
+ internal::checked_cast<const table::RemovePartitionStatistics&>(update);
+ json[kAction] = kActionRemovePartitionStatistics;
+ json[kSnapshotId] = u.snapshot_id();
+ break;
+ }
}
return json;
}
@@ -1628,6 +1649,18 @@ Result<std::unique_ptr<TableUpdate>> TableUpdateFromJson(const nlohmann::json& j
ICEBERG_ASSIGN_OR_RAISE(auto snapshot_id, GetJsonValue<int64_t>(json, kSnapshotId));
return std::make_unique<table::RemoveStatistics>(snapshot_id);
}
+ if (action == kActionSetPartitionStatistics) {
+ ICEBERG_ASSIGN_OR_RAISE(auto partition_statistics_json,
+ GetJsonValue<nlohmann::json>(json, kPartitionStatistics));
+ ICEBERG_ASSIGN_OR_RAISE(auto partition_statistics_file,
+ PartitionStatisticsFileFromJson(partition_statistics_json));
+ return std::make_unique<table::SetPartitionStatistics>(
+ std::move(partition_statistics_file));
+ }
+ if (action == kActionRemovePartitionStatistics) {
+ ICEBERG_ASSIGN_OR_RAISE(auto snapshot_id, GetJsonValue<int64_t>(json, kSnapshotId));
+ return std::make_unique<table::RemovePartitionStatistics>(snapshot_id);
+ }
return JsonParseError("Unknown table update action: {}", action);
}
diff --git a/src/iceberg/meson.build b/src/iceberg/meson.build
index 05cb6f8d..651de782 100644
--- a/src/iceberg/meson.build
+++ b/src/iceberg/meson.build
@@ -110,6 +110,7 @@ iceberg_sources = files(
'update/snapshot_update.cc',
'update/update_location.cc',
'update/update_partition_spec.cc',
+ 'update/update_partition_statistics.cc',
'update/update_properties.cc',
'update/update_schema.cc',
'update/update_snapshot_reference.cc',
diff --git a/src/iceberg/table.cc b/src/iceberg/table.cc
index 73acafd7..b6c26ea0 100644
--- a/src/iceberg/table.cc
+++ b/src/iceberg/table.cc
@@ -33,6 +33,7 @@
#include "iceberg/transaction.h"
#include "iceberg/update/expire_snapshots.h"
#include "iceberg/update/update_partition_spec.h"
+#include "iceberg/update/update_partition_statistics.h"
#include "iceberg/update/update_properties.h"
#include "iceberg/update/update_schema.h"
#include "iceberg/update/update_statistics.h"
@@ -214,6 +215,13 @@ Result<std::shared_ptr<UpdateStatistics>> Table::NewUpdateStatistics() {
return transaction->NewUpdateStatistics();
}
+Result<std::shared_ptr<UpdatePartitionStatistics>> Table::NewUpdatePartitionStatistics() {
+ ICEBERG_ASSIGN_OR_RAISE(
+ auto transaction, Transaction::Make(shared_from_this(), Transaction::Kind::kUpdate,
+ /*auto_commit=*/true));
+ return transaction->NewUpdatePartitionStatistics();
+}
+
Result<std::shared_ptr<StagedTable>> StagedTable::Make(
TableIdentifier identifier, std::shared_ptr<TableMetadata> metadata,
std::string metadata_location, std::shared_ptr<FileIO> io,
diff --git a/src/iceberg/table.h b/src/iceberg/table.h
index 77e9016f..1f3135dd 100644
--- a/src/iceberg/table.h
+++ b/src/iceberg/table.h
@@ -156,6 +156,11 @@ class ICEBERG_EXPORT Table : public std::enable_shared_from_this<Table> {
/// changes.
virtual Result<std::shared_ptr<UpdateStatistics>> NewUpdateStatistics();
+ /// \brief Create a new UpdatePartitionStatistics to update partition statistics and
+ /// commit the changes.
+ virtual Result<std::shared_ptr<UpdatePartitionStatistics>>
+ NewUpdatePartitionStatistics();
+
/// \brief Create a new UpdateLocation to update the table location and commit the
/// changes.
virtual Result<std::shared_ptr<UpdateLocation>> NewUpdateLocation();
diff --git a/src/iceberg/table_metadata.cc b/src/iceberg/table_metadata.cc
index 393b438a..7dab6704 100644
--- a/src/iceberg/table_metadata.cc
+++ b/src/iceberg/table_metadata.cc
@@ -623,6 +623,9 @@ class TableMetadataBuilder::Impl {
Status RemovePartitionSpecs(const std::vector<int32_t>& spec_ids);
Status SetStatistics(std::shared_ptr<StatisticsFile> statistics_file);
Status RemoveStatistics(int64_t snapshot_id);
+ Status SetPartitionStatistics(
+ std::shared_ptr<PartitionStatisticsFile> partition_statistics_file);
+ Status RemovePartitionStatistics(int64_t snapshot_id);
Result<std::unique_ptr<TableMetadata>> Build();
@@ -1208,6 +1211,41 @@ Status TableMetadataBuilder::Impl::RemoveStatistics(int64_t snapshot_id) {
return {};
}
+Status TableMetadataBuilder::Impl::SetPartitionStatistics(
+ std::shared_ptr<PartitionStatisticsFile> partition_statistics_file) {
+ ICEBERG_PRECHECK(partition_statistics_file != nullptr,
+ "Cannot set null partition statistics file");
+
+ // Find and replace existing partition statistics for the same snapshot_id, or add new
+ // one
+ auto it = std::ranges::find_if(
+ metadata_.partition_statistics,
+ [snapshot_id = partition_statistics_file->snapshot_id](const auto& stat) {
+ return stat && stat->snapshot_id == snapshot_id;
+ });
+
+ if (it != metadata_.partition_statistics.end()) {
+ *it = partition_statistics_file;
+ } else {
+ metadata_.partition_statistics.push_back(partition_statistics_file);
+ }
+
+ changes_.push_back(std::make_unique<table::SetPartitionStatistics>(
+ std::move(partition_statistics_file)));
+ return {};
+}
+
+Status TableMetadataBuilder::Impl::RemovePartitionStatistics(int64_t snapshot_id) {
+ auto removed_count =
+ std::erase_if(metadata_.partition_statistics, [snapshot_id](const auto& stat) {
+ return stat && stat->snapshot_id == snapshot_id;
+ });
+ if (removed_count != 0) {
+ changes_.push_back(std::make_unique<table::RemovePartitionStatistics>(snapshot_id));
+ }
+ return {};
+}
+
std::unordered_set<int64_t> TableMetadataBuilder::Impl::IntermediateSnapshotIdSet(
int64_t current_snapshot_id) const {
std::unordered_set<int64_t> added_snapshot_ids;
@@ -1636,12 +1674,15 @@ TableMetadataBuilder& TableMetadataBuilder::RemoveStatistics(int64_t snapshot_id
TableMetadataBuilder& TableMetadataBuilder::SetPartitionStatistics(
const std::shared_ptr<PartitionStatisticsFile>& partition_statistics_file) {
- throw IcebergError(std::format("{} not implemented", __FUNCTION__));
+ ICEBERG_BUILDER_RETURN_IF_ERROR(
+ impl_->SetPartitionStatistics(partition_statistics_file));
+ return *this;
}
TableMetadataBuilder& TableMetadataBuilder::RemovePartitionStatistics(
int64_t snapshot_id) {
- throw IcebergError(std::format("{} not implemented", __FUNCTION__));
+ ICEBERG_BUILDER_RETURN_IF_ERROR(impl_->RemovePartitionStatistics(snapshot_id));
+ return *this;
}
TableMetadataBuilder& TableMetadataBuilder::SetProperties(
diff --git a/src/iceberg/table_update.cc b/src/iceberg/table_update.cc
index 5866b61d..946b2a6a 100644
--- a/src/iceberg/table_update.cc
+++ b/src/iceberg/table_update.cc
@@ -500,4 +500,60 @@ std::unique_ptr<TableUpdate> RemoveStatistics::Clone() const {
return std::make_unique<RemoveStatistics>(snapshot_id_);
}
+// SetPartitionStatistics
+
+int64_t SetPartitionStatistics::snapshot_id() const {
+ return partition_statistics_file_->snapshot_id;
+}
+
+void SetPartitionStatistics::ApplyTo(TableMetadataBuilder& builder) const {
+ builder.SetPartitionStatistics(partition_statistics_file_);
+}
+
+void SetPartitionStatistics::GenerateRequirements(TableUpdateContext& context) const {
+ // SetPartitionStatistics doesn't generate any requirements
+}
+
+bool SetPartitionStatistics::Equals(const TableUpdate& other) const {
+ if (other.kind() != Kind::kSetPartitionStatistics) {
+ return false;
+ }
+ const auto& other_set = internal::checked_cast<const SetPartitionStatistics&>(other);
+ if (!partition_statistics_file_ != !other_set.partition_statistics_file_) {
+ return false;
+ }
+ if (partition_statistics_file_ &&
+ !(*partition_statistics_file_ == *other_set.partition_statistics_file_)) {
+ return false;
+ }
+ return true;
+}
+
+std::unique_ptr<TableUpdate> SetPartitionStatistics::Clone() const {
+ return std::make_unique<SetPartitionStatistics>(partition_statistics_file_);
+}
+
+// RemovePartitionStatistics
+
+void RemovePartitionStatistics::ApplyTo(TableMetadataBuilder& builder) const {
+ builder.RemovePartitionStatistics(snapshot_id_);
+}
+
+void RemovePartitionStatistics::GenerateRequirements(TableUpdateContext& context) const {
+ // RemovePartitionStatistics doesn't generate any requirements
+}
+
+bool RemovePartitionStatistics::Equals(const TableUpdate& other) const {
+ if (other.kind() != Kind::kRemovePartitionStatistics) {
+ return false;
+ }
+ const auto& other_remove =
+ internal::checked_cast<const RemovePartitionStatistics&>(other);
+ return snapshot_id_ == other_remove.snapshot_id_;
+}
+
+std::unique_ptr<TableUpdate> RemovePartitionStatistics::Clone() const {
+ return std::make_unique<RemovePartitionStatistics>(snapshot_id_);
+}
+
} // namespace iceberg::table
diff --git a/src/iceberg/table_update.h b/src/iceberg/table_update.h
index 5bbc243e..c75c3fa6 100644
--- a/src/iceberg/table_update.h
+++ b/src/iceberg/table_update.h
@@ -61,6 +61,8 @@ class ICEBERG_EXPORT TableUpdate {
kSetLocation,
kSetStatistics,
kRemoveStatistics,
+ kSetPartitionStatistics,
+ kRemovePartitionStatistics,
};
virtual ~TableUpdate();
@@ -558,6 +560,54 @@ class ICEBERG_EXPORT RemoveStatistics : public TableUpdate {
int64_t snapshot_id_;
};
+/// \brief Represents setting partition statistics for a snapshot
+class ICEBERG_EXPORT SetPartitionStatistics : public TableUpdate {
+ public:
+ explicit SetPartitionStatistics(
+ std::shared_ptr<PartitionStatisticsFile> partition_statistics_file)
+ : partition_statistics_file_(std::move(partition_statistics_file)) {}
+
+ int64_t snapshot_id() const;
+
+ const std::shared_ptr<PartitionStatisticsFile>& partition_statistics_file() const {
+ return partition_statistics_file_;
+ }
+
+ void ApplyTo(TableMetadataBuilder& builder) const override;
+
+ void GenerateRequirements(TableUpdateContext& context) const override;
+
+ Kind kind() const override { return Kind::kSetPartitionStatistics; }
+
+ bool Equals(const TableUpdate& other) const override;
+
+ std::unique_ptr<TableUpdate> Clone() const override;
+
+ private:
+ std::shared_ptr<PartitionStatisticsFile> partition_statistics_file_;
+};
+
+/// \brief Represents removing partition statistics for a snapshot
+class ICEBERG_EXPORT RemovePartitionStatistics : public TableUpdate {
+ public:
+ explicit RemovePartitionStatistics(int64_t snapshot_id) : snapshot_id_(snapshot_id) {}
+
+ int64_t snapshot_id() const { return snapshot_id_; }
+
+ void ApplyTo(TableMetadataBuilder& builder) const override;
+
+ void GenerateRequirements(TableUpdateContext& context) const override;
+
+ Kind kind() const override { return Kind::kRemovePartitionStatistics; }
+
+ bool Equals(const TableUpdate& other) const override;
+
+ std::unique_ptr<TableUpdate> Clone() const override;
+
+ private:
+ int64_t snapshot_id_;
+};
+
} // namespace table
} // namespace iceberg
diff --git a/src/iceberg/test/CMakeLists.txt b/src/iceberg/test/CMakeLists.txt
index 5243d9b7..f3c8aea9 100644
--- a/src/iceberg/test/CMakeLists.txt
+++ b/src/iceberg/test/CMakeLists.txt
@@ -182,6 +182,7 @@ if(ICEBERG_BUILD_BUNDLE)
transaction_test.cc
update_location_test.cc
update_partition_spec_test.cc
+ update_partition_statistics_test.cc
update_properties_test.cc
update_schema_test.cc
update_sort_order_test.cc
diff --git a/src/iceberg/test/json_internal_test.cc b/src/iceberg/test/json_internal_test.cc
index bb167ad0..8fa24312 100644
--- a/src/iceberg/test/json_internal_test.cc
+++ b/src/iceberg/test/json_internal_test.cc
@@ -613,6 +613,43 @@ TEST(JsonInternalTest, TableUpdateRemoveStatistics) {
update);
}
+TEST(JsonInternalTest, TableUpdateSetPartitionStatistics) {
+ auto partition_stats_file = std::make_shared<PartitionStatisticsFile>();
+ partition_stats_file->snapshot_id = 123456789;
+ partition_stats_file->path =
+ "s3://bucket/warehouse/table/metadata/partition-stats-123456789.parquet";
+ partition_stats_file->file_size_in_bytes = 2048;
+
+ table::SetPartitionStatistics update(partition_stats_file);
+ nlohmann::json expected = R"({
+ "action": "set-partition-statistics",
+ "partition-statistics": {
+ "snapshot-id": 123456789,
+ "statistics-path": "s3://bucket/warehouse/table/metadata/partition-stats-123456789.parquet",
+ "file-size-in-bytes": 2048
+ }
+ })"_json;
+
+ EXPECT_EQ(ToJson(update), expected);
+ auto parsed = TableUpdateFromJson(expected);
+ ASSERT_THAT(parsed, IsOk());
+ EXPECT_EQ(*internal::checked_cast<table::SetPartitionStatistics*>(parsed.value().get()),
+ update);
+}
+
+TEST(JsonInternalTest, TableUpdateRemovePartitionStatistics) {
+ table::RemovePartitionStatistics update(123456789);
+ nlohmann::json expected =
+ R"({"action":"remove-partition-statistics","snapshot-id":123456789})"_json;
+
+ EXPECT_EQ(ToJson(update), expected);
+ auto parsed = TableUpdateFromJson(expected);
+ ASSERT_THAT(parsed, IsOk());
+ EXPECT_EQ(
+ *internal::checked_cast<table::RemovePartitionStatistics*>(parsed.value().get()),
+ update);
+}
+
TEST(JsonInternalTest, TableUpdateUnknownAction) {
nlohmann::json json = R"({"action":"unknown-action"})"_json;
auto result = TableUpdateFromJson(json);
diff --git a/src/iceberg/test/update_partition_statistics_test.cc b/src/iceberg/test/update_partition_statistics_test.cc
new file mode 100644
index 00000000..5ed84cc0
--- /dev/null
+++ b/src/iceberg/test/update_partition_statistics_test.cc
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "iceberg/update/update_partition_statistics.h"
+
+#include <algorithm>
+#include <memory>
+
+#include <gmock/gmock.h>
+#include <gtest/gtest.h>
+
+#include "iceberg/result.h"
+#include "iceberg/statistics_file.h"
+#include "iceberg/test/matchers.h"
+#include "iceberg/test/update_test_base.h"
+
+namespace iceberg {
+
+class UpdatePartitionStatisticsTest : public UpdateTestBase {
+ protected:
+ // Helper function to create a partition statistics file
+ std::shared_ptr<PartitionStatisticsFile> MakePartitionStatisticsFile(
+ int64_t snapshot_id, const std::string& path, int64_t file_size = 2048) {
+ auto stats_file = std::make_shared<PartitionStatisticsFile>();
+ stats_file->snapshot_id = snapshot_id;
+ stats_file->path = path;
+ stats_file->file_size_in_bytes = file_size;
+ return stats_file;
+ }
+
+ // Helper to find partition statistics file by snapshot_id in the result vector
+ std::shared_ptr<PartitionStatisticsFile> FindPartitionStatistics(
+ const std::vector<std::pair<int64_t, std::shared_ptr<PartitionStatisticsFile>>>&
+ to_set,
+ int64_t snapshot_id) {
+ auto it = std::ranges::find_if(
+ to_set, [snapshot_id](const auto& p) { return p.first == snapshot_id; });
+ return it != to_set.end() ? it->second : nullptr;
+ }
+};
+
+TEST_F(UpdatePartitionStatisticsTest, EmptyUpdate) {
+ ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewUpdatePartitionStatistics());
+ ICEBERG_UNWRAP_OR_FAIL(auto result, update->Apply());
+ EXPECT_TRUE(result.to_set.empty());
+ EXPECT_TRUE(result.to_remove.empty());
+}
+
+TEST_F(UpdatePartitionStatisticsTest, SetPartitionStatistics) {
+ ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewUpdatePartitionStatistics());
+ auto partition_stats_file = MakePartitionStatisticsFile(
+ 1, "/warehouse/test_table/metadata/partition-stats-1.parquet");
+ update->SetPartitionStatistics(partition_stats_file);
+
+ ICEBERG_UNWRAP_OR_FAIL(auto result, update->Apply());
+ EXPECT_EQ(result.to_set.size(), 1);
+ EXPECT_TRUE(result.to_remove.empty());
+
+ auto found = FindPartitionStatistics(result.to_set, 1);
+ ASSERT_NE(found, nullptr);
+ EXPECT_EQ(found->snapshot_id, 1);
+ EXPECT_EQ(found->path, "/warehouse/test_table/metadata/partition-stats-1.parquet");
+ EXPECT_EQ(found->file_size_in_bytes, 2048);
+}
+
+TEST_F(UpdatePartitionStatisticsTest, SetMultiplePartitionStatistics) {
+ ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewUpdatePartitionStatistics());
+
+ auto partition_stats_file1 = MakePartitionStatisticsFile(
+ 1, "/warehouse/test_table/metadata/partition-stats-1.parquet");
+ auto partition_stats_file2 = MakePartitionStatisticsFile(
+ 2, "/warehouse/test_table/metadata/partition-stats-2.parquet", 4096);
+
+ update->SetPartitionStatistics(partition_stats_file1);
+ update->SetPartitionStatistics(partition_stats_file2);
+
+ ICEBERG_UNWRAP_OR_FAIL(auto result, update->Apply());
+ EXPECT_EQ(result.to_set.size(), 2);
+ EXPECT_TRUE(result.to_remove.empty());
+
+ auto found1 = FindPartitionStatistics(result.to_set, 1);
+ ASSERT_NE(found1, nullptr);
+ EXPECT_EQ(found1->snapshot_id, 1);
+
+ auto found2 = FindPartitionStatistics(result.to_set, 2);
+ ASSERT_NE(found2, nullptr);
+ EXPECT_EQ(found2->snapshot_id, 2);
+ EXPECT_EQ(found2->file_size_in_bytes, 4096);
+}
+
+TEST_F(UpdatePartitionStatisticsTest, ReplacePartitionStatistics) {
+ ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewUpdatePartitionStatistics());
+
+ auto partition_stats_file1 = MakePartitionStatisticsFile(
+ 1, "/warehouse/test_table/metadata/partition-stats-1.parquet");
+ auto partition_stats_file2 = MakePartitionStatisticsFile(
+ 1, "/warehouse/test_table/metadata/partition-stats-1-updated.parquet", 8192);
+
+ update->SetPartitionStatistics(partition_stats_file1);
+ update->SetPartitionStatistics(partition_stats_file2);
+
+ ICEBERG_UNWRAP_OR_FAIL(auto result, update->Apply());
+ EXPECT_EQ(result.to_set.size(), 1);
+ EXPECT_TRUE(result.to_remove.empty());
+
+ auto found = FindPartitionStatistics(result.to_set, 1);
+ ASSERT_NE(found, nullptr);
+ EXPECT_EQ(found->path,
+ "/warehouse/test_table/metadata/partition-stats-1-updated.parquet");
+ EXPECT_EQ(found->file_size_in_bytes, 8192);
+}
+
+TEST_F(UpdatePartitionStatisticsTest, RemovePartitionStatistics) {
+ ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewUpdatePartitionStatistics());
+ update->RemovePartitionStatistics(1);
+
+ ICEBERG_UNWRAP_OR_FAIL(auto result, update->Apply());
+ EXPECT_TRUE(result.to_set.empty());
+ EXPECT_EQ(result.to_remove.size(), 1);
+ EXPECT_EQ(result.to_remove[0], 1);
+}
+
+TEST_F(UpdatePartitionStatisticsTest, SetThenRemovePartitionStatistics) {
+ ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewUpdatePartitionStatistics());
+
+ auto partition_stats_file = MakePartitionStatisticsFile(
+ 1, "/warehouse/test_table/metadata/partition-stats-1.parquet");
+ update->SetPartitionStatistics(partition_stats_file);
+ update->RemovePartitionStatistics(1);
+
+ ICEBERG_UNWRAP_OR_FAIL(auto result, update->Apply());
+ EXPECT_TRUE(result.to_set.empty());
+ EXPECT_EQ(result.to_remove.size(), 1);
+ EXPECT_EQ(result.to_remove[0], 1);
+}
+
+TEST_F(UpdatePartitionStatisticsTest, SetNullPartitionStatistics) {
+ ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewUpdatePartitionStatistics());
+
+ update->SetPartitionStatistics(nullptr);
+
+ auto result = update->Apply();
+ EXPECT_THAT(result, IsError(ErrorKind::kValidationFailed));
+ EXPECT_THAT(result, HasErrorMessage("Statistics file cannot be null"));
+}
+
+TEST_F(UpdatePartitionStatisticsTest, SetAndRemoveMixed) {
+ ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewUpdatePartitionStatistics());
+
+ auto partition_stats_file1 = MakePartitionStatisticsFile(
+ 1, "/warehouse/test_table/metadata/partition-stats-1.parquet");
+ auto partition_stats_file2 = MakePartitionStatisticsFile(
+ 2, "/warehouse/test_table/metadata/partition-stats-2.parquet");
+
+ update->SetPartitionStatistics(partition_stats_file1);
+ update->SetPartitionStatistics(partition_stats_file2);
+ update->RemovePartitionStatistics(3);
+
+ ICEBERG_UNWRAP_OR_FAIL(auto result, update->Apply());
+ EXPECT_EQ(result.to_set.size(), 2);
+ EXPECT_EQ(result.to_remove.size(), 1);
+ EXPECT_EQ(result.to_remove[0], 3);
+}
+
+} // namespace iceberg
diff --git a/src/iceberg/transaction.cc b/src/iceberg/transaction.cc
index 04ccdfb9..b24aa0da 100644
--- a/src/iceberg/transaction.cc
+++ b/src/iceberg/transaction.cc
@@ -39,6 +39,7 @@
#include "iceberg/update/snapshot_update.h"
#include "iceberg/update/update_location.h"
#include "iceberg/update/update_partition_spec.h"
+#include "iceberg/update/update_partition_statistics.h"
#include "iceberg/update/update_properties.h"
#include "iceberg/update/update_schema.h"
#include "iceberg/update/update_snapshot_reference.h"
@@ -142,6 +143,10 @@ Status Transaction::Apply(PendingUpdate& update) {
ICEBERG_RETURN_UNEXPECTED(
ApplyUpdateStatistics(internal::checked_cast<UpdateStatistics&>(update)));
break;
+ case PendingUpdate::Kind::kUpdatePartitionStatistics:
+ ICEBERG_RETURN_UNEXPECTED(ApplyUpdatePartitionStatistics(
+ internal::checked_cast<UpdatePartitionStatistics&>(update)));
+ break;
default:
return NotSupported("Unsupported pending update: {}",
static_cast<int32_t>(update.kind()));
@@ -284,6 +289,17 @@ Status Transaction::ApplyUpdateStatistics(UpdateStatistics& update) {
return {};
}
+Status Transaction::ApplyUpdatePartitionStatistics(UpdatePartitionStatistics& update) {
+ ICEBERG_ASSIGN_OR_RAISE(auto result, update.Apply());
+ for (auto&& [_, partition_stat_file] : result.to_set) {
+ metadata_builder_->SetPartitionStatistics(std::move(partition_stat_file));
+ }
+ for (const auto& snapshot_id : result.to_remove) {
+ metadata_builder_->RemovePartitionStatistics(snapshot_id);
+ }
+ return {};
+}
+
Result<std::shared_ptr<Table>> Transaction::Commit() {
if (committed_) {
return Invalid("Transaction already committed");
@@ -395,6 +411,15 @@ Result<std::shared_ptr<UpdateStatistics>> Transaction::NewUpdateStatistics() {
return update_statistics;
}
+Result<std::shared_ptr<UpdatePartitionStatistics>>
+Transaction::NewUpdatePartitionStatistics() {
+ ICEBERG_ASSIGN_OR_RAISE(
+ std::shared_ptr<UpdatePartitionStatistics> update_partition_statistics,
+ UpdatePartitionStatistics::Make(shared_from_this()));
+ ICEBERG_RETURN_UNEXPECTED(AddUpdate(update_partition_statistics));
+ return update_partition_statistics;
+}
+
Result<std::shared_ptr<UpdateSnapshotReference>>
Transaction::NewUpdateSnapshotReference() {
ICEBERG_ASSIGN_OR_RAISE(std::shared_ptr<UpdateSnapshotReference> update_ref,
diff --git a/src/iceberg/transaction.h b/src/iceberg/transaction.h
index 3d5450e5..e975be7f 100644
--- a/src/iceberg/transaction.h
+++ b/src/iceberg/transaction.h
@@ -86,6 +86,10 @@ class ICEBERG_EXPORT Transaction : public std::enable_shared_from_this<Transacti
/// changes.
Result<std::shared_ptr<UpdateStatistics>> NewUpdateStatistics();
+ /// \brief Create a new UpdatePartitionStatistics to update partition statistics and
+ /// commit the changes.
+ Result<std::shared_ptr<UpdatePartitionStatistics>> NewUpdatePartitionStatistics();
+
/// \brief Create a new UpdateLocation to update the table location and commit the
/// changes.
Result<std::shared_ptr<UpdateLocation>> NewUpdateLocation();
@@ -115,6 +119,7 @@ class ICEBERG_EXPORT Transaction : public std::enable_shared_from_this<Transacti
Status ApplySetSnapshot(SetSnapshot& update);
Status ApplyUpdateLocation(UpdateLocation& update);
Status ApplyUpdatePartitionSpec(UpdatePartitionSpec& update);
+ Status ApplyUpdatePartitionStatistics(UpdatePartitionStatistics& update);
Status ApplyUpdateProperties(UpdateProperties& update);
Status ApplyUpdateSchema(UpdateSchema& update);
Status ApplyUpdateSnapshot(SnapshotUpdate& update);
diff --git a/src/iceberg/type_fwd.h b/src/iceberg/type_fwd.h
index 9e21088f..e97de0ac 100644
--- a/src/iceberg/type_fwd.h
+++ b/src/iceberg/type_fwd.h
@@ -196,6 +196,7 @@ class SetSnapshot;
class SnapshotUpdate;
class UpdateLocation;
class UpdatePartitionSpec;
+class UpdatePartitionStatistics;
class UpdateProperties;
class UpdateSchema;
class UpdateSnapshotReference;
diff --git a/src/iceberg/update/meson.build b/src/iceberg/update/meson.build
index e00b1e6e..102471c0 100644
--- a/src/iceberg/update/meson.build
+++ b/src/iceberg/update/meson.build
@@ -24,6 +24,7 @@ install_headers(
'snapshot_update.h',
'update_location.h',
'update_partition_spec.h',
+ 'update_partition_statistics.h',
'update_schema.h',
'update_snapshot_reference.h',
'update_sort_order.h',
diff --git a/src/iceberg/update/pending_update.h b/src/iceberg/update/pending_update.h
index e5d583d1..f44812a8 100644
--- a/src/iceberg/update/pending_update.h
+++ b/src/iceberg/update/pending_update.h
@@ -46,6 +46,7 @@ class ICEBERG_EXPORT PendingUpdate : public ErrorCollector {
kSetSnapshot,
kUpdateLocation,
kUpdatePartitionSpec,
+ kUpdatePartitionStatistics,
kUpdateProperties,
kUpdateSchema,
kUpdateSnapshot,
diff --git a/src/iceberg/update/update_partition_statistics.cc b/src/iceberg/update/update_partition_statistics.cc
new file mode 100644
index 00000000..2c06c0ce
--- /dev/null
+++ b/src/iceberg/update/update_partition_statistics.cc
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "iceberg/update/update_partition_statistics.h"
+
+#include <cstdint>
+#include <memory>
+#include <unordered_map>
+#include <vector>
+
+#include "iceberg/result.h"
+#include "iceberg/statistics_file.h"
+#include "iceberg/transaction.h"
+#include "iceberg/util/macros.h"
+
+namespace iceberg {
+
+Result<std::shared_ptr<UpdatePartitionStatistics>> UpdatePartitionStatistics::Make(
+ std::shared_ptr<Transaction> transaction) {
+ ICEBERG_PRECHECK(transaction != nullptr,
+ "Cannot create UpdatePartitionStatistics without a transaction");
+ return std::shared_ptr<UpdatePartitionStatistics>(
+ new UpdatePartitionStatistics(std::move(transaction)));
+}
+
+UpdatePartitionStatistics::UpdatePartitionStatistics(
+ std::shared_ptr<Transaction> transaction)
+ : PendingUpdate(std::move(transaction)) {}
+
+UpdatePartitionStatistics::~UpdatePartitionStatistics() = default;
+
+UpdatePartitionStatistics& UpdatePartitionStatistics::SetPartitionStatistics(
+ std::shared_ptr<PartitionStatisticsFile> partition_statistics_file) {
+ ICEBERG_BUILDER_CHECK(partition_statistics_file != nullptr,
+ "Statistics file cannot be null");
+
+ partition_statistics_to_set_[partition_statistics_file->snapshot_id] =
+ std::move(partition_statistics_file);
+ return *this;
+}
+
+UpdatePartitionStatistics& UpdatePartitionStatistics::RemovePartitionStatistics(
+ int64_t snapshot_id) {
+ partition_statistics_to_set_[snapshot_id] = nullptr;
+ return *this;
+}
+
+Result<UpdatePartitionStatistics::ApplyResult> UpdatePartitionStatistics::Apply() {
+ ICEBERG_RETURN_UNEXPECTED(CheckErrors());
+
+ ApplyResult result;
+ for (const auto& [snapshot_id, partition_stats] : partition_statistics_to_set_) {
+ if (partition_stats) {
+ result.to_set.emplace_back(snapshot_id, partition_stats);
+ } else {
+ result.to_remove.push_back(snapshot_id);
+ }
+ }
+ return result;
+}
+
+} // namespace iceberg
diff --git a/src/iceberg/update/update_partition_statistics.h b/src/iceberg/update/update_partition_statistics.h
new file mode 100644
index 00000000..740fe214
--- /dev/null
+++ b/src/iceberg/update/update_partition_statistics.h
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#include <cstdint>
+#include <memory>
+#include <unordered_map>
+#include <utility>
+#include <vector>
+
+#include "iceberg/iceberg_export.h"
+#include "iceberg/result.h"
+#include "iceberg/type_fwd.h"
+#include "iceberg/update/pending_update.h"
+
+/// \file iceberg/update/update_partition_statistics.h
+/// \brief Updates table partition statistics.
+
+namespace iceberg {
+
+/// \brief Updates table partition statistics.
+class ICEBERG_EXPORT UpdatePartitionStatistics : public PendingUpdate {
+ public:
+ static Result<std::shared_ptr<UpdatePartitionStatistics>> Make(
+ std::shared_ptr<Transaction> transaction);
+
+ ~UpdatePartitionStatistics() override;
+
+ /// \brief Set partition statistics file for a snapshot.
+ ///
+ /// Associates a partition statistics file with a snapshot ID. If partition statistics
+ /// already exist for this snapshot, they will be replaced.
+ ///
+ /// \param partition_statistics_file The partition statistics file to set
+ /// \return Reference to this UpdatePartitionStatistics for chaining
+ UpdatePartitionStatistics& SetPartitionStatistics(
+ std::shared_ptr<PartitionStatisticsFile> partition_statistics_file);
+
+ /// \brief Remove partition statistics for a snapshot.
+ ///
+ /// Marks the partition statistics for the given snapshot ID for removal.
+ ///
+ /// \param snapshot_id The snapshot ID whose partition statistics to remove
+ /// \return Reference to this UpdatePartitionStatistics for chaining
+ UpdatePartitionStatistics& RemovePartitionStatistics(int64_t snapshot_id);
+
+ Kind kind() const final { return Kind::kUpdatePartitionStatistics; }
+
+ struct ApplyResult {
+ std::vector<std::pair<int64_t, std::shared_ptr<PartitionStatisticsFile>>> to_set;
+ std::vector<int64_t> to_remove;
+ };
+
+ Result<ApplyResult> Apply();
+
+ private:
+ explicit UpdatePartitionStatistics(std::shared_ptr<Transaction> transaction);
+
+ std::unordered_map<int64_t, std::shared_ptr<PartitionStatisticsFile>>
+ partition_statistics_to_set_;
+};
+
+} // namespace iceberg