This is an automated email from the ASF dual-hosted git repository.

gangwu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-cpp.git


The following commit(s) were added to refs/heads/main by this push:
     new 6f1cdfd8 feat: add update partition stats (#538)
6f1cdfd8 is described below

commit 6f1cdfd800c540cb7d4892a8644e2cfde2729301
Author: Feiyang Li <[email protected]>
AuthorDate: Tue Jan 27 13:38:52 2026 +0800

    feat: add update partition stats (#538)
---
 src/iceberg/CMakeLists.txt                         |   1 +
 src/iceberg/json_internal.cc                       |  33 ++++
 src/iceberg/meson.build                            |   1 +
 src/iceberg/table.cc                               |   8 +
 src/iceberg/table.h                                |   5 +
 src/iceberg/table_metadata.cc                      |  45 ++++-
 src/iceberg/table_update.cc                        |  56 +++++++
 src/iceberg/table_update.h                         |  50 ++++++
 src/iceberg/test/CMakeLists.txt                    |   1 +
 src/iceberg/test/json_internal_test.cc             |  37 +++++
 .../test/update_partition_statistics_test.cc       | 181 +++++++++++++++++++++
 src/iceberg/transaction.cc                         |  25 +++
 src/iceberg/transaction.h                          |   5 +
 src/iceberg/type_fwd.h                             |   1 +
 src/iceberg/update/meson.build                     |   1 +
 src/iceberg/update/pending_update.h                |   1 +
 src/iceberg/update/update_partition_statistics.cc  |  78 +++++++++
 src/iceberg/update/update_partition_statistics.h   |  80 +++++++++
 18 files changed, 607 insertions(+), 2 deletions(-)

diff --git a/src/iceberg/CMakeLists.txt b/src/iceberg/CMakeLists.txt
index 359b79e6..d82229ef 100644
--- a/src/iceberg/CMakeLists.txt
+++ b/src/iceberg/CMakeLists.txt
@@ -92,6 +92,7 @@ set(ICEBERG_SOURCES
     update/snapshot_update.cc
     update/update_location.cc
     update/update_partition_spec.cc
+    update/update_partition_statistics.cc
     update/update_properties.cc
     update/update_schema.cc
     update/update_snapshot_reference.cc
diff --git a/src/iceberg/json_internal.cc b/src/iceberg/json_internal.cc
index 056af671..95883a93 100644
--- a/src/iceberg/json_internal.cc
+++ b/src/iceberg/json_internal.cc
@@ -192,6 +192,9 @@ constexpr std::string_view kActionRemoveProperties = 
"remove-properties";
 constexpr std::string_view kActionSetLocation = "set-location";
 constexpr std::string_view kActionSetStatistics = "set-statistics";
 constexpr std::string_view kActionRemoveStatistics = "remove-statistics";
+constexpr std::string_view kActionSetPartitionStatistics = 
"set-partition-statistics";
+constexpr std::string_view kActionRemovePartitionStatistics =
+    "remove-partition-statistics";
 
 // TableUpdate field constants
 constexpr std::string_view kUUID = "uuid";
@@ -1439,6 +1442,24 @@ nlohmann::json ToJson(const TableUpdate& update) {
       json[kSnapshotId] = u.snapshot_id();
       break;
     }
+    case TableUpdate::Kind::kSetPartitionStatistics: {
+      const auto& u =
+          internal::checked_cast<const table::SetPartitionStatistics&>(update);
+      json[kAction] = kActionSetPartitionStatistics;
+      if (u.partition_statistics_file()) {
+        json[kPartitionStatistics] = ToJson(*u.partition_statistics_file());
+      } else {
+        json[kPartitionStatistics] = nlohmann::json::value_t::null;
+      }
+      break;
+    }
+    case TableUpdate::Kind::kRemovePartitionStatistics: {
+      const auto& u =
+          internal::checked_cast<const 
table::RemovePartitionStatistics&>(update);
+      json[kAction] = kActionRemovePartitionStatistics;
+      json[kSnapshotId] = u.snapshot_id();
+      break;
+    }
   }
   return json;
 }
@@ -1628,6 +1649,18 @@ Result<std::unique_ptr<TableUpdate>> 
TableUpdateFromJson(const nlohmann::json& j
     ICEBERG_ASSIGN_OR_RAISE(auto snapshot_id, GetJsonValue<int64_t>(json, 
kSnapshotId));
     return std::make_unique<table::RemoveStatistics>(snapshot_id);
   }
+  if (action == kActionSetPartitionStatistics) {
+    ICEBERG_ASSIGN_OR_RAISE(auto partition_statistics_json,
+                            GetJsonValue<nlohmann::json>(json, 
kPartitionStatistics));
+    ICEBERG_ASSIGN_OR_RAISE(auto partition_statistics_file,
+                            
PartitionStatisticsFileFromJson(partition_statistics_json));
+    return std::make_unique<table::SetPartitionStatistics>(
+        std::move(partition_statistics_file));
+  }
+  if (action == kActionRemovePartitionStatistics) {
+    ICEBERG_ASSIGN_OR_RAISE(auto snapshot_id, GetJsonValue<int64_t>(json, 
kSnapshotId));
+    return std::make_unique<table::RemovePartitionStatistics>(snapshot_id);
+  }
 
   return JsonParseError("Unknown table update action: {}", action);
 }
diff --git a/src/iceberg/meson.build b/src/iceberg/meson.build
index 05cb6f8d..651de782 100644
--- a/src/iceberg/meson.build
+++ b/src/iceberg/meson.build
@@ -110,6 +110,7 @@ iceberg_sources = files(
     'update/snapshot_update.cc',
     'update/update_location.cc',
     'update/update_partition_spec.cc',
+    'update/update_partition_statistics.cc',
     'update/update_properties.cc',
     'update/update_schema.cc',
     'update/update_snapshot_reference.cc',
diff --git a/src/iceberg/table.cc b/src/iceberg/table.cc
index 73acafd7..b6c26ea0 100644
--- a/src/iceberg/table.cc
+++ b/src/iceberg/table.cc
@@ -33,6 +33,7 @@
 #include "iceberg/transaction.h"
 #include "iceberg/update/expire_snapshots.h"
 #include "iceberg/update/update_partition_spec.h"
+#include "iceberg/update/update_partition_statistics.h"
 #include "iceberg/update/update_properties.h"
 #include "iceberg/update/update_schema.h"
 #include "iceberg/update/update_statistics.h"
@@ -214,6 +215,13 @@ Result<std::shared_ptr<UpdateStatistics>> 
Table::NewUpdateStatistics() {
   return transaction->NewUpdateStatistics();
 }
 
+Result<std::shared_ptr<UpdatePartitionStatistics>> 
Table::NewUpdatePartitionStatistics() {  // creates a partition statistics update backed by its own transaction
+  ICEBERG_ASSIGN_OR_RAISE(
+      auto transaction, Transaction::Make(shared_from_this(), 
Transaction::Kind::kUpdate,
+                                          /*auto_commit=*/true));  // dedicated kUpdate transaction, created with auto_commit enabled
+  return transaction->NewUpdatePartitionStatistics();  // the update is created by, and registered with, that transaction
+}
+
 Result<std::shared_ptr<StagedTable>> StagedTable::Make(
     TableIdentifier identifier, std::shared_ptr<TableMetadata> metadata,
     std::string metadata_location, std::shared_ptr<FileIO> io,
diff --git a/src/iceberg/table.h b/src/iceberg/table.h
index 77e9016f..1f3135dd 100644
--- a/src/iceberg/table.h
+++ b/src/iceberg/table.h
@@ -156,6 +156,11 @@ class ICEBERG_EXPORT Table : public 
std::enable_shared_from_this<Table> {
   /// changes.
   virtual Result<std::shared_ptr<UpdateStatistics>> NewUpdateStatistics();
 
+  /// \brief Create a new UpdatePartitionStatistics to update partition 
statistics and
+  /// commit the changes.
+  virtual Result<std::shared_ptr<UpdatePartitionStatistics>>
+  NewUpdatePartitionStatistics();
+
   /// \brief Create a new UpdateLocation to update the table location and 
commit the
   /// changes.
   virtual Result<std::shared_ptr<UpdateLocation>> NewUpdateLocation();
diff --git a/src/iceberg/table_metadata.cc b/src/iceberg/table_metadata.cc
index 393b438a..7dab6704 100644
--- a/src/iceberg/table_metadata.cc
+++ b/src/iceberg/table_metadata.cc
@@ -623,6 +623,9 @@ class TableMetadataBuilder::Impl {
   Status RemovePartitionSpecs(const std::vector<int32_t>& spec_ids);
   Status SetStatistics(std::shared_ptr<StatisticsFile> statistics_file);
   Status RemoveStatistics(int64_t snapshot_id);
+  Status SetPartitionStatistics(
+      std::shared_ptr<PartitionStatisticsFile> partition_statistics_file);
+  Status RemovePartitionStatistics(int64_t snapshot_id);
 
   Result<std::unique_ptr<TableMetadata>> Build();
 
@@ -1208,6 +1211,41 @@ Status 
TableMetadataBuilder::Impl::RemoveStatistics(int64_t snapshot_id) {
   return {};
 }
 
+Status TableMetadataBuilder::Impl::SetPartitionStatistics(
+    std::shared_ptr<PartitionStatisticsFile> partition_statistics_file) {  // upserts the file into metadata_.partition_statistics, matched by snapshot id
+  ICEBERG_PRECHECK(partition_statistics_file != nullptr,
+                   "Cannot set null partition statistics file");  // checked first, before the file is dereferenced below
+
+  // Find and replace existing partition statistics for the same snapshot_id, 
or add new
+  // one
+  auto it = std::ranges::find_if(
+      metadata_.partition_statistics,
+      [snapshot_id = partition_statistics_file->snapshot_id](const auto& stat) 
{
+        return stat && stat->snapshot_id == snapshot_id;  // null entries never match
+      });
+
+  if (it != metadata_.partition_statistics.end()) {
+    *it = partition_statistics_file;  // same snapshot id: overwrite the existing entry in place
+  } else {
+    metadata_.partition_statistics.push_back(partition_statistics_file);  // no entry yet for this snapshot id: append
+  }
+
+  changes_.push_back(std::make_unique<table::SetPartitionStatistics>(
+      std::move(partition_statistics_file)));  // recorded on both the replace and the append path; last use, so the pointer is moved
+  return {};
+}
+
+Status TableMetadataBuilder::Impl::RemovePartitionStatistics(int64_t 
snapshot_id) {  // drops every partition statistics entry recorded for snapshot_id
+  auto removed_count =
+      std::erase_if(metadata_.partition_statistics, [snapshot_id](const auto& 
stat) {
+        return stat && stat->snapshot_id == snapshot_id;  // null entries are kept, never matched
+      });
+  if (removed_count != 0) {  // record a change only when something was actually removed
+    
changes_.push_back(std::make_unique<table::RemovePartitionStatistics>(snapshot_id));
+  }
+  return {};  // an unknown snapshot id is a silent no-op, not an error
+}
+
 std::unordered_set<int64_t> 
TableMetadataBuilder::Impl::IntermediateSnapshotIdSet(
     int64_t current_snapshot_id) const {
   std::unordered_set<int64_t> added_snapshot_ids;
@@ -1636,12 +1674,15 @@ TableMetadataBuilder& 
TableMetadataBuilder::RemoveStatistics(int64_t snapshot_id
 
 TableMetadataBuilder& TableMetadataBuilder::SetPartitionStatistics(
     const std::shared_ptr<PartitionStatisticsFile>& partition_statistics_file) 
{
-  throw IcebergError(std::format("{} not implemented", __FUNCTION__));
+  ICEBERG_BUILDER_RETURN_IF_ERROR(
+      impl_->SetPartitionStatistics(partition_statistics_file));
+  return *this;
 }
 
 TableMetadataBuilder& TableMetadataBuilder::RemovePartitionStatistics(
     int64_t snapshot_id) {
-  throw IcebergError(std::format("{} not implemented", __FUNCTION__));
+  
ICEBERG_BUILDER_RETURN_IF_ERROR(impl_->RemovePartitionStatistics(snapshot_id));
+  return *this;
 }
 
 TableMetadataBuilder& TableMetadataBuilder::SetProperties(
diff --git a/src/iceberg/table_update.cc b/src/iceberg/table_update.cc
index 5866b61d..946b2a6a 100644
--- a/src/iceberg/table_update.cc
+++ b/src/iceberg/table_update.cc
@@ -500,4 +500,60 @@ std::unique_ptr<TableUpdate> RemoveStatistics::Clone() 
const {
   return std::make_unique<RemoveStatistics>(snapshot_id_);
 }
 
+// SetPartitionStatistics
+
+int64_t SetPartitionStatistics::snapshot_id() const {  // snapshot id read from the wrapped partition statistics file
+  return partition_statistics_file_->snapshot_id;  // NOTE(review): no null check here, although Equals() and ToJson() both allow for a null file; confirm a null file can never reach this call
+}
+
+void SetPartitionStatistics::ApplyTo(TableMetadataBuilder& builder) const {
+  builder.SetPartitionStatistics(partition_statistics_file_);
+}
+
+void SetPartitionStatistics::GenerateRequirements(TableUpdateContext& context) 
const {
+  // SetPartitionStatistics doesn't generate any requirements
+}
+
+bool SetPartitionStatistics::Equals(const TableUpdate& other) const {  // value equality against another TableUpdate
+  if (other.kind() != Kind::kSetPartitionStatistics) {  // a different kind of update is never equal
+    return false;
+  }
+  const auto& other_set = internal::checked_cast<const 
SetPartitionStatistics&>(other);  // the cast is guarded by the kind() check above
+  if (!partition_statistics_file_ != !other_set.partition_statistics_file_) {  // exactly one of the two files is null
+    return false;
+  }
+  if (partition_statistics_file_ &&
+      !(*partition_statistics_file_ == *other_set.partition_statistics_file_)) 
{  // both non-null: compare the pointed-to files, not the pointers
+    return false;
+  }
+  return true;  // both null, or both non-null with equal contents
+}
+
+std::unique_ptr<TableUpdate> SetPartitionStatistics::Clone() const {  // returns a new update of the same kind
+  return std::make_unique<SetPartitionStatistics>(partition_statistics_file_);  // shallow: the clone shares the same PartitionStatisticsFile object
+}
+
+// RemovePartitionStatistics
+
+void RemovePartitionStatistics::ApplyTo(TableMetadataBuilder& builder) const {
+  builder.RemovePartitionStatistics(snapshot_id_);
+}
+
+void RemovePartitionStatistics::GenerateRequirements(TableUpdateContext& 
context) const {
+  // RemovePartitionStatistics doesn't generate any requirements
+}
+
+bool RemovePartitionStatistics::Equals(const TableUpdate& other) const {  // value equality against another TableUpdate
+  if (other.kind() != Kind::kRemovePartitionStatistics) {  // a different kind of update is never equal
+    return false;
+  }
+  const auto& other_remove =
+      internal::checked_cast<const RemovePartitionStatistics&>(other);  // the cast is guarded by the kind() check above
+  return snapshot_id_ == other_remove.snapshot_id_;  // the snapshot id is the only state compared
+}
+
+std::unique_ptr<TableUpdate> RemovePartitionStatistics::Clone() const {
+  return std::make_unique<RemovePartitionStatistics>(snapshot_id_);
+}
+
 }  // namespace iceberg::table
diff --git a/src/iceberg/table_update.h b/src/iceberg/table_update.h
index 5bbc243e..c75c3fa6 100644
--- a/src/iceberg/table_update.h
+++ b/src/iceberg/table_update.h
@@ -61,6 +61,8 @@ class ICEBERG_EXPORT TableUpdate {
     kSetLocation,
     kSetStatistics,
     kRemoveStatistics,
+    kSetPartitionStatistics,
+    kRemovePartitionStatistics,
   };
 
   virtual ~TableUpdate();
@@ -558,6 +560,54 @@ class ICEBERG_EXPORT RemoveStatistics : public TableUpdate 
{
   int64_t snapshot_id_;
 };
 
+/// \brief Table update that sets the partition statistics file for the snapshot named inside that file.
+class ICEBERG_EXPORT SetPartitionStatistics : public TableUpdate {
+ public:
+  explicit SetPartitionStatistics(
+      std::shared_ptr<PartitionStatisticsFile> partition_statistics_file)
+      : partition_statistics_file_(std::move(partition_statistics_file)) {}  // stored as given; no null check is made here
+
+  int64_t snapshot_id() const;  // snapshot id read from the wrapped file
+
+  const std::shared_ptr<PartitionStatisticsFile>& partition_statistics_file() 
const {
+    return partition_statistics_file_;
+  }
+
+  void ApplyTo(TableMetadataBuilder& builder) const override;  // forwards the file to builder.SetPartitionStatistics
+
+  void GenerateRequirements(TableUpdateContext& context) const override;  // intentionally a no-op
+
+  Kind kind() const override { return Kind::kSetPartitionStatistics; }
+
+  bool Equals(const TableUpdate& other) const override;  // compares file contents, not pointer identity
+
+  std::unique_ptr<TableUpdate> Clone() const override;  // the clone shares the same file object
+
+ private:
+  std::shared_ptr<PartitionStatisticsFile> partition_statistics_file_;  // shared, never deep-copied by this class
+};
+
+/// \brief Table update that removes the partition statistics recorded for one snapshot id.
+class ICEBERG_EXPORT RemovePartitionStatistics : public TableUpdate {
+ public:
+  explicit RemovePartitionStatistics(int64_t snapshot_id) : 
snapshot_id_(snapshot_id) {}
+
+  int64_t snapshot_id() const { return snapshot_id_; }
+
+  void ApplyTo(TableMetadataBuilder& builder) const override;  // forwards the id to builder.RemovePartitionStatistics
+
+  void GenerateRequirements(TableUpdateContext& context) const override;  // intentionally a no-op
+
+  Kind kind() const override { return Kind::kRemovePartitionStatistics; }
+
+  bool Equals(const TableUpdate& other) const override;  // equal when kind and snapshot id match
+
+  std::unique_ptr<TableUpdate> Clone() const override;
+
+ private:
+  int64_t snapshot_id_;  // id of the snapshot whose partition statistics are removed
+};
+
 }  // namespace table
 
 }  // namespace iceberg
diff --git a/src/iceberg/test/CMakeLists.txt b/src/iceberg/test/CMakeLists.txt
index 5243d9b7..f3c8aea9 100644
--- a/src/iceberg/test/CMakeLists.txt
+++ b/src/iceberg/test/CMakeLists.txt
@@ -182,6 +182,7 @@ if(ICEBERG_BUILD_BUNDLE)
                    transaction_test.cc
                    update_location_test.cc
                    update_partition_spec_test.cc
+                   update_partition_statistics_test.cc
                    update_properties_test.cc
                    update_schema_test.cc
                    update_sort_order_test.cc
diff --git a/src/iceberg/test/json_internal_test.cc 
b/src/iceberg/test/json_internal_test.cc
index bb167ad0..8fa24312 100644
--- a/src/iceberg/test/json_internal_test.cc
+++ b/src/iceberg/test/json_internal_test.cc
@@ -613,6 +613,43 @@ TEST(JsonInternalTest, TableUpdateRemoveStatistics) {
             update);
 }
 
+TEST(JsonInternalTest, TableUpdateSetPartitionStatistics) {
+  auto partition_stats_file = std::make_shared<PartitionStatisticsFile>();
+  partition_stats_file->snapshot_id = 123456789;
+  partition_stats_file->path =
+      "s3://bucket/warehouse/table/metadata/partition-stats-123456789.parquet";
+  partition_stats_file->file_size_in_bytes = 2048;
+
+  table::SetPartitionStatistics update(partition_stats_file);
+  nlohmann::json expected = R"({
+    "action": "set-partition-statistics",
+    "partition-statistics": {
+      "snapshot-id": 123456789,
+      "statistics-path": 
"s3://bucket/warehouse/table/metadata/partition-stats-123456789.parquet",
+      "file-size-in-bytes": 2048
+    }
+  })"_json;
+
+  EXPECT_EQ(ToJson(update), expected);
+  auto parsed = TableUpdateFromJson(expected);
+  ASSERT_THAT(parsed, IsOk());
+  
EXPECT_EQ(*internal::checked_cast<table::SetPartitionStatistics*>(parsed.value().get()),
+            update);
+}
+
+TEST(JsonInternalTest, TableUpdateRemovePartitionStatistics) {
+  table::RemovePartitionStatistics update(123456789);
+  nlohmann::json expected =
+      
R"({"action":"remove-partition-statistics","snapshot-id":123456789})"_json;
+
+  EXPECT_EQ(ToJson(update), expected);
+  auto parsed = TableUpdateFromJson(expected);
+  ASSERT_THAT(parsed, IsOk());
+  EXPECT_EQ(
+      
*internal::checked_cast<table::RemovePartitionStatistics*>(parsed.value().get()),
+      update);
+}
+
 TEST(JsonInternalTest, TableUpdateUnknownAction) {
   nlohmann::json json = R"({"action":"unknown-action"})"_json;
   auto result = TableUpdateFromJson(json);
diff --git a/src/iceberg/test/update_partition_statistics_test.cc 
b/src/iceberg/test/update_partition_statistics_test.cc
new file mode 100644
index 00000000..5ed84cc0
--- /dev/null
+++ b/src/iceberg/test/update_partition_statistics_test.cc
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "iceberg/update/update_partition_statistics.h"
+
+#include <algorithm>
+#include <memory>
+
+#include <gmock/gmock.h>
+#include <gtest/gtest.h>
+
+#include "iceberg/result.h"
+#include "iceberg/statistics_file.h"
+#include "iceberg/test/matchers.h"
+#include "iceberg/test/update_test_base.h"
+
+namespace iceberg {
+
+class UpdatePartitionStatisticsTest : public UpdateTestBase {
+ protected:
+  // Helper function to create a partition statistics file
+  std::shared_ptr<PartitionStatisticsFile> MakePartitionStatisticsFile(
+      int64_t snapshot_id, const std::string& path, int64_t file_size = 2048) {
+    auto stats_file = std::make_shared<PartitionStatisticsFile>();
+    stats_file->snapshot_id = snapshot_id;
+    stats_file->path = path;
+    stats_file->file_size_in_bytes = file_size;
+    return stats_file;
+  }
+
+  // Helper to find partition statistics file by snapshot_id in the result 
vector
+  std::shared_ptr<PartitionStatisticsFile> FindPartitionStatistics(
+      const std::vector<std::pair<int64_t, 
std::shared_ptr<PartitionStatisticsFile>>>&
+          to_set,
+      int64_t snapshot_id) {
+    auto it = std::ranges::find_if(
+        to_set, [snapshot_id](const auto& p) { return p.first == snapshot_id; 
});
+    return it != to_set.end() ? it->second : nullptr;
+  }
+};
+
+TEST_F(UpdatePartitionStatisticsTest, EmptyUpdate) {
+  ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewUpdatePartitionStatistics());
+  ICEBERG_UNWRAP_OR_FAIL(auto result, update->Apply());
+  EXPECT_TRUE(result.to_set.empty());
+  EXPECT_TRUE(result.to_remove.empty());
+}
+
+TEST_F(UpdatePartitionStatisticsTest, SetPartitionStatistics) {
+  ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewUpdatePartitionStatistics());
+  auto partition_stats_file = MakePartitionStatisticsFile(
+      1, "/warehouse/test_table/metadata/partition-stats-1.parquet");
+  update->SetPartitionStatistics(partition_stats_file);
+
+  ICEBERG_UNWRAP_OR_FAIL(auto result, update->Apply());
+  EXPECT_EQ(result.to_set.size(), 1);
+  EXPECT_TRUE(result.to_remove.empty());
+
+  auto found = FindPartitionStatistics(result.to_set, 1);
+  ASSERT_NE(found, nullptr);
+  EXPECT_EQ(found->snapshot_id, 1);
+  EXPECT_EQ(found->path, 
"/warehouse/test_table/metadata/partition-stats-1.parquet");
+  EXPECT_EQ(found->file_size_in_bytes, 2048);
+}
+
+TEST_F(UpdatePartitionStatisticsTest, SetMultiplePartitionStatistics) {
+  ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewUpdatePartitionStatistics());
+
+  auto partition_stats_file1 = MakePartitionStatisticsFile(
+      1, "/warehouse/test_table/metadata/partition-stats-1.parquet");
+  auto partition_stats_file2 = MakePartitionStatisticsFile(
+      2, "/warehouse/test_table/metadata/partition-stats-2.parquet", 4096);
+
+  update->SetPartitionStatistics(partition_stats_file1);
+  update->SetPartitionStatistics(partition_stats_file2);
+
+  ICEBERG_UNWRAP_OR_FAIL(auto result, update->Apply());
+  EXPECT_EQ(result.to_set.size(), 2);
+  EXPECT_TRUE(result.to_remove.empty());
+
+  auto found1 = FindPartitionStatistics(result.to_set, 1);
+  ASSERT_NE(found1, nullptr);
+  EXPECT_EQ(found1->snapshot_id, 1);
+
+  auto found2 = FindPartitionStatistics(result.to_set, 2);
+  ASSERT_NE(found2, nullptr);
+  EXPECT_EQ(found2->snapshot_id, 2);
+  EXPECT_EQ(found2->file_size_in_bytes, 4096);
+}
+
+TEST_F(UpdatePartitionStatisticsTest, ReplacePartitionStatistics) {  // a second set for the same snapshot id replaces the first
+  ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewUpdatePartitionStatistics());
+
+  auto partition_stats_file1 = MakePartitionStatisticsFile(
+      1, "/warehouse/test_table/metadata/partition-stats-1.parquet");
+  auto partition_stats_file2 = MakePartitionStatisticsFile(
+      1, "/warehouse/test_table/metadata/partition-stats-1-updated.parquet", 
8192);  // same snapshot id as file1, different path and size
+
+  update->SetPartitionStatistics(partition_stats_file1);
+  update->SetPartitionStatistics(partition_stats_file2);  // staged last, so this is the one that must survive
+
+  ICEBERG_UNWRAP_OR_FAIL(auto result, update->Apply());
+  EXPECT_EQ(result.to_set.size(), 1);  // one entry per snapshot id, not one per call
+  EXPECT_TRUE(result.to_remove.empty());
+
+  auto found = FindPartitionStatistics(result.to_set, 1);
+  ASSERT_NE(found, nullptr);
+  EXPECT_EQ(found->path,
+            
"/warehouse/test_table/metadata/partition-stats-1-updated.parquet");
+  EXPECT_EQ(found->file_size_in_bytes, 8192);
+}
+
+TEST_F(UpdatePartitionStatisticsTest, RemovePartitionStatistics) {
+  ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewUpdatePartitionStatistics());
+  update->RemovePartitionStatistics(1);
+
+  ICEBERG_UNWRAP_OR_FAIL(auto result, update->Apply());
+  EXPECT_TRUE(result.to_set.empty());
+  EXPECT_EQ(result.to_remove.size(), 1);
+  EXPECT_EQ(result.to_remove[0], 1);
+}
+
+TEST_F(UpdatePartitionStatisticsTest, SetThenRemovePartitionStatistics) {  // a remove after a set for the same snapshot id wins
+  ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewUpdatePartitionStatistics());
+
+  auto partition_stats_file = MakePartitionStatisticsFile(
+      1, "/warehouse/test_table/metadata/partition-stats-1.parquet");
+  update->SetPartitionStatistics(partition_stats_file);
+  update->RemovePartitionStatistics(1);  // overrides the file staged on the previous line
+
+  ICEBERG_UNWRAP_OR_FAIL(auto result, update->Apply());
+  EXPECT_TRUE(result.to_set.empty());  // the staged file must not be reported as a set
+  EXPECT_EQ(result.to_remove.size(), 1);
+  EXPECT_EQ(result.to_remove[0], 1);
+}
+
+TEST_F(UpdatePartitionStatisticsTest, SetNullPartitionStatistics) {  // a null file is reported as a validation error by Apply()
+  ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewUpdatePartitionStatistics());
+
+  update->SetPartitionStatistics(nullptr);  // returns a reference for chaining, so the failure cannot be observed here
+
+  auto result = update->Apply();  // the error from the call above is expected to surface at this point
+  EXPECT_THAT(result, IsError(ErrorKind::kValidationFailed));
+  EXPECT_THAT(result, HasErrorMessage("Statistics file cannot be null"));
+}
+
+TEST_F(UpdatePartitionStatisticsTest, SetAndRemoveMixed) {
+  ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewUpdatePartitionStatistics());
+
+  auto partition_stats_file1 = MakePartitionStatisticsFile(
+      1, "/warehouse/test_table/metadata/partition-stats-1.parquet");
+  auto partition_stats_file2 = MakePartitionStatisticsFile(
+      2, "/warehouse/test_table/metadata/partition-stats-2.parquet");
+
+  update->SetPartitionStatistics(partition_stats_file1);
+  update->SetPartitionStatistics(partition_stats_file2);
+  update->RemovePartitionStatistics(3);
+
+  ICEBERG_UNWRAP_OR_FAIL(auto result, update->Apply());
+  EXPECT_EQ(result.to_set.size(), 2);
+  EXPECT_EQ(result.to_remove.size(), 1);
+  EXPECT_EQ(result.to_remove[0], 3);
+}
+
+}  // namespace iceberg
diff --git a/src/iceberg/transaction.cc b/src/iceberg/transaction.cc
index 04ccdfb9..b24aa0da 100644
--- a/src/iceberg/transaction.cc
+++ b/src/iceberg/transaction.cc
@@ -39,6 +39,7 @@
 #include "iceberg/update/snapshot_update.h"
 #include "iceberg/update/update_location.h"
 #include "iceberg/update/update_partition_spec.h"
+#include "iceberg/update/update_partition_statistics.h"
 #include "iceberg/update/update_properties.h"
 #include "iceberg/update/update_schema.h"
 #include "iceberg/update/update_snapshot_reference.h"
@@ -142,6 +143,10 @@ Status Transaction::Apply(PendingUpdate& update) {
       ICEBERG_RETURN_UNEXPECTED(
           
ApplyUpdateStatistics(internal::checked_cast<UpdateStatistics&>(update)));
       break;
+    case PendingUpdate::Kind::kUpdatePartitionStatistics:
+      ICEBERG_RETURN_UNEXPECTED(ApplyUpdatePartitionStatistics(
+          internal::checked_cast<UpdatePartitionStatistics&>(update)));
+      break;
     default:
       return NotSupported("Unsupported pending update: {}",
                           static_cast<int32_t>(update.kind()));
@@ -284,6 +289,17 @@ Status 
Transaction::ApplyUpdateStatistics(UpdateStatistics& update) {
   return {};
 }
 
+Status Transaction::ApplyUpdatePartitionStatistics(UpdatePartitionStatistics& 
update) {  // forwards the staged sets and removals to metadata_builder_
+  ICEBERG_ASSIGN_OR_RAISE(auto result, update.Apply());  // result holds to_set and to_remove; presumably an Apply() error is returned from here by the macro
+  for (auto&& [_, partition_stat_file] : result.to_set) {  // the key is unused: each file is passed on by itself
+    metadata_builder_->SetPartitionStatistics(std::move(partition_stat_file));  // NOTE(review): the builder method takes const std::shared_ptr&, so this std::move is only a cast and nothing is moved
+  }
+  for (const auto& snapshot_id : result.to_remove) {  // Apply() fills both lists from one map keyed by snapshot id, so an id is never in both
+    metadata_builder_->RemovePartitionStatistics(snapshot_id);
+  }
+  return {};
+}
+
 Result<std::shared_ptr<Table>> Transaction::Commit() {
   if (committed_) {
     return Invalid("Transaction already committed");
@@ -395,6 +411,15 @@ Result<std::shared_ptr<UpdateStatistics>> 
Transaction::NewUpdateStatistics() {
   return update_statistics;
 }
 
+Result<std::shared_ptr<UpdatePartitionStatistics>>
+Transaction::NewUpdatePartitionStatistics() {  // creates a partition statistics update tied to this transaction
+  ICEBERG_ASSIGN_OR_RAISE(
+      std::shared_ptr<UpdatePartitionStatistics> update_partition_statistics,
+      UpdatePartitionStatistics::Make(shared_from_this()));  // the update is constructed with a shared_ptr to this transaction
+  ICEBERG_RETURN_UNEXPECTED(AddUpdate(update_partition_statistics));  // registered with this transaction before it is handed out
+  return update_partition_statistics;
+}
+
 Result<std::shared_ptr<UpdateSnapshotReference>>
 Transaction::NewUpdateSnapshotReference() {
   ICEBERG_ASSIGN_OR_RAISE(std::shared_ptr<UpdateSnapshotReference> update_ref,
diff --git a/src/iceberg/transaction.h b/src/iceberg/transaction.h
index 3d5450e5..e975be7f 100644
--- a/src/iceberg/transaction.h
+++ b/src/iceberg/transaction.h
@@ -86,6 +86,10 @@ class ICEBERG_EXPORT Transaction : public 
std::enable_shared_from_this<Transacti
   /// changes.
   Result<std::shared_ptr<UpdateStatistics>> NewUpdateStatistics();
 
+  /// \brief Create a new UpdatePartitionStatistics to update partition 
statistics and
+  /// commit the changes.
+  Result<std::shared_ptr<UpdatePartitionStatistics>> 
NewUpdatePartitionStatistics();
+
   /// \brief Create a new UpdateLocation to update the table location and 
commit the
   /// changes.
   Result<std::shared_ptr<UpdateLocation>> NewUpdateLocation();
@@ -115,6 +119,7 @@ class ICEBERG_EXPORT Transaction : public 
std::enable_shared_from_this<Transacti
   Status ApplySetSnapshot(SetSnapshot& update);
   Status ApplyUpdateLocation(UpdateLocation& update);
   Status ApplyUpdatePartitionSpec(UpdatePartitionSpec& update);
+  Status ApplyUpdatePartitionStatistics(UpdatePartitionStatistics& update);
   Status ApplyUpdateProperties(UpdateProperties& update);
   Status ApplyUpdateSchema(UpdateSchema& update);
   Status ApplyUpdateSnapshot(SnapshotUpdate& update);
diff --git a/src/iceberg/type_fwd.h b/src/iceberg/type_fwd.h
index 9e21088f..e97de0ac 100644
--- a/src/iceberg/type_fwd.h
+++ b/src/iceberg/type_fwd.h
@@ -196,6 +196,7 @@ class SetSnapshot;
 class SnapshotUpdate;
 class UpdateLocation;
 class UpdatePartitionSpec;
+class UpdatePartitionStatistics;
 class UpdateProperties;
 class UpdateSchema;
 class UpdateSnapshotReference;
diff --git a/src/iceberg/update/meson.build b/src/iceberg/update/meson.build
index e00b1e6e..102471c0 100644
--- a/src/iceberg/update/meson.build
+++ b/src/iceberg/update/meson.build
@@ -24,6 +24,7 @@ install_headers(
         'snapshot_update.h',
         'update_location.h',
         'update_partition_spec.h',
+        'update_partition_statistics.h',
         'update_schema.h',
         'update_snapshot_reference.h',
         'update_sort_order.h',
diff --git a/src/iceberg/update/pending_update.h 
b/src/iceberg/update/pending_update.h
index e5d583d1..f44812a8 100644
--- a/src/iceberg/update/pending_update.h
+++ b/src/iceberg/update/pending_update.h
@@ -46,6 +46,7 @@ class ICEBERG_EXPORT PendingUpdate : public ErrorCollector {
     kSetSnapshot,
     kUpdateLocation,
     kUpdatePartitionSpec,
+    kUpdatePartitionStatistics,
     kUpdateProperties,
     kUpdateSchema,
     kUpdateSnapshot,
diff --git a/src/iceberg/update/update_partition_statistics.cc 
b/src/iceberg/update/update_partition_statistics.cc
new file mode 100644
index 00000000..2c06c0ce
--- /dev/null
+++ b/src/iceberg/update/update_partition_statistics.cc
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "iceberg/update/update_partition_statistics.h"
+
+#include <cstdint>
+#include <memory>
+#include <unordered_map>
+#include <vector>
+
+#include "iceberg/result.h"
+#include "iceberg/statistics_file.h"
+#include "iceberg/transaction.h"
+#include "iceberg/util/macros.h"
+
+namespace iceberg {
+
+Result<std::shared_ptr<UpdatePartitionStatistics>> 
UpdatePartitionStatistics::Make(
+    std::shared_ptr<Transaction> transaction) {  // factory: the only public way to construct this class
+  ICEBERG_PRECHECK(transaction != nullptr,
+                   "Cannot create UpdatePartitionStatistics without a 
transaction");
+  return std::shared_ptr<UpdatePartitionStatistics>(
+      new UpdatePartitionStatistics(std::move(transaction)));  // plain new because the constructor is private, which rules out std::make_shared
+}
+
+UpdatePartitionStatistics::UpdatePartitionStatistics(
+    std::shared_ptr<Transaction> transaction)
+    : PendingUpdate(std::move(transaction)) {}
+
+UpdatePartitionStatistics::~UpdatePartitionStatistics() = default;
+
+UpdatePartitionStatistics& UpdatePartitionStatistics::SetPartitionStatistics(
+    std::shared_ptr<PartitionStatisticsFile> partition_statistics_file) {  // stages a file under its snapshot id; a later call for the same id wins
+  ICEBERG_BUILDER_CHECK(partition_statistics_file != nullptr,
+                        "Statistics file cannot be null");  // the failure is reported later by Apply(); presumably the macro also returns early here, TODO confirm
+
+  partition_statistics_to_set_[partition_statistics_file->snapshot_id] =
+      std::move(partition_statistics_file);  // std::move is only a cast: the key above is read before the assignment takes the pointer
+  return *this;
+}
+
+UpdatePartitionStatistics& 
UpdatePartitionStatistics::RemovePartitionStatistics(
+    int64_t snapshot_id) {  // stages a removal for snapshot_id
+  partition_statistics_to_set_[snapshot_id] = nullptr;  // a null value is the removal marker; it also discards a file staged earlier for this id
+  return *this;
+}
+
+Result<UpdatePartitionStatistics::ApplyResult> 
UpdatePartitionStatistics::Apply() {  // splits the staged map into files to set and snapshot ids to remove
+  ICEBERG_RETURN_UNEXPECTED(CheckErrors());  // failures recorded by earlier calls (for example a null file) are reported here
+
+  ApplyResult result;
+  for (const auto& [snapshot_id, partition_stats] : 
partition_statistics_to_set_) {  // unordered_map: the order of entries within to_set and to_remove is unspecified
+    if (partition_stats) {
+      result.to_set.emplace_back(snapshot_id, partition_stats);  // copies the shared_ptr; the staged map is left untouched
+    } else {  // a null value marks a staged removal
+      result.to_remove.push_back(snapshot_id);
+    }
+  }
+  return result;
+}
+
+}  // namespace iceberg
diff --git a/src/iceberg/update/update_partition_statistics.h 
b/src/iceberg/update/update_partition_statistics.h
new file mode 100644
index 00000000..740fe214
--- /dev/null
+++ b/src/iceberg/update/update_partition_statistics.h
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#include <cstdint>
+#include <memory>
+#include <unordered_map>
+#include <utility>
+#include <vector>
+
+#include "iceberg/iceberg_export.h"
+#include "iceberg/result.h"
+#include "iceberg/type_fwd.h"
+#include "iceberg/update/pending_update.h"
+
+/// \file iceberg/update/update_partition_statistics.h
+/// \brief Updates table partition statistics.
+
+namespace iceberg {
+
+/// \brief Pending update that stages partition statistics files to set or remove, keyed by snapshot ID.
+class ICEBERG_EXPORT UpdatePartitionStatistics : public PendingUpdate {
+ public:
+  static Result<std::shared_ptr<UpdatePartitionStatistics>> Make(
+      std::shared_ptr<Transaction> transaction);  // transaction must be non-null, otherwise Make fails
+
+  ~UpdatePartitionStatistics() override;
+
+  /// \brief Set partition statistics file for a snapshot.
+  ///
+  /// Associates a partition statistics file with a snapshot ID. If partition 
statistics
+  /// already exist for this snapshot, they will be replaced.
+  ///
+  /// \param partition_statistics_file The partition statistics file to set; a null file is reported as an error by Apply()
+  /// \return Reference to this UpdatePartitionStatistics for chaining
+  UpdatePartitionStatistics& SetPartitionStatistics(
+      std::shared_ptr<PartitionStatisticsFile> partition_statistics_file);
+
+  /// \brief Remove partition statistics for a snapshot.
+  ///
+  /// Marks the partition statistics for the given snapshot ID for removal and discards any file staged earlier for it.
+  ///
+  /// \param snapshot_id The snapshot ID whose partition statistics to remove
+  /// \return Reference to this UpdatePartitionStatistics for chaining
+  UpdatePartitionStatistics& RemovePartitionStatistics(int64_t snapshot_id);
+
+  Kind kind() const final { return Kind::kUpdatePartitionStatistics; }
+
+  struct ApplyResult {
+    std::vector<std::pair<int64_t, std::shared_ptr<PartitionStatisticsFile>>> 
to_set;  // (snapshot id, file) pairs to set
+    std::vector<int64_t> to_remove;  // snapshot ids whose partition statistics should be removed
+  };
+
+  Result<ApplyResult> Apply();  // reports the staged changes; it does not itself touch table metadata
+
+ private:
+  explicit UpdatePartitionStatistics(std::shared_ptr<Transaction> transaction);  // private: instances are created through Make()
+
+  std::unordered_map<int64_t, std::shared_ptr<PartitionStatisticsFile>>
+      partition_statistics_to_set_;  // snapshot id to staged file; despite the name, a null value marks a removal
+};
+
+}  // namespace iceberg


Reply via email to