This is an automated email from the ASF dual-hosted git repository.

wgtmac pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-cpp.git


The following commit(s) were added to refs/heads/main by this push:
     new 97ea8708 feat: metrics for parquet writer (#651)
97ea8708 is described below

commit 97ea8708d216a554b550d86510437fb91027cb20
Author: wzhuo <[email protected]>
AuthorDate: Tue May 26 12:55:45 2026 +0800

    feat: metrics for parquet writer (#651)
---
 src/iceberg/CMakeLists.txt               |    1 +
 src/iceberg/file_writer.h                |    3 +
 src/iceberg/metrics.h                    |   29 +
 src/iceberg/metrics_config.cc            |   22 +
 src/iceberg/metrics_config.h             |   11 +
 src/iceberg/parquet/parquet_metrics.cc   |  370 +++++++++++
 src/iceberg/parquet/parquet_metrics.h    |   64 ++
 src/iceberg/parquet/parquet_writer.cc    |   40 +-
 src/iceberg/test/CMakeLists.txt          |    2 +
 src/iceberg/test/metrics_test_base.cc    | 1020 ++++++++++++++++++++++++++++++
 src/iceberg/test/metrics_test_base.h     |  138 ++++
 src/iceberg/test/parquet_metrics_test.cc |   88 +++
 src/iceberg/test/truncate_util_test.cc   |   10 +-
 src/iceberg/util/truncate_util.cc        |   63 +-
 src/iceberg/util/truncate_util.h         |   43 +-
 15 files changed, 1865 insertions(+), 39 deletions(-)

diff --git a/src/iceberg/CMakeLists.txt b/src/iceberg/CMakeLists.txt
index 2b02b999..672cf54a 100644
--- a/src/iceberg/CMakeLists.txt
+++ b/src/iceberg/CMakeLists.txt
@@ -239,6 +239,7 @@ if(ICEBERG_BUILD_BUNDLE)
       avro/avro_schema_util.cc
       avro/avro_stream_internal.cc
       parquet/parquet_data_util.cc
+      parquet/parquet_metrics.cc
       parquet/parquet_reader.cc
       parquet/parquet_register.cc
       parquet/parquet_schema_util.cc
diff --git a/src/iceberg/file_writer.h b/src/iceberg/file_writer.h
index a49b5228..3c890453 100644
--- a/src/iceberg/file_writer.h
+++ b/src/iceberg/file_writer.h
@@ -30,6 +30,7 @@
 #include "iceberg/arrow_c_data.h"
 #include "iceberg/file_format.h"
 #include "iceberg/metrics.h"
+#include "iceberg/metrics_config.h"
 #include "iceberg/result.h"
 #include "iceberg/type_fwd.h"
 #include "iceberg/util/config.h"
@@ -77,6 +78,8 @@ struct ICEBERG_EXPORT WriterOptions {
   std::shared_ptr<class FileIO> io;
   /// \brief Metadata to write to the file.
   std::unordered_map<std::string, std::string> metadata;
+  /// \brief Metrics configuration.
+  std::shared_ptr<MetricsConfig> metrics_config = MetricsConfig::Default();
   /// \brief Format-specific or implementation-specific properties.
   WriterProperties properties;
 };
diff --git a/src/iceberg/metrics.h b/src/iceberg/metrics.h
index b476a475..083cd041 100644
--- a/src/iceberg/metrics.h
+++ b/src/iceberg/metrics.h
@@ -30,6 +30,35 @@
 
 namespace iceberg {
 
+/// \brief Field-level metrics for a single column.
+///
+/// This structure captures value counts, null counts, NaN counts, and optional
+/// lower/upper bounds for a specific field identified by its field_id.
+struct ICEBERG_EXPORT FieldMetrics {
+  /// \brief The field ID these metrics belong to.
+  int32_t field_id;
+
+  /// \brief The total number of values (including nulls) for this field.
+  /// A negative value indicates the count is unknown.
+  int64_t value_count = -1;
+
+  /// \brief The number of null values for this field.
+  /// A negative value indicates the count is unknown.
+  int64_t null_value_count = -1;
+
+  /// \brief The number of NaN values for this field.
+  /// A negative value indicates the count is unknown.
+  int64_t nan_value_count = -1;
+
+  /// \brief The lower bound value as a Literal.
+  /// Empty if no lower bound is available.
+  std::optional<Literal> lower_bound = std::nullopt;
+
+  /// \brief The upper bound value as a Literal.
+  /// Empty if no upper bound is available.
+  std::optional<Literal> upper_bound = std::nullopt;
+};
+
 /// \brief Iceberg file format metrics
 struct ICEBERG_EXPORT Metrics {
   std::optional<int64_t> row_count;
diff --git a/src/iceberg/metrics_config.cc b/src/iceberg/metrics_config.cc
index e378640e..ea20d47e 100644
--- a/src/iceberg/metrics_config.cc
+++ b/src/iceberg/metrics_config.cc
@@ -19,6 +19,7 @@
 
 #include "iceberg/metrics_config.h"
 
+#include <limits>
 #include <string>
 #include <unordered_map>
 
@@ -100,6 +101,19 @@ Result<MetricsMode> 
MetricsMode::FromString(std::string_view mode) {
   return InvalidArgument("Invalid metrics mode: {}", mode);
 }
 
+int32_t MetricsMode::TruncateLength() const {
+  switch (kind) {
+    case Kind::kNone:
+    case Kind::kCounts:
+      return 0;
+    case Kind::kTruncate:
+      return std::get<int32_t>(length);
+    case Kind::kFull:
+      return std::numeric_limits<int32_t>::max();
+  }
+  return 0;
+}
+
 MetricsConfig::MetricsConfig(ColumnModeMap column_modes, MetricsMode 
default_mode)
     : column_modes_(std::move(column_modes)), default_mode_(default_mode) {}
 
@@ -116,6 +130,14 @@ Result<std::shared_ptr<MetricsConfig>> 
MetricsConfig::Make(const Table& table) {
                       *sort_order.value_or(SortOrder::Unsorted()));
 }
 
+Result<std::shared_ptr<MetricsConfig>> MetricsConfig::Make(
+    std::unordered_map<std::string, std::string> properties) {
+  // Create a minimal TableProperties wrapper for the properties
+  TableProperties props = TableProperties::FromMap(std::move(properties));
+
+  return MakeInternal(props, Schema({}), *SortOrder::Unsorted());
+}
+
 Result<std::shared_ptr<MetricsConfig>> MetricsConfig::MakeInternal(
     const TableProperties& props, const Schema& schema, const SortOrder& 
order) {
   ColumnModeMap column_modes;
diff --git a/src/iceberg/metrics_config.h b/src/iceberg/metrics_config.h
index 7a49e906..a5e51ee6 100644
--- a/src/iceberg/metrics_config.h
+++ b/src/iceberg/metrics_config.h
@@ -52,6 +52,11 @@ struct ICEBERG_EXPORT MetricsMode {
 
   Kind kind;
   std::variant<std::monostate, int32_t> length;
+
+  /// \brief Get the truncate length from this MetricsMode.
+  /// \return 0 for None/Counts modes, the truncate length for Truncate mode,
+  ///         or std::numeric_limits<int32_t>::max() for Full mode.
+  int32_t TruncateLength() const;
 };
 
 /// \brief Configuration for collecting column metrics for an Iceberg table.
@@ -63,6 +68,12 @@ class ICEBERG_EXPORT MetricsConfig {
   /// \brief Creates a metrics config from a table.
   static Result<std::shared_ptr<MetricsConfig>> Make(const Table& table);
 
+  /// \brief Creates a metrics config from properties (for testing)
+  /// \param properties Map of property key-value pairs
+  /// \return A shared pointer to the created MetricsConfig
+  static Result<std::shared_ptr<MetricsConfig>> Make(
+      std::unordered_map<std::string, std::string> properties);
+
   /// \brief Get `limit` num of primitive field ids from schema
   static Result<std::unordered_set<int32_t>> LimitFieldIds(const Schema& 
schema,
                                                            int32_t limit);
diff --git a/src/iceberg/parquet/parquet_metrics.cc 
b/src/iceberg/parquet/parquet_metrics.cc
new file mode 100644
index 00000000..09b727a1
--- /dev/null
+++ b/src/iceberg/parquet/parquet_metrics.cc
@@ -0,0 +1,370 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "iceberg/parquet/parquet_metrics.h"
+
+#include <limits>
+#include <optional>
+#include <ranges>
+#include <string>
+#include <unordered_map>
+
+#include <parquet/column_reader.h>
+#include <parquet/schema.h>
+#include <parquet/statistics.h>
+#include <parquet/types.h>
+
+#include "iceberg/expression/literal.h"
+#include "iceberg/result.h"
+#include "iceberg/schema.h"
+#include "iceberg/type.h"
+#include "iceberg/util/checked_cast.h"
+#include "iceberg/util/conversions.h"
+#include "iceberg/util/truncate_util.h"
+#include "iceberg/util/visit_type.h"
+
+namespace iceberg::parquet {
+
+namespace {
+
+/// \brief Get the Iceberg field ID from a Parquet column descriptor.
+/// \return The field ID, or nullopt if no field ID is set.
+std::optional<int32_t> GetFieldId(const ::parquet::ColumnDescriptor& column) {
+  const auto& node = column.schema_node();
+  if (node == nullptr || !node->is_primitive()) {
+    return std::nullopt;
+  }
+  if (node->field_id() < 0) {
+    return std::nullopt;
+  }
+  return node->field_id();
+}
+
+/// \brief Find the column index for a field in the Parquet schema.
+std::optional<int32_t> FindColumnIndex(const ::parquet::SchemaDescriptor& 
parquet_schema,
+                                       int32_t field_id) {
+  auto columns = std::views::iota(0, parquet_schema.num_columns());
+  auto it = std::ranges::find_if(columns, [&](int i) {
+    auto column_field_id = GetFieldId(*parquet_schema.Column(i));
+    return column_field_id.has_value() && column_field_id.value() == field_id;
+  });
+  return it != columns.end() ? std::optional(*it) : std::nullopt;
+}
+
+/// \brief Collect counts (value count and null count) from footer statistics.
+/// \param field_id The Iceberg field ID.
+/// \param metadata The Parquet file metadata.
+/// \param column_idx The column index in the Parquet schema.
+/// \return FieldMetrics with value and null counts, or nullopt if stats are missing.
+std::optional<FieldMetrics> CollectCounts(int32_t field_id,
+                                          const ::parquet::FileMetaData& 
metadata,
+                                          int32_t column_idx) {
+  int64_t value_count = 0;
+  int64_t null_count = 0;
+
+  for (int rg = 0; rg < metadata.num_row_groups(); ++rg) {
+    auto row_group = metadata.RowGroup(rg);
+    auto column_chunk = row_group->ColumnChunk(column_idx);
+    auto stats = column_chunk->statistics();
+    if (stats == nullptr || !stats->HasNullCount()) {
+      return std::nullopt;
+    }
+
+    null_count += stats->null_count();
+    value_count += column_chunk->num_values();
+  }
+
+  return FieldMetrics{
+      .field_id = field_id, .value_count = value_count, .null_value_count = 
null_count};
+}
+
+/// \brief Collect bounds (lower and upper) from footer statistics.
+/// \param field_id The Iceberg field ID.
+/// \param iceberg_type The Iceberg primitive type for deserializing values.
+/// \param metadata The Parquet file metadata.
+/// \param column_idx The column index in the Parquet schema.
+/// \param truncate_length The length to truncate strings/binary values.
+/// \return FieldMetrics with counts (and bounds if present), or nullopt without stats.
+Result<std::optional<FieldMetrics>> CollectBounds(
+    int32_t field_id, std::shared_ptr<PrimitiveType> iceberg_type,
+    const ::parquet::FileMetaData& metadata, int32_t column_idx,
+    int32_t truncate_length) {
+  int64_t null_count = 0;
+  int64_t value_count = 0;
+  std::optional<Literal> lower_bound;
+  std::optional<Literal> upper_bound;
+
+  for (int32_t rg = 0; rg < metadata.num_row_groups(); ++rg) {
+    auto row_group = metadata.RowGroup(rg);
+    auto column_chunk = row_group->ColumnChunk(column_idx);
+    auto stats = column_chunk->statistics();
+    if (stats == nullptr || !stats->HasNullCount()) {
+      return std::nullopt;
+    }
+
+    null_count += stats->null_count();
+    value_count += column_chunk->num_values();
+
+    if (stats->HasMinMax()) {
+      auto min_bytes = stats->EncodeMin();
+      auto min_span = std::span<const uint8_t>(
+          reinterpret_cast<const uint8_t*>(min_bytes.data()), 
min_bytes.size());
+      ICEBERG_ASSIGN_OR_RAISE(auto min_value,
+                              Conversions::FromBytes(iceberg_type, min_span));
+      if (!lower_bound.has_value() || min_value < lower_bound.value()) {
+        lower_bound = std::move(min_value);
+      }
+
+      auto max_bytes = stats->EncodeMax();
+      auto max_span = std::span<const uint8_t>(
+          reinterpret_cast<const uint8_t*>(max_bytes.data()), 
max_bytes.size());
+      ICEBERG_ASSIGN_OR_RAISE(auto max_value,
+                              Conversions::FromBytes(iceberg_type, max_span));
+      if (!upper_bound.has_value() || max_value > upper_bound.value()) {
+        upper_bound = std::move(max_value);
+      }
+    }
+  }
+
+  if (!lower_bound.has_value() || !upper_bound.has_value() || 
lower_bound->IsNaN() ||
+      upper_bound->IsNaN()) {
+    return FieldMetrics{
+        .field_id = field_id,
+        .value_count = value_count,
+        .null_value_count = null_count,
+    };
+  }
+
+  ICEBERG_ASSIGN_OR_RAISE(auto truncated_lower,
+                          TruncateUtils::TruncateLowerBound(
+                              *iceberg_type, lower_bound.value(), 
truncate_length));
+  ICEBERG_ASSIGN_OR_RAISE(auto truncated_upper,
+                          TruncateUtils::TruncateUpperBound(
+                              *iceberg_type, upper_bound.value(), 
truncate_length));
+
+  return FieldMetrics{
+      .field_id = field_id,
+      .value_count = value_count,
+      .null_value_count = null_count,
+      .lower_bound = std::move(truncated_lower),
+      .upper_bound = std::move(truncated_upper),
+  };
+}
+
+/// \brief Process pre-computed field metrics, applying truncation if needed.
+/// \param field_id The field ID to look up.
+/// \param field_metrics The map of pre-computed field metrics.
+/// \param primitive_type The primitive type for truncation.
+/// \param truncate_length The truncation length (0 means no bounds).
+/// \return Processed FieldMetrics with truncated bounds if applicable.
+Result<std::optional<FieldMetrics>> MetricsFromFieldMetrics(
+    int32_t field_id, const std::unordered_map<int32_t, FieldMetrics>& 
field_metrics,
+    std::shared_ptr<PrimitiveType> primitive_type, int32_t truncate_length) {
+  auto it = field_metrics.find(field_id);
+  if (it == field_metrics.end()) {
+    return std::nullopt;
+  }
+
+  const auto& fm = it->second;
+  FieldMetrics result{.field_id = fm.field_id,
+                      .value_count = fm.value_count,
+                      .null_value_count = fm.null_value_count,
+                      .nan_value_count = fm.nan_value_count};
+
+  if (truncate_length > 0) {
+    if (fm.lower_bound.has_value()) {
+      ICEBERG_ASSIGN_OR_RAISE(
+          auto lower, TruncateUtils::TruncateLowerBound(
+                          *primitive_type, fm.lower_bound.value(), 
truncate_length));
+      result.lower_bound = std::move(lower);
+    }
+    if (fm.upper_bound.has_value()) {
+      ICEBERG_ASSIGN_OR_RAISE(
+          auto upper, TruncateUtils::TruncateUpperBound(
+                          *primitive_type, fm.upper_bound.value(), 
truncate_length));
+      result.upper_bound = std::move(upper);
+    }
+  }
+
+  return result;
+}
+
+/// \brief Collect metrics for a single primitive field from footer statistics.
+Result<std::optional<FieldMetrics>> MetricsFromFooter(
+    int32_t field_id, std::shared_ptr<PrimitiveType> iceberg_type,
+    const ::parquet::SchemaDescriptor& parquet_schema,
+    const ::parquet::FileMetaData& metadata, int32_t truncate_length) {
+  auto column_idx = FindColumnIndex(parquet_schema, field_id);
+  if (!column_idx.has_value()) {
+    return std::nullopt;
+  }
+
+  auto column_desc = parquet_schema.Column(column_idx.value());
+  if (column_desc->physical_type() == ::parquet::Type::INT96) {
+    return std::nullopt;
+  }
+
+  if (truncate_length <= 0) {
+    return CollectCounts(field_id, metadata, column_idx.value());
+  }
+
+  return CollectBounds(field_id, iceberg_type, metadata, column_idx.value(),
+                       truncate_length);
+}
+
+/// \brief Visitor for collecting metrics from all primitive fields in a 
schema.
+class CollectMetricsVisitor {
+ public:
+  CollectMetricsVisitor(const ::parquet::SchemaDescriptor& parquet_schema,
+                        const MetricsConfig& metrics_config,
+                        const ::parquet::FileMetaData& metadata,
+                        const std::unordered_map<int32_t, FieldMetrics>& 
field_metrics,
+                        Metrics& metrics)
+      : parquet_schema_(parquet_schema),
+        metrics_config_(metrics_config),
+        metadata_(metadata),
+        field_metrics_(field_metrics),
+        metrics_(metrics) {}
+
+  Status VisitStruct(const StructType& type, const std::string& prefix) {
+    for (const auto& field : type.fields()) {
+      std::string full_name = prefix.empty() ? std::string(field.name())
+                                             : prefix + "." + 
std::string(field.name());
+      ICEBERG_RETURN_UNEXPECTED(VisitField(field, full_name));
+    }
+    return {};
+  }
+
+  Status VisitList(const ListType& /*type*/, const std::string& /*prefix*/) { 
return {}; }
+
+  Status VisitMap(const MapType& /*type*/, const std::string& /*prefix*/) { 
return {}; }
+
+  Status VisitPrimitive(const PrimitiveType& /*type*/, const std::string& 
/*prefix*/) {
+    return {};
+  }
+
+ private:
+  Status VisitField(const SchemaField& field, const std::string& full_name) {
+    if (field.type()->is_primitive()) {
+      return ProcessPrimitiveField(field, full_name);
+    } else if (field.type()->is_nested()) {
+      return VisitTypeCategory(*field.type(), this, full_name);
+    }
+    return {};
+  }
+
+  Status ProcessPrimitiveField(const SchemaField& field, const std::string& 
full_name) {
+    int32_t field_id = field.field_id();
+    MetricsMode mode = metrics_config_.ColumnMode(full_name);
+    if (mode.kind == MetricsMode::Kind::kNone) {
+      return {};
+    }
+
+    int32_t truncate_length = mode.TruncateLength();
+    const auto& primitive_type =
+        internal::checked_pointer_cast<PrimitiveType>(field.type());
+
+    ICEBERG_ASSIGN_OR_RAISE(auto field_metrics,
+                            MetricsFromFieldMetrics(field_id, field_metrics_,
+                                                    primitive_type, 
truncate_length));
+    if (field_metrics.has_value()) {
+      ApplyFieldMetrics(field_id, std::move(field_metrics.value()));
+      return {};
+    }
+
+    ICEBERG_ASSIGN_OR_RAISE(auto footer_metrics,
+                            MetricsFromFooter(field_id, primitive_type, 
parquet_schema_,
+                                              metadata_, truncate_length));
+    if (footer_metrics.has_value()) {
+      ApplyFieldMetrics(field_id, std::move(footer_metrics.value()));
+    }
+    return {};
+  }
+
+  void ApplyFieldMetrics(int32_t field_id, FieldMetrics&& fm) {
+    if (fm.value_count >= 0) {
+      metrics_.value_counts[field_id] = fm.value_count;
+    }
+    if (fm.null_value_count >= 0) {
+      metrics_.null_value_counts[field_id] = fm.null_value_count;
+    }
+    if (fm.nan_value_count >= 0) {
+      metrics_.nan_value_counts[field_id] = fm.nan_value_count;
+    }
+    if (fm.lower_bound.has_value()) {
+      metrics_.lower_bounds.emplace(field_id, 
std::move(fm.lower_bound.value()));
+    }
+    if (fm.upper_bound.has_value()) {
+      metrics_.upper_bounds.emplace(field_id, 
std::move(fm.upper_bound.value()));
+    }
+  }
+
+  const ::parquet::SchemaDescriptor& parquet_schema_;
+  const MetricsConfig& metrics_config_;
+  const ::parquet::FileMetaData& metadata_;
+  const std::unordered_map<int32_t, FieldMetrics>& field_metrics_;
+  Metrics& metrics_;
+};
+
+}  // namespace
+
+Result<Metrics> ParquetMetrics::GetMetrics(
+    const Schema& schema, const ::parquet::SchemaDescriptor& parquet_schema,
+    const MetricsConfig& metrics_config, const ::parquet::FileMetaData& 
metadata,
+    const std::unordered_map<int32_t, FieldMetrics>& field_metrics) {
+  Metrics metrics;
+
+  // Collect row count and column sizes
+  int64_t row_count = 0;
+  for (int rg = 0; rg < metadata.num_row_groups(); ++rg) {
+    auto row_group = metadata.RowGroup(rg);
+    row_count += row_group->num_rows();
+    for (int col = 0; col < row_group->num_columns(); ++col) {
+      auto column_chunk = row_group->ColumnChunk(col);
+      auto field_id_opt = GetFieldId(*parquet_schema.Column(col));
+      if (!field_id_opt.has_value()) {
+        continue;
+      }
+      int32_t field_id = field_id_opt.value();
+
+      ICEBERG_ASSIGN_OR_RAISE(auto field_name, 
schema.FindColumnNameById(field_id));
+      if (!field_name.has_value()) {
+        continue;
+      }
+
+      MetricsMode mode = metrics_config.ColumnMode(field_name.value());
+      if (mode.kind != MetricsMode::Kind::kNone) {
+        metrics.column_sizes[field_id] =
+            metrics.column_sizes.contains(field_id)
+                ? metrics.column_sizes[field_id] + 
column_chunk->total_compressed_size()
+                : column_chunk->total_compressed_size();
+      }
+    }
+  }
+  metrics.row_count = row_count;
+
+  // Collect metrics for all primitive fields
+  CollectMetricsVisitor visitor(parquet_schema, metrics_config, metadata, 
field_metrics,
+                                metrics);
+  ICEBERG_RETURN_UNEXPECTED(visitor.VisitStruct(schema, ""));
+
+  return metrics;
+}
+
+}  // namespace iceberg::parquet
diff --git a/src/iceberg/parquet/parquet_metrics.h 
b/src/iceberg/parquet/parquet_metrics.h
new file mode 100644
index 00000000..eb916241
--- /dev/null
+++ b/src/iceberg/parquet/parquet_metrics.h
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+/// \file iceberg/parquet/parquet_metrics.h
+/// \brief Utilities for extracting metrics from Parquet files.
+
+#include <unordered_map>
+
+#include <parquet/metadata.h>
+
+#include "iceberg/iceberg_bundle_export.h"
+#include "iceberg/metrics.h"
+#include "iceberg/metrics_config.h"
+#include "iceberg/result.h"
+#include "iceberg/schema.h"
+
+namespace iceberg::parquet {
+
+/// \brief Utility class for computing metrics from Parquet files.
+class ICEBERG_BUNDLE_EXPORT ParquetMetrics {
+ public:
+  ParquetMetrics() = delete;
+
+  /// \brief Compute file-level metrics from Parquet file metadata.
+  ///
+  /// This function extracts metrics including row count, column sizes, value 
counts,
+  /// null value counts, and lower/upper bounds from Parquet file metadata.
+  /// NaN value counts are not currently collected from Parquet metadata.
+  /// The metrics are computed according to the provided MetricsConfig, which 
determines
+  /// which columns to collect metrics for and at what granularity (counts 
only, truncated
+  /// bounds, or full bounds).
+  ///
+  /// \param schema The Iceberg schema for the table.
+  /// \param parquet_schema The Parquet schema descriptor.
+  /// \param metrics_config The configuration specifying how to collect 
metrics.
+  /// \param metadata The Parquet file metadata containing row group 
statistics.
+  /// \param field_metrics Optional per-field metrics computed during write.
+  ///        If provided, these take precedence over footer statistics.
+  /// \return Result containing the computed Metrics or an error.
+  static Result<Metrics> GetMetrics(
+      const Schema& schema, const ::parquet::SchemaDescriptor& parquet_schema,
+      const MetricsConfig& metrics_config, const ::parquet::FileMetaData& 
metadata,
+      const std::unordered_map<int32_t, FieldMetrics>& field_metrics = {});
+};
+
+}  // namespace iceberg::parquet
diff --git a/src/iceberg/parquet/parquet_writer.cc 
b/src/iceberg/parquet/parquet_writer.cc
index c70d3310..e91a2a6c 100644
--- a/src/iceberg/parquet/parquet_writer.cc
+++ b/src/iceberg/parquet/parquet_writer.cc
@@ -33,6 +33,7 @@
 
 #include "iceberg/arrow/arrow_io_internal.h"
 #include "iceberg/arrow/arrow_status_internal.h"
+#include "iceberg/parquet/parquet_metrics.h"
 #include "iceberg/schema_internal.h"
 #include "iceberg/util/macros.h"
 
@@ -86,6 +87,8 @@ Result<std::optional<int32_t>> ParseCodecLevel(const 
WriterProperties& propertie
 class ParquetWriter::Impl {
  public:
   Status Open(const WriterOptions& options) {
+    schema_ = options.schema;
+
     ICEBERG_ASSIGN_OR_RAISE(auto compression, 
ParseCompression(options.properties));
     ICEBERG_ASSIGN_OR_RAISE(auto compression_level, 
ParseCodecLevel(options.properties));
 
@@ -98,15 +101,14 @@ class ParquetWriter::Impl {
     auto arrow_writer_properties = 
::parquet::default_arrow_writer_properties();
 
     ArrowSchema c_schema;
-    ICEBERG_RETURN_UNEXPECTED(ToArrowSchema(*options.schema, &c_schema));
+    ICEBERG_RETURN_UNEXPECTED(ToArrowSchema(*schema_, &c_schema));
     ICEBERG_ARROW_ASSIGN_OR_RETURN(arrow_schema_, 
::arrow::ImportSchema(&c_schema));
 
-    std::shared_ptr<::parquet::SchemaDescriptor> schema_descriptor;
     ICEBERG_ARROW_RETURN_NOT_OK(
         ::parquet::arrow::ToParquetSchema(arrow_schema_.get(), 
*writer_properties,
-                                          *arrow_writer_properties, 
&schema_descriptor));
+                                          *arrow_writer_properties, 
&parquet_schema_));
     auto schema_node = std::static_pointer_cast<::parquet::schema::GroupNode>(
-        schema_descriptor->schema_root());
+        parquet_schema_->schema_root());
 
     ICEBERG_RETURN_UNEXPECTED(CheckCompressionAvailable(
         options.properties.Get(WriterProperties::kParquetCompression), 
compression));
@@ -119,6 +121,8 @@ class ParquetWriter::Impl {
         ::parquet::arrow::FileWriter::Make(pool_, std::move(file_writer), 
arrow_schema_,
                                            std::move(arrow_writer_properties), 
&writer_));
 
+    metrics_config_ = options.metrics_config;
+
     return {};
   }
 
@@ -143,6 +147,7 @@ class ParquetWriter::Impl {
     for (int i = 0; i < metadata->num_row_groups(); ++i) {
       split_offsets_.push_back(metadata->RowGroup(i)->file_offset());
     }
+    metadata_ = writer_->metadata();
     writer_.reset();
 
     ICEBERG_ARROW_ASSIGN_OR_RETURN(total_bytes_, output_stream_->Tell());
@@ -165,15 +170,35 @@ class ParquetWriter::Impl {
 
   std::vector<int64_t> split_offsets() const { return split_offsets_; }
 
+  Result<Metrics> metrics() {
+    if (writer_) {
+      return Invalid("Cannot return metrics for unclosed writer");
+    }
+    if (!metadata_) {
+      return Metrics();
+    }
+    // TODO(WZhuo): collect write-side FieldMetrics to support NaN value 
counts.
+    return ParquetMetrics::GetMetrics(*schema_, *parquet_schema_, 
*metrics_config_,
+                                      *metadata_, {});
+  }
+
  private:
   // TODO(gangwu): make memory pool configurable
   ::arrow::MemoryPool* pool_ = ::arrow::default_memory_pool();
+  // Schema to write from the Iceberg table.
+  std::shared_ptr<Schema> schema_;
   // Schema to write from the Parquet file.
   std::shared_ptr<::arrow::Schema> arrow_schema_;
+  // Parquet schema descriptor generated from the Arrow schema.
+  std::shared_ptr<::parquet::SchemaDescriptor> parquet_schema_;
+  // Metrics config for collecting metrics during write.
+  std::shared_ptr<MetricsConfig> metrics_config_;
   // The output stream to write Parquet file.
   std::shared_ptr<::arrow::io::OutputStream> output_stream_;
   // Parquet file writer to write ArrowArray.
   std::unique_ptr<::parquet::arrow::FileWriter> writer_;
+  // Store the metadata if writer has been closed.
+  std::shared_ptr<::parquet::FileMetaData> metadata_;
   // Total length of the written Parquet file.
   int64_t total_bytes_{0};
   // Row group start offsets in the Parquet file.
@@ -191,12 +216,7 @@ Status ParquetWriter::Write(ArrowArray* array) { return 
impl_->Write(array); }
 
 Status ParquetWriter::Close() { return impl_->Close(); }
 
-Result<Metrics> ParquetWriter::metrics() {
-  if (!impl_->Closed()) {
-    return Invalid("ParquetWriter is not closed");
-  }
-  return {};
-}
+Result<Metrics> ParquetWriter::metrics() { return impl_->metrics(); }
 
 Result<int64_t> ParquetWriter::length() { return impl_->length(); }
 
diff --git a/src/iceberg/test/CMakeLists.txt b/src/iceberg/test/CMakeLists.txt
index 7b546267..b415154d 100644
--- a/src/iceberg/test/CMakeLists.txt
+++ b/src/iceberg/test/CMakeLists.txt
@@ -192,7 +192,9 @@ if(ICEBERG_BUILD_BUNDLE)
   add_iceberg_test(parquet_test
                    USE_BUNDLE
                    SOURCES
+                   metrics_test_base.cc
                    parquet_data_test.cc
+                   parquet_metrics_test.cc
                    parquet_schema_test.cc
                    parquet_test.cc)
 
diff --git a/src/iceberg/test/metrics_test_base.cc 
b/src/iceberg/test/metrics_test_base.cc
new file mode 100644
index 00000000..cc5f7cd6
--- /dev/null
+++ b/src/iceberg/test/metrics_test_base.cc
@@ -0,0 +1,1020 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "iceberg/test/metrics_test_base.h"
+
+#include <arrow/builder.h>
+#include <arrow/c/bridge.h>
+#include <arrow/json/from_string.h>
+
+#include "iceberg/arrow/arrow_io_internal.h"
+#include "iceberg/arrow/arrow_status_internal.h"
+#include "iceberg/schema_internal.h"
+#include "iceberg/test/matchers.h"
+#include "iceberg/type.h"
+#include "iceberg/util/decimal.h"
+
+namespace iceberg::test {
+
+// Initializes the fixture: sets temp_dir_ to "metrics_test" and installs a mock
+// FileIO obtained from ArrowFileSystemFileIO::MakeMockFileIO().
+void MetricsTestBase::SetUp() {
+  temp_dir_ = "metrics_test";
+  file_io_ = arrow::ArrowFileSystemFileIO::MakeMockFileIO();
+}
+
+// Verifies the value count and null count recorded for `field_id` in `metrics`.
+//
+// For each of the two counts: a supplied expectation requires the field to be
+// present in the corresponding map with exactly that value, while std::nullopt
+// requires the field to be absent from the map.
+void MetricsTestBase::AssertCounts(int field_id,
+                                   std::optional<int64_t> expected_value_count,
+                                   std::optional<int64_t> expected_null_count,
+                                   const Metrics& metrics) {
+  if (!expected_value_count.has_value()) {
+    EXPECT_FALSE(metrics.value_counts.contains(field_id))
+        << "Field " << field_id << " should not have value count";
+  } else {
+    // Fatal when the entry is missing so the .at() lookup below is never reached.
+    ASSERT_TRUE(metrics.value_counts.contains(field_id))
+        << "Field " << field_id << " should have value count";
+    EXPECT_EQ(metrics.value_counts.at(field_id), expected_value_count.value())
+        << "Field " << field_id << " value count mismatch";
+  }
+
+  if (!expected_null_count.has_value()) {
+    EXPECT_FALSE(metrics.null_value_counts.contains(field_id))
+        << "Field " << field_id << " should not have null count";
+  } else {
+    ASSERT_TRUE(metrics.null_value_counts.contains(field_id))
+        << "Field " << field_id << " should have null count";
+    EXPECT_EQ(metrics.null_value_counts.at(field_id), expected_null_count.value())
+        << "Field " << field_id << " null count mismatch";
+  }
+}
+
+// Verifies the value, null and NaN counts recorded for `field_id` in `metrics`.
+//
+// Value and null counts are delegated to the four-argument overload. The NaN count
+// uses the same convention: a supplied expectation must equal the recorded entry,
+// and std::nullopt requires that no entry exists.
+void MetricsTestBase::AssertCounts(int field_id,
+                                   std::optional<int64_t> expected_value_count,
+                                   std::optional<int64_t> expected_null_count,
+                                   std::optional<int64_t> expected_nan_count,
+                                   const Metrics& metrics) {
+  // A fatal failure inside the delegated call only returns from that call, so the
+  // NaN check below still runs.
+  AssertCounts(field_id, expected_value_count, expected_null_count, metrics);
+
+  if (!expected_nan_count.has_value()) {
+    EXPECT_FALSE(metrics.nan_value_counts.contains(field_id))
+        << "Field " << field_id << " should not have NaN count";
+    return;
+  }
+
+  ASSERT_TRUE(metrics.nan_value_counts.contains(field_id))
+      << "Field " << field_id << " should have NaN count";
+  EXPECT_EQ(metrics.nan_value_counts.at(field_id), expected_nan_count.value())
+      << "Field " << field_id << " NaN count mismatch";
+}
+
+// Verifies the lower and upper bounds recorded for `field_id` in `metrics`.
+//
+// T is the C++ value type expected inside the bound literal. For each bound: a
+// supplied expectation requires a non-null literal that holds a T equal to the
+// expectation, while std::nullopt requires that no bound is recorded for the
+// field. `type` is accepted for call-site readability but is not consulted here.
+template <typename T>
+void MetricsTestBase::AssertBounds(int field_id, std::shared_ptr<PrimitiveType> type,
+                                   std::optional<T> expected_lower,
+                                   std::optional<T> expected_upper,
+                                   const Metrics& metrics) {
+  if (expected_lower.has_value()) {
+    ASSERT_TRUE(metrics.lower_bounds.contains(field_id))
+        << "Field " << field_id << " should have lower bound";
+    const auto& literal = metrics.lower_bounds.at(field_id);
+    ASSERT_FALSE(literal.IsNull())
+        << "Field " << field_id << " lower bound literal should not be null";
+    // Report a type mismatch as a test failure instead of letting std::get throw
+    // std::bad_variant_access.
+    ASSERT_TRUE(std::holds_alternative<T>(literal.value()))
+        << "Field " << field_id << " lower bound holds an unexpected value type";
+    EXPECT_EQ(std::get<T>(literal.value()), expected_lower.value())
+        << "Field " << field_id << " lower bound mismatch";
+  } else {
+    EXPECT_FALSE(metrics.lower_bounds.contains(field_id))
+        << "Field " << field_id << " should not have lower bound";
+  }
+
+  if (expected_upper.has_value()) {
+    ASSERT_TRUE(metrics.upper_bounds.contains(field_id))
+        << "Field " << field_id << " should have upper bound";
+    const auto& literal = metrics.upper_bounds.at(field_id);
+    ASSERT_FALSE(literal.IsNull())
+        << "Field " << field_id << " upper bound literal should not be null";
+    ASSERT_TRUE(std::holds_alternative<T>(literal.value()))
+        << "Field " << field_id << " upper bound holds an unexpected value type";
+    EXPECT_EQ(std::get<T>(literal.value()), expected_upper.value())
+        << "Field " << field_id << " upper bound mismatch";
+  } else {
+    EXPECT_FALSE(metrics.upper_bounds.contains(field_id))
+        << "Field " << field_id << " should not have upper bound";
+  }
+}
+
+// Explicit instantiations of AssertBounds, one per bound value type used by the
+// metrics tests. The template body is defined in this source file, so any use from
+// another translation unit has to link against one of these.
+
+// Boolean and integral bounds.
+template void MetricsTestBase::AssertBounds<bool>(int, std::shared_ptr<PrimitiveType>,
+                                                  std::optional<bool>,
+                                                  std::optional<bool>, const Metrics&);
+template void MetricsTestBase::AssertBounds<int32_t>(int, std::shared_ptr<PrimitiveType>,
+                                                     std::optional<int32_t>,
+                                                     std::optional<int32_t>,
+                                                     const Metrics&);
+template void MetricsTestBase::AssertBounds<int64_t>(int, std::shared_ptr<PrimitiveType>,
+                                                     std::optional<int64_t>,
+                                                     std::optional<int64_t>,
+                                                     const Metrics&);
+
+// Floating-point bounds.
+template void MetricsTestBase::AssertBounds<float>(int, std::shared_ptr<PrimitiveType>,
+                                                   std::optional<float>,
+                                                   std::optional<float>, const Metrics&);
+template void MetricsTestBase::AssertBounds<double>(int, std::shared_ptr<PrimitiveType>,
+                                                    std::optional<double>,
+                                                    std::optional<double>,
+                                                    const Metrics&);
+
+// String and byte-sequence bounds.
+template void MetricsTestBase::AssertBounds<std::string>(int,
+                                                         std::shared_ptr<PrimitiveType>,
+                                                         std::optional<std::string>,
+                                                         std::optional<std::string>,
+                                                         const Metrics&);
+template void MetricsTestBase::AssertBounds<std::vector<uint8_t>>(
+    int, std::shared_ptr<PrimitiveType>, std::optional<std::vector<uint8_t>>,
+    std::optional<std::vector<uint8_t>>, const Metrics&);
+
+// Decimal bounds.
+template void MetricsTestBase::AssertBounds<Decimal>(int, std::shared_ptr<PrimitiveType>,
+                                                     std::optional<Decimal>,
+                                                     std::optional<Decimal>,
+                                                     const Metrics&);
+
+// Parses `json_data` into an Arrow array whose type is a struct made of the fields
+// of `arrow_schema`. Uses ValueOrDie(), so a parse error aborts rather than
+// returning an error to the caller.
+std::shared_ptr<::arrow::Array> MetricsTestBase::CreateRecordArrays(
+    const std::shared_ptr<::arrow::Schema>& arrow_schema, const std::string& json_data) {
+  return ::arrow::json::ArrayFromJSONString(::arrow::struct_(arrow_schema->fields()),
+                                            json_data)
+      .ValueOrDie();
+}
+
+// Builds a flat schema of thirteen primitive columns (field ids 1-13) that mixes
+// required and optional fields across the primitive types exercised by the tests.
+std::shared_ptr<Schema> MetricsTestBase::SimpleSchema() {
+  std::vector<SchemaField> columns{
+      SchemaField::MakeOptional(1, "booleanCol", boolean()),
+      SchemaField::MakeRequired(2, "intCol", int32()),
+      SchemaField::MakeOptional(3, "longCol", int64()),
+      SchemaField::MakeRequired(4, "floatCol", float32()),
+      SchemaField::MakeOptional(5, "doubleCol", float64()),
+      SchemaField::MakeOptional(6, "decimalCol", decimal(10, 2)),
+      SchemaField::MakeRequired(7, "stringCol", string()),
+      SchemaField::MakeOptional(8, "dateCol", date()),
+      SchemaField::MakeRequired(9, "timeCol", time()),
+      SchemaField::MakeRequired(10, "timestampColAboveEpoch", timestamp()),
+      SchemaField::MakeRequired(11, "fixedCol", fixed(4)),
+      SchemaField::MakeRequired(12, "binaryCol", binary()),
+      SchemaField::MakeRequired(13, "timestampColBelowEpoch", timestamp()),
+  };
+  return std::make_shared<Schema>(std::move(columns));
+}
+
+// Builds a schema with two levels of struct nesting:
+//   1: intCol (int32, required)
+//   2: nestedStructCol (struct, required)
+//      3: longCol (int64, required)
+//      4: leafStructCol (struct, required)
+//         5: leafLongCol (int64, optional)
+//         6: leafBinaryCol (binary, optional)
+//      7: doubleCol (float64, required)
+std::shared_ptr<Schema> MetricsTestBase::NestedSchema() {
+  return std::make_shared<Schema>(std::vector<SchemaField>{
+      SchemaField::MakeRequired(1, "intCol", int32()),
+      SchemaField::MakeRequired(
+          2, "nestedStructCol",
+          struct_({
+              SchemaField::MakeRequired(3, "longCol", int64()),
+              SchemaField::MakeRequired(
+                  4, "leafStructCol",
+                  struct_({
+                      SchemaField::MakeOptional(5, "leafLongCol", int64()),
+                      SchemaField::MakeOptional(6, "leafBinaryCol", binary()),
+                  })),
+              SchemaField::MakeRequired(7, "doubleCol", float64()),
+          })),
+  });
+}
+
+// Builds a two-column schema of optional floating-point fields:
+// 1: floatCol (float32) and 2: doubleCol (float64).
+std::shared_ptr<Schema> MetricsTestBase::FloatDoubleSchema() {
+  std::vector<SchemaField> columns{
+      SchemaField::MakeOptional(1, "floatCol", float32()),
+      SchemaField::MakeOptional(2, "doubleCol", float64()),
+  };
+  return std::make_shared<Schema>(std::move(columns));
+}
+
+// Converts an Iceberg schema into an ::arrow::Schema. The schema is first exported
+// into an Arrow C data interface struct and then imported with
+// ::arrow::ImportSchema; a failure in either step is returned as an error.
+Result<std::shared_ptr<::arrow::Schema>> ToArrowSchema(std::shared_ptr<Schema> schema) {
+  ArrowSchema exported;
+  ICEBERG_RETURN_UNEXPECTED(ToArrowSchema(*schema, &exported));
+
+  std::shared_ptr<::arrow::Schema> imported;
+  ICEBERG_ARROW_ASSIGN_OR_RETURN(imported, ::arrow::ImportSchema(&exported));
+  return imported;
+}
+
+// Writes two records produced by BuildSimpleRecords against SimpleSchema and checks
+// the row count plus the value/null counts of all thirteen columns.
+void MetricsTestBase::MetricsForRepeatedValues() {
+  auto schema = SimpleSchema();
+  ICEBERG_UNWRAP_OR_FAIL(auto arrow_schema, ToArrowSchema(schema));
+  ICEBERG_UNWRAP_OR_FAIL(auto records, BuildSimpleRecords(arrow_schema, 2));
+  ICEBERG_UNWRAP_OR_FAIL(auto metrics, GetMetrics(schema, records));
+
+  ASSERT_TRUE(metrics.row_count.has_value()) << "row_count should be set";
+  EXPECT_EQ(*metrics.row_count, 2);
+
+  // Every column reports two values. Only longCol (3) and decimalCol (6) are
+  // expected to contain a null.
+  // TODO(WZhuo) Assert NaN metrics (the original note says floatCol, field 4, has
+  // 2 NaN values).
+  for (int field_id = 1; field_id <= 13; ++field_id) {
+    const bool has_one_null = (field_id == 3 || field_id == 6);
+    AssertCounts(field_id, 2, has_one_null ? 1 : 0, metrics);
+  }
+}
+
+// Writes two hand-written records against SimpleSchema and checks the row count,
+// the value/null counts and the lower/upper bounds of every top-level column.
+void MetricsTestBase::MetricsForTopLevelFields() {
+  auto schema = SimpleSchema();
+  ICEBERG_UNWRAP_OR_FAIL(auto arrow_schema, ToArrowSchema(schema));
+
+  // The second record nulls out longCol, doubleCol, decimalCol and dateCol.
+  auto batch = CreateRecordArrays(arrow_schema, R"([
+    {"booleanCol": true, "intCol": 3, "longCol": 5, "floatCol": 2.0, 
"doubleCol": 2.0,
+     "decimalCol": "3.50", "stringCol": "AAA", "dateCol": 1500, "timeCol": 
2000,
+     "timestampColAboveEpoch": 0, "fixedCol": "abcd", "binaryCol": "S", 
"timestampColBelowEpoch": -1900300},
+    {"booleanCol": false, "intCol": -2147483648, "longCol": null, "floatCol": 
1.0, "doubleCol": null,
+     "decimalCol": null, "stringCol": "ZZZ", "dateCol": null, "timeCol": 3000,
+     "timestampColAboveEpoch": 900, "fixedCol": "abcd", "binaryCol": "W", 
"timestampColBelowEpoch": -7000}
+   ])");
+
+  ICEBERG_UNWRAP_OR_FAIL(auto metrics, GetMetrics(schema, batch));
+
+  ASSERT_TRUE(metrics.row_count.has_value()) << "row_count should be set";
+  EXPECT_EQ(*metrics.row_count, 2);
+
+  // booleanCol
+  AssertCounts(1, 2, 0, metrics);
+  AssertBounds<bool>(1, boolean(), false, true, metrics);
+
+  // intCol
+  AssertCounts(2, 2, 0, metrics);
+  AssertBounds<int32_t>(2, int32(), std::numeric_limits<int32_t>::min(), 3, metrics);
+
+  // longCol: one null, so both bounds come from the single non-null value.
+  AssertCounts(3, 2, 1, metrics);
+  AssertBounds<int64_t>(3, int64(), 5, 5, metrics);
+
+  // floatCol
+  AssertCounts(4, 2, 0, metrics);
+  AssertBounds<float>(4, float32(), 1.0F, 2.0F, metrics);
+
+  // doubleCol: one null.
+  AssertCounts(5, 2, 1, metrics);
+  AssertBounds<double>(5, float64(), 2.0, 2.0, metrics);
+
+  // decimalCol: one null; "3.50" at scale 2 is the unscaled value 350.
+  AssertCounts(6, 2, 1, metrics);
+  AssertBounds<Decimal>(6, std::make_shared<DecimalType>(10, 2), Decimal(350),
+                        Decimal(350), metrics);
+
+  // stringCol
+  AssertCounts(7, 2, 0, metrics);
+  AssertBounds<std::string>(7, string(), std::string("AAA"), std::string("ZZZ"), metrics);
+
+  // dateCol: one null.
+  AssertCounts(8, 2, 1, metrics);
+  AssertBounds<int32_t>(8, date(), 1500, 1500, metrics);
+
+  // timeCol
+  AssertCounts(9, 2, 0, metrics);
+  AssertBounds<int64_t>(9, time(), 2000, 3000, metrics);
+
+  // timestampColAboveEpoch
+  AssertCounts(10, 2, 0, metrics);
+  AssertBounds<int64_t>(10, timestamp(), 0, 900, metrics);
+
+  // fixedCol: both records hold the same four bytes.
+  const std::vector<uint8_t> fixed_bytes = {'a', 'b', 'c', 'd'};
+  AssertCounts(11, 2, 0, metrics);
+  AssertBounds<std::vector<uint8_t>>(11, fixed(4), fixed_bytes, fixed_bytes, metrics);
+
+  // binaryCol
+  const std::vector<uint8_t> binary_lower = {'S'};
+  const std::vector<uint8_t> binary_upper = {'W'};
+  AssertCounts(12, 2, 0, metrics);
+  AssertBounds<std::vector<uint8_t>>(12, binary(), binary_lower, binary_upper, metrics);
+
+  // timestampColBelowEpoch
+  AssertCounts(13, 2, 0, metrics);
+  AssertBounds<int64_t>(13, timestamp(), -1900300, -7000, metrics);
+}
+
+// Writes one record with three required decimal columns of precision 4, 14 and 22
+// (scale 2) and checks that counts and bounds are reported for each of them.
+// NOTE(review): judging by the column names, the precisions are presumably chosen
+// to hit the int32, int64 and fixed-length decimal encodings; confirm.
+void MetricsTestBase::MetricsForDecimals() {
+  auto schema = std::make_shared<Schema>(std::vector<SchemaField>{
+      SchemaField::MakeRequired(1, "decimalAsInt32", decimal(4, 2)),
+      SchemaField::MakeRequired(2, "decimalAsInt64", decimal(14, 2)),
+      SchemaField::MakeRequired(3, "decimalAsFixed", decimal(22, 2)),
+  });
+
+  auto arrow_schema = ::arrow::schema({
+      ::arrow::field("decimalAsInt32", ::arrow::decimal128(4, 2), false),
+      ::arrow::field("decimalAsInt64", ::arrow::decimal128(14, 2), false),
+      ::arrow::field("decimalAsFixed", ::arrow::decimal128(22, 2), false),
+  });
+
+  // Each column gets a single value, given as the unscaled integer at scale 2.
+  ::arrow::Decimal128Builder int32_backed(::arrow::decimal128(4, 2));
+  ASSERT_TRUE(int32_backed.Append(::arrow::Decimal128("255")).ok());  // 2.55
+  auto int32_column = int32_backed.Finish().ValueOrDie();
+
+  ::arrow::Decimal128Builder int64_backed(::arrow::decimal128(14, 2));
+  ASSERT_TRUE(int64_backed.Append(::arrow::Decimal128("475")).ok());  // 4.75
+  auto int64_column = int64_backed.Finish().ValueOrDie();
+
+  ::arrow::Decimal128Builder fixed_backed(::arrow::decimal128(22, 2));
+  ASSERT_TRUE(fixed_backed.Append(::arrow::Decimal128("580")).ok());  // 5.80
+  auto fixed_column = fixed_backed.Finish().ValueOrDie();
+
+  std::vector<std::shared_ptr<::arrow::Array>> columns = {int32_column, int64_column,
+                                                          fixed_column};
+  auto records = ::arrow::StructArray::Make(columns, arrow_schema->fields()).ValueOrDie();
+
+  ICEBERG_UNWRAP_OR_FAIL(auto metrics, GetMetrics(schema, records));
+
+  ASSERT_TRUE(metrics.row_count.has_value()) << "row_count should be set";
+  EXPECT_EQ(*metrics.row_count, 1);
+
+  // Only the presence of the decimal bounds is checked, not their values.
+  AssertCounts(1, 1, 0, metrics);
+  EXPECT_TRUE(metrics.lower_bounds.contains(1));
+  EXPECT_TRUE(metrics.upper_bounds.contains(1));
+
+  AssertCounts(2, 1, 0, metrics);
+  EXPECT_TRUE(metrics.lower_bounds.contains(2));
+  EXPECT_TRUE(metrics.upper_bounds.contains(2));
+
+  AssertCounts(3, 1, 0, metrics);
+  EXPECT_TRUE(metrics.lower_bounds.contains(3));
+  EXPECT_TRUE(metrics.upper_bounds.contains(3));
+}
+
+// Writes the single record from BuildNestedRecords() against NestedSchema and
+// checks counts and bounds for the top-level column and every nested leaf column.
+void MetricsTestBase::MetricsForNestedStructFields() {
+  auto schema = NestedSchema();
+
+  ICEBERG_UNWRAP_OR_FAIL(auto records, BuildNestedRecords());
+  ICEBERG_UNWRAP_OR_FAIL(auto metrics, GetMetrics(schema, records));
+
+  ASSERT_TRUE(metrics.row_count.has_value()) << "row_count should be set";
+  EXPECT_EQ(*metrics.row_count, 1);
+
+  // intCol
+  constexpr int32_t kIntColValue = std::numeric_limits<int32_t>::min();
+  AssertCounts(1, 1, 0, metrics);
+  AssertBounds<int32_t>(1, int32(), kIntColValue, kIntColValue, metrics);
+
+  // nestedStructCol.longCol
+  AssertCounts(3, 1, 0, metrics);
+  AssertBounds<int64_t>(3, int64(), 100, 100, metrics);
+
+  // nestedStructCol.leafStructCol.leafLongCol
+  AssertCounts(5, 1, 0, metrics);
+  AssertBounds<int64_t>(5, int64(), 20, 20, metrics);
+
+  // nestedStructCol.leafStructCol.leafBinaryCol
+  const std::vector<uint8_t> leaf_binary = {'A'};
+  AssertCounts(6, 1, 0, metrics);
+  AssertBounds<std::vector<uint8_t>>(6, binary(), leaf_binary, leaf_binary, metrics);
+
+  // nestedStructCol.doubleCol: counts are reported but no bounds are expected.
+  // TODO(WZhuo) Assert NaN metrics
+  AssertCounts(7, 1, 0, metrics);
+  AssertBounds<double>(7, float64(), std::nullopt, std::nullopt, metrics);
+}
+
+// Checks that a per-column metrics mode can be applied to a nested field: with the
+// default mode "none" and "full" for nestedStructCol.longCol, bounds are expected
+// for field 3 only.
+void MetricsTestBase::MetricsModeForNestedStructFields() {
+  auto schema = NestedSchema();
+
+  // Table properties: disable metrics by default, enable full metrics for one
+  // nested column addressed by its dotted path.
+  std::unordered_map<std::string, std::string> table_properties = {
+      {"write.metadata.metrics.default", "none"},
+      {"write.metadata.metrics.column.nestedStructCol.longCol", "full"}};
+
+  ICEBERG_UNWRAP_OR_FAIL(auto metrics_config, MetricsConfig::Make(table_properties));
+  ICEBERG_UNWRAP_OR_FAIL(auto nested_rows, BuildNestedRecords());
+  ICEBERG_UNWRAP_OR_FAIL(auto metrics, GetMetrics(schema, metrics_config, nested_rows));
+
+  ASSERT_TRUE(metrics.row_count.has_value()) << "row_count should be set";
+  EXPECT_EQ(*metrics.row_count, 1);
+
+  // Exactly one lower and one upper bound, both belonging to field 3.
+  EXPECT_EQ(metrics.lower_bounds.size(), 1);
+  EXPECT_EQ(metrics.upper_bounds.size(), 1);
+  AssertBounds<int64_t>(3, int64(), 100, 100, metrics);
+}
+
+// Verifies that no counts or bounds are reported for fields nested inside list and
+// map columns.
+//
+// The single record holds intListCol = [10, 11, 12] and
+// mapCol = {"4" -> {leafIntCol: 1, leafStringCol: "BBB"}}.
+void MetricsTestBase::MetricsForListAndMapElements() {
+  // Struct type used as the map value.
+  auto leaf_struct = struct_({
+      SchemaField::MakeRequired(1, "leafIntCol", int32()),
+      SchemaField::MakeOptional(2, "leafStringCol", string()),
+  });
+
+  // Iceberg list and map types with explicit element/key/value field ids.
+  auto list_type = list(SchemaField::MakeRequired(4, "element", int32()));
+  auto map_type = map(SchemaField::MakeRequired(6, "key", string()),
+                      SchemaField::MakeRequired(7, "value", leaf_struct));
+
+  auto schema = std::make_shared<Schema>(std::vector<SchemaField>{
+      SchemaField::MakeOptional(3, "intListCol", list_type),
+      SchemaField::MakeOptional(5, "mapCol", map_type),
+  });
+
+  // Matching Arrow schema.
+  auto arrow_leaf_struct = ::arrow::struct_({
+      ::arrow::field("leafIntCol", ::arrow::int32(), false),
+      ::arrow::field("leafStringCol", ::arrow::utf8(), true),
+  });
+
+  auto arrow_schema = ::arrow::schema({
+      ::arrow::field("intListCol",
+                     ::arrow::list(::arrow::field("element", ::arrow::int32(), false)),
+                     true),
+      ::arrow::field("mapCol", ::arrow::map(::arrow::utf8(), arrow_leaf_struct), true),
+  });
+
+  // Create list: [10, 11, 12]
+  ::arrow::ListBuilder list_builder(::arrow::default_memory_pool(),
+                                    std::make_shared<::arrow::Int32Builder>());
+  ASSERT_TRUE(list_builder.Append().ok());
+  auto list_value_builder =
+      static_cast<::arrow::Int32Builder*>(list_builder.value_builder());
+  ASSERT_TRUE(list_value_builder->Append(10).ok());
+  ASSERT_TRUE(list_value_builder->Append(11).ok());
+  ASSERT_TRUE(list_value_builder->Append(12).ok());
+  auto list_array = list_builder.Finish().ValueOrDie();
+
+  // Create map: {"4" -> {leafIntCol: 1, leafStringCol: "BBB"}}
+  // MapArray needs offsets, keys, and items (struct values)
+  ::arrow::Int32Builder offset_builder;
+  ASSERT_TRUE(offset_builder.Append(0).ok());  // Start offset
+  ASSERT_TRUE(offset_builder.Append(1).ok());  // End offset (1 entry)
+  auto offsets = offset_builder.Finish().ValueOrDie();
+
+  ::arrow::StringBuilder key_builder;
+  ASSERT_TRUE(key_builder.Append("4").ok());
+  auto keys = key_builder.Finish().ValueOrDie();
+
+  ::arrow::Int32Builder struct_int_builder;
+  ::arrow::StringBuilder struct_str_builder;
+  ASSERT_TRUE(struct_int_builder.Append(1).ok());
+  ASSERT_TRUE(struct_str_builder.Append("BBB").ok());
+  auto struct_int_array = struct_int_builder.Finish().ValueOrDie();
+  auto struct_str_array = struct_str_builder.Finish().ValueOrDie();
+  auto items =
+      ::arrow::StructArray::Make({struct_int_array, struct_str_array},
+                                 {::arrow::field("leafIntCol", ::arrow::int32(), false),
+                                  ::arrow::field("leafStringCol", ::arrow::utf8(), true)})
+          .ValueOrDie();
+
+  auto map_array = ::arrow::MapArray::FromArrays(offsets, keys, items).ValueOrDie();
+
+  std::vector<std::shared_ptr<::arrow::Array>> field_arrays = {list_array, map_array};
+  auto records =
+      ::arrow::StructArray::Make(field_arrays, arrow_schema->fields()).ValueOrDie();
+
+  ICEBERG_UNWRAP_OR_FAIL(auto metrics, GetMetrics(schema, records));
+
+  ASSERT_TRUE(metrics.row_count.has_value()) << "row_count should be set";
+  EXPECT_EQ(*metrics.row_count, 1);
+
+  // For list and map elements, metrics should not be collected.
+  // Fields 1 (leafIntCol), 2 (leafStringCol), 4 (list element) and 6 (map key) are
+  // checked for both counts and bounds; field 7 (map value struct) is checked for
+  // bounds only.
+  AssertCounts(1, std::nullopt, std::nullopt, metrics);
+  AssertCounts(2, std::nullopt, std::nullopt, metrics);
+  AssertCounts(4, std::nullopt, std::nullopt, metrics);
+  AssertCounts(6, std::nullopt, std::nullopt, metrics);
+
+  AssertBounds<int32_t>(1, int32(), std::nullopt, std::nullopt, metrics);
+  AssertBounds<std::string>(2, string(), std::nullopt, std::nullopt, metrics);
+  AssertBounds<int32_t>(4, int32(), std::nullopt, std::nullopt, metrics);
+  AssertBounds<std::string>(6, string(), std::nullopt, std::nullopt, metrics);
+  ASSERT_FALSE(metrics.lower_bounds.contains(7));
+  ASSERT_FALSE(metrics.upper_bounds.contains(7));
+}
+
+// Writes two records whose only column is null in both rows and checks that the
+// null count equals the value count and that no bounds are recorded.
+void MetricsTestBase::MetricsForNullColumns() {
+  auto arrow_schema = ::arrow::schema({
+      ::arrow::field("intCol", ::arrow::int32(), true),
+  });
+
+  auto schema = std::make_shared<Schema>(std::vector<SchemaField>{
+      SchemaField::MakeOptional(1, "intCol", int32()),
+  });
+
+  auto all_null_rows = CreateRecordArrays(arrow_schema, R"([
+    {"intCol": null},
+    {"intCol": null}
+  ])");
+
+  ICEBERG_UNWRAP_OR_FAIL(auto metrics, GetMetrics(schema, all_null_rows));
+
+  ASSERT_TRUE(metrics.row_count.has_value()) << "row_count should be set";
+  EXPECT_EQ(*metrics.row_count, 2);
+
+  // Two values, both null, hence nothing to derive bounds from.
+  AssertCounts(1, 2, 2, metrics);
+  AssertBounds<int32_t>(1, int32(), std::nullopt, std::nullopt, metrics);
+}
+
+// Writes two records in which both floating-point columns hold only NaN and checks
+// that counts are reported while no bounds are recorded.
+void MetricsTestBase::MetricsForNaNColumns() {
+  auto schema = FloatDoubleSchema();
+
+  auto arrow_schema = ::arrow::schema({
+      ::arrow::field("floatCol", ::arrow::float32(), true),
+      ::arrow::field("doubleCol", ::arrow::float64(), true),
+  });
+
+  ::arrow::FloatBuilder float_builder;
+  ::arrow::DoubleBuilder double_builder;
+
+  // Two rows, each with NaN in both columns.
+  for (int row = 0; row < 2; ++row) {
+    ASSERT_TRUE(float_builder.Append(std::numeric_limits<float>::quiet_NaN()).ok());
+    ASSERT_TRUE(double_builder.Append(std::numeric_limits<double>::quiet_NaN()).ok());
+  }
+
+  auto float_column = float_builder.Finish().ValueOrDie();
+  auto double_column = double_builder.Finish().ValueOrDie();
+
+  std::vector<std::shared_ptr<::arrow::Array>> columns = {float_column, double_column};
+  auto records = ::arrow::StructArray::Make(columns, arrow_schema->fields()).ValueOrDie();
+
+  ICEBERG_UNWRAP_OR_FAIL(auto metrics, GetMetrics(schema, records));
+
+  ASSERT_TRUE(metrics.row_count.has_value()) << "row_count should be set";
+  EXPECT_EQ(*metrics.row_count, 2);
+  // TODO(WZhuo) Assert NaN metrics
+  AssertCounts(1, 2, 0, metrics);
+  AssertCounts(2, 2, 0, metrics);
+
+  // When all values are NaN, bounds should not be set
+  AssertBounds<float>(1, float32(), std::nullopt, std::nullopt, metrics);
+  AssertBounds<double>(2, float64(), std::nullopt, std::nullopt, metrics);
+}
+
+// Checks counts and bounds when NaN is the first value of each floating-point
+// column: floatCol = {NaN, 1.2, 5.6}, doubleCol = {NaN, 3.4, 7.8}.
+void MetricsTestBase::ColumnBoundsWithNaNValueAtFront() {
+  auto schema = FloatDoubleSchema();
+
+  auto arrow_schema = ::arrow::schema({
+      ::arrow::field("floatCol", ::arrow::float32(), true),
+      ::arrow::field("doubleCol", ::arrow::float64(), true),
+  });
+
+  const float float_values[] = {std::numeric_limits<float>::quiet_NaN(), 1.2F, 5.6F};
+  const double double_values[] = {std::numeric_limits<double>::quiet_NaN(), 3.4, 7.8};
+
+  ::arrow::FloatBuilder float_builder;
+  ::arrow::DoubleBuilder double_builder;
+  for (int row = 0; row < 3; ++row) {
+    ASSERT_TRUE(float_builder.Append(float_values[row]).ok());
+    ASSERT_TRUE(double_builder.Append(double_values[row]).ok());
+  }
+
+  auto float_column = float_builder.Finish().ValueOrDie();
+  auto double_column = double_builder.Finish().ValueOrDie();
+
+  std::vector<std::shared_ptr<::arrow::Array>> columns = {float_column, double_column};
+  auto records = ::arrow::StructArray::Make(columns, arrow_schema->fields()).ValueOrDie();
+
+  ICEBERG_UNWRAP_OR_FAIL(auto metrics, GetMetrics(schema, records));
+
+  ASSERT_TRUE(metrics.row_count.has_value()) << "row_count should be set";
+  EXPECT_EQ(*metrics.row_count, 3);
+  // TODO(WZhuo) Assert NaN metrics
+  AssertCounts(1, 3, 0, metrics);
+  AssertCounts(2, 3, 0, metrics);
+
+  // Bounds should be computed from non-NaN values.
+  // NOTE(review): the bounds are only verified when a lower bound was recorded for
+  // field 1; if the writer emits none, this check is skipped silently.
+  if (!metrics.lower_bounds.contains(1)) {
+    return;
+  }
+  AssertBounds<float>(1, float32(), 1.2F, 5.6F, metrics);
+  AssertBounds<double>(2, float64(), 3.4, 7.8, metrics);
+}
+
+// Checks counts and bounds when NaN is the middle value of each floating-point
+// column: floatCol = {1.2, NaN, 5.6}, doubleCol = {3.4, NaN, 7.8}.
+void MetricsTestBase::ColumnBoundsWithNaNValueInMiddle() {
+  auto schema = FloatDoubleSchema();
+
+  auto arrow_schema = ::arrow::schema({
+      ::arrow::field("floatCol", ::arrow::float32(), true),
+      ::arrow::field("doubleCol", ::arrow::float64(), true),
+  });
+
+  const float float_values[] = {1.2F, std::numeric_limits<float>::quiet_NaN(), 5.6F};
+  const double double_values[] = {3.4, std::numeric_limits<double>::quiet_NaN(), 7.8};
+
+  ::arrow::FloatBuilder float_builder;
+  ::arrow::DoubleBuilder double_builder;
+  for (int row = 0; row < 3; ++row) {
+    ASSERT_TRUE(float_builder.Append(float_values[row]).ok());
+    ASSERT_TRUE(double_builder.Append(double_values[row]).ok());
+  }
+
+  auto float_column = float_builder.Finish().ValueOrDie();
+  auto double_column = double_builder.Finish().ValueOrDie();
+
+  std::vector<std::shared_ptr<::arrow::Array>> columns = {float_column, double_column};
+  auto records = ::arrow::StructArray::Make(columns, arrow_schema->fields()).ValueOrDie();
+
+  ICEBERG_UNWRAP_OR_FAIL(auto metrics, GetMetrics(schema, records));
+
+  ASSERT_TRUE(metrics.row_count.has_value()) << "row_count should be set";
+  EXPECT_EQ(*metrics.row_count, 3);
+  AssertCounts(1, 3, 0, metrics);
+  AssertCounts(2, 3, 0, metrics);
+
+  // NOTE(review): the bounds are only verified when a lower bound was recorded for
+  // field 1; if the writer emits none, this check is skipped silently.
+  if (!metrics.lower_bounds.contains(1)) {
+    return;
+  }
+  AssertBounds<float>(1, float32(), 1.2F, 5.6F, metrics);
+  AssertBounds<double>(2, float64(), 3.4, 7.8, metrics);
+}
+
+// Checks counts and bounds when NaN is the last value of each floating-point
+// column: floatCol = {1.2, 5.6, NaN}, doubleCol = {3.4, 7.8, NaN}.
+void MetricsTestBase::ColumnBoundsWithNaNValueAtEnd() {
+  auto schema = FloatDoubleSchema();
+
+  auto arrow_schema = ::arrow::schema({
+      ::arrow::field("floatCol", ::arrow::float32(), true),
+      ::arrow::field("doubleCol", ::arrow::float64(), true),
+  });
+
+  const float float_values[] = {1.2F, 5.6F, std::numeric_limits<float>::quiet_NaN()};
+  const double double_values[] = {3.4, 7.8, std::numeric_limits<double>::quiet_NaN()};
+
+  ::arrow::FloatBuilder float_builder;
+  ::arrow::DoubleBuilder double_builder;
+  for (int row = 0; row < 3; ++row) {
+    ASSERT_TRUE(float_builder.Append(float_values[row]).ok());
+    ASSERT_TRUE(double_builder.Append(double_values[row]).ok());
+  }
+
+  auto float_column = float_builder.Finish().ValueOrDie();
+  auto double_column = double_builder.Finish().ValueOrDie();
+
+  std::vector<std::shared_ptr<::arrow::Array>> columns = {float_column, double_column};
+  auto records = ::arrow::StructArray::Make(columns, arrow_schema->fields()).ValueOrDie();
+
+  ICEBERG_UNWRAP_OR_FAIL(auto metrics, GetMetrics(schema, records));
+
+  ASSERT_TRUE(metrics.row_count.has_value()) << "row_count should be set";
+  EXPECT_EQ(*metrics.row_count, 3);
+  AssertCounts(1, 3, 0, metrics);
+  AssertCounts(2, 3, 0, metrics);
+
+  // NOTE(review): the bounds are only verified when a lower bound was recorded for
+  // field 1; if the writer emits none, this check is skipped silently.
+  if (!metrics.lower_bounds.contains(1)) {
+    return;
+  }
+  AssertBounds<float>(1, float32(), 1.2F, 5.6F, metrics);
+  AssertBounds<double>(2, float64(), 3.4, 7.8, metrics);
+}
+
+// Writes 201 records from BuildSimpleRecords against SimpleSchema and checks the
+// row count, counts and bounds of the first six columns. When the writer under test
+// supports small row groups, the data is additionally expected to be split in 3.
+void MetricsTestBase::MetricsForTopLevelWithMultipleRowGroup() {
+  constexpr int kRowCount = 201;
+
+  auto schema = SimpleSchema();
+  ICEBERG_UNWRAP_OR_FAIL(auto arrow_schema, ToArrowSchema(schema));
+
+  ICEBERG_UNWRAP_OR_FAIL(auto records, BuildSimpleRecords(arrow_schema, kRowCount));
+  ICEBERG_UNWRAP_OR_FAIL(auto metrics, GetMetrics(schema, records));
+
+  if (SupportsSmallRowGroups()) {
+    ICEBERG_UNWRAP_OR_FAIL(auto split_count, GetSplitCount());
+    EXPECT_EQ(split_count, 3);
+  }
+
+  ASSERT_TRUE(metrics.row_count.has_value()) << "row_count should be set";
+  EXPECT_EQ(*metrics.row_count, kRowCount);
+
+  // Verify metrics are collected for top-level fields.
+  // booleanCol
+  AssertCounts(1, kRowCount, 0, metrics);
+  AssertBounds<bool>(1, boolean(), false, true, metrics);
+
+  // intCol
+  AssertCounts(2, kRowCount, 0, metrics);
+  AssertBounds<int32_t>(2, int32(), 3, 203, metrics);
+
+  // longCol (one null)
+  AssertCounts(3, kRowCount, 1, metrics);
+  AssertBounds<int64_t>(3, int64(), 1, 200, metrics);
+
+  // floatCol
+  AssertCounts(4, kRowCount, 0, metrics);
+  AssertBounds<float>(4, float32(), 2.0F, 201.0F, metrics);
+
+  // doubleCol
+  AssertCounts(5, kRowCount, 0, metrics);
+  AssertBounds<double>(5, float64(), 2.0, 201.0, metrics);
+
+  // decimalCol (one null)
+  AssertCounts(6, kRowCount, 1, metrics);
+  AssertBounds<Decimal>(6, std::make_shared<DecimalType>(10, 2), Decimal(101),
+                        Decimal(300), metrics);
+}
+
+// Writes 201 records from BuildNestedRecords against NestedSchema and checks the row
+// count plus counts and bounds of the top-level and nested leaf columns. When the
+// writer under test supports small row groups, the data is additionally expected to
+// be split in 3.
+void MetricsTestBase::MetricsForNestedStructFieldsWithMultipleRowGroup() {
+  constexpr int kRowCount = 201;
+
+  auto schema = NestedSchema();
+  // The converted schema is not used below; the conversion only has to succeed.
+  ICEBERG_UNWRAP_OR_FAIL(auto arrow_schema, ToArrowSchema(schema));
+  ICEBERG_UNWRAP_OR_FAIL(auto records, BuildNestedRecords(kRowCount));
+  ICEBERG_UNWRAP_OR_FAIL(auto metrics, GetMetrics(schema, records));
+
+  if (SupportsSmallRowGroups()) {
+    ICEBERG_UNWRAP_OR_FAIL(auto split_count, GetSplitCount());
+    EXPECT_EQ(split_count, 3);
+  }
+  ASSERT_TRUE(metrics.row_count.has_value()) << "row_count should be set";
+  EXPECT_EQ(*metrics.row_count, kRowCount);
+
+  // Verify metrics for top-level field
+  AssertCounts(1, kRowCount, 0, metrics);
+  AssertBounds<int32_t>(1, int32(), std::numeric_limits<int32_t>::min(),
+                        std::numeric_limits<int32_t>::min() + 200, metrics);
+
+  // Verify metrics for nested struct fields
+  AssertCounts(3, kRowCount, 0, metrics);
+  AssertBounds<int64_t>(3, int64(), 100, 100 + 200, metrics);
+
+  AssertCounts(5, kRowCount, 0, metrics);
+  AssertBounds<int64_t>(5, int64(), 20, 20 + 200, metrics);
+
+  // leafBinaryCol: the same single byte is expected as lower and upper bound.
+  const std::vector<uint8_t> leaf_binary = {'A'};
+  AssertCounts(6, kRowCount, 0, metrics);
+  AssertBounds<std::vector<uint8_t>>(6, binary(), leaf_binary, leaf_binary, metrics);
+
+  // doubleCol: counts are reported but no bounds are expected.
+  AssertCounts(7, kRowCount, 0, metrics);
+  AssertBounds<double>(7, float64(), std::nullopt, std::nullopt, metrics);
+}
+
+void MetricsTestBase::NoneMetricsMode() {
+  auto schema = NestedSchema();
+
+  std::unordered_map<std::string, std::string> properties = {
+      {"write.metadata.metrics.default", "none"}};
+
+  ICEBERG_UNWRAP_OR_FAIL(auto config, MetricsConfig::Make(properties));
+  ICEBERG_UNWRAP_OR_FAIL(auto records, BuildNestedRecords());
+  ICEBERG_UNWRAP_OR_FAIL(auto metrics, GetMetrics(schema, config, records));
+
+  ASSERT_TRUE(metrics.row_count.has_value()) << "row_count should be set";
+  EXPECT_EQ(*metrics.row_count, 1);
+
+  // In None mode, column_sizes should be empty
+  EXPECT_TRUE(metrics.column_sizes.empty());
+
+  // All counts and bounds should be null
+  AssertCounts(1, std::nullopt, std::nullopt, metrics);
+  AssertBounds<int32_t>(1, int32(), std::nullopt, std::nullopt, metrics);
+  AssertCounts(3, std::nullopt, std::nullopt, metrics);
+  AssertBounds<int64_t>(3, int64(), std::nullopt, std::nullopt, metrics);
+  AssertCounts(5, std::nullopt, std::nullopt, metrics);
+  AssertBounds<int64_t>(5, int64(), std::nullopt, std::nullopt, metrics);
+  AssertCounts(6, std::nullopt, std::nullopt, metrics);
+  AssertBounds<std::string>(6, binary(), std::nullopt, std::nullopt, metrics);
+  AssertCounts(7, std::nullopt, std::nullopt, metrics);
+  AssertBounds<double>(7, float64(), std::nullopt, std::nullopt, metrics);
+}
+
+void MetricsTestBase::CountsMetricsMode() {
+  auto schema = NestedSchema();
+
+  std::unordered_map<std::string, std::string> properties = {
+      {"write.metadata.metrics.default", "counts"}};
+
+  ICEBERG_UNWRAP_OR_FAIL(auto config, MetricsConfig::Make(properties));
+  ICEBERG_UNWRAP_OR_FAIL(auto records, BuildNestedRecords());
+  ICEBERG_UNWRAP_OR_FAIL(auto metrics, GetMetrics(schema, config, records));
+
+  ASSERT_TRUE(metrics.row_count.has_value()) << "row_count should be set";
+  EXPECT_EQ(*metrics.row_count, 1);
+
+  // In Counts mode, column_sizes should not be empty
+  EXPECT_FALSE(metrics.column_sizes.empty());
+
+  // Counts should be present but bounds should be null
+  AssertCounts(1, 1, 0, metrics);
+  AssertBounds<int32_t>(1, int32(), std::nullopt, std::nullopt, metrics);
+  AssertCounts(3, 1, 0, metrics);
+  AssertBounds<int64_t>(3, int64(), std::nullopt, std::nullopt, metrics);
+  AssertCounts(5, 1, 0, metrics);
+  AssertBounds<int64_t>(5, int64(), std::nullopt, std::nullopt, metrics);
+  AssertCounts(6, 1, 0, metrics);
+  AssertBounds<std::string>(6, binary(), std::nullopt, std::nullopt, metrics);
+  AssertCounts(7, 1, 0, metrics);
+  AssertBounds<double>(7, float64(), std::nullopt, std::nullopt, metrics);
+}
+
+void MetricsTestBase::FullMetricsMode() {
+  auto schema = NestedSchema();
+
+  std::unordered_map<std::string, std::string> properties = {
+      {"write.metadata.metrics.default", "full"}};
+
+  ICEBERG_UNWRAP_OR_FAIL(auto config, MetricsConfig::Make(properties));
+  ICEBERG_UNWRAP_OR_FAIL(auto records, BuildNestedRecords());
+  ICEBERG_UNWRAP_OR_FAIL(auto metrics, GetMetrics(schema, config, records));
+
+  ASSERT_TRUE(metrics.row_count.has_value()) << "row_count should be set";
+  EXPECT_EQ(*metrics.row_count, 1);
+
+  // In Full mode, column_sizes should not be empty
+  EXPECT_FALSE(metrics.column_sizes.empty());
+
+  // Both counts and bounds should be present
+  AssertCounts(1, 1, 0, metrics);
+  AssertBounds<int32_t>(1, int32(), std::numeric_limits<int32_t>::min(),
+                        std::numeric_limits<int32_t>::min(), metrics);
+  AssertCounts(3, 1, 0, metrics);
+  AssertBounds<int64_t>(3, int64(), 100, 100, metrics);
+  AssertCounts(5, 1, 0, metrics);
+  AssertBounds<int64_t>(5, int64(), 20, 20, metrics);
+  AssertCounts(6, 1, 0, metrics);
+  AssertBounds<std::vector<uint8_t>>(6, binary(), std::vector<uint8_t>{'A'},
+                                     std::vector<uint8_t>{'A'}, metrics);
+  AssertCounts(7, 1, 0, metrics);
+  AssertBounds<double>(7, float64(), std::nullopt, std::nullopt, metrics);
+}
+
+void MetricsTestBase::TruncateStringMetricsMode() {
+  auto schema = std::make_shared<Schema>(std::vector<SchemaField>{
+      SchemaField::MakeRequired(1, "str_to_truncate", string()),
+  });
+
+  auto arrow_schema = ::arrow::schema({
+      ::arrow::field("str_to_truncate", ::arrow::utf8(), false),
+  });
+
+  auto records = CreateRecordArrays(arrow_schema, R"([
+    {"str_to_truncate": "Lorem ipsum dolor sit amet"}
+  ])");
+
+  std::unordered_map<std::string, std::string> properties = {
+      {"write.metadata.metrics.default", "truncate(10)"}};
+
+  ICEBERG_UNWRAP_OR_FAIL(auto config, MetricsConfig::Make(properties));
+  ICEBERG_UNWRAP_OR_FAIL(auto metrics, GetMetrics(schema, config, records));
+
+  ASSERT_TRUE(metrics.row_count.has_value()) << "row_count should be set";
+  EXPECT_EQ(*metrics.row_count, 1);
+
+  // Column sizes should not be empty
+  EXPECT_FALSE(metrics.column_sizes.empty());
+
+  AssertCounts(1, 1, 0, metrics);
+
+  // Bounds should be truncated to 10 characters
+  // Lower bound: "Lorem ipsu" (first 10 chars)
+  // Upper bound: "Lorem ipsv" (first 10 chars with last char incremented)
+  std::string expected_lower = "Lorem ipsu";
+  std::string expected_upper = "Lorem ipsv";
+  AssertBounds<std::string>(1, string(), expected_lower, expected_upper, 
metrics);
+}
+
+void MetricsTestBase::TruncateBinaryMetricsMode() {
+  auto schema = std::make_shared<Schema>(std::vector<SchemaField>{
+      SchemaField::MakeRequired(1, "bin_to_truncate", binary()),
+  });
+
+  auto arrow_schema = ::arrow::schema({
+      ::arrow::field("bin_to_truncate", ::arrow::binary(), false),
+  });
+
+  // Create binary data: {0x1, 0x2, 0x3, 0x4, 0x5, 0x6, 0x7, 0x8, 0x9, 0x10, 
0xA, 0xB}
+  ::arrow::BinaryBuilder builder;
+  std::vector<uint8_t> data = {0x1, 0x2, 0x3, 0x4,  0x5, 0x6,
+                               0x7, 0x8, 0x9, 0x10, 0xA, 0xB};
+  ASSERT_TRUE(builder.Append(data.data(), data.size()).ok());
+  auto array = builder.Finish().ValueOrDie();
+
+  std::vector<std::shared_ptr<::arrow::Array>> field_arrays = {array};
+  auto records =
+      ::arrow::StructArray::Make(field_arrays, 
arrow_schema->fields()).ValueOrDie();
+
+  std::unordered_map<std::string, std::string> properties = {
+      {"write.metadata.metrics.default", "truncate(5)"}};
+
+  ICEBERG_UNWRAP_OR_FAIL(auto config, MetricsConfig::Make(properties));
+  ICEBERG_UNWRAP_OR_FAIL(auto metrics, GetMetrics(schema, config, records));
+
+  ASSERT_TRUE(metrics.row_count.has_value()) << "row_count should be set";
+  EXPECT_EQ(*metrics.row_count, 1);
+
+  // Column sizes should not be empty
+  EXPECT_FALSE(metrics.column_sizes.empty());
+
+  AssertCounts(1, 1, 0, metrics);
+
+  // Bounds should be truncated to 5 bytes
+  // Lower bound: {0x1, 0x2, 0x3, 0x4, 0x5}
+  // Upper bound: {0x1, 0x2, 0x3, 0x4, 0x6} (last byte incremented)
+  auto expected_lower = std::vector<uint8_t>{0x1, 0x2, 0x3, 0x4, 0x5};
+  auto expected_upper = std::vector<uint8_t>{0x1, 0x2, 0x3, 0x4, 0x6};
+  AssertBounds<std::vector<uint8_t>>(1, binary(), expected_lower, 
expected_upper,
+                                     metrics);
+}
+
+Result<std::shared_ptr<::arrow::Array>> MetricsTestBase::BuildSimpleRecords(
+    std::shared_ptr<::arrow::Schema> arrow_schema, int32_t count) {
+  ::arrow::BooleanBuilder boolean_builder;
+  ::arrow::Int32Builder int_builder;
+  ::arrow::Int64Builder long_builder;
+  ::arrow::FloatBuilder float_builder;
+  ::arrow::DoubleBuilder double_builder;
+  ::arrow::Decimal128Builder decimal_builder(::arrow::decimal128(10, 2));
+  ::arrow::StringBuilder string_builder;
+  ::arrow::Date32Builder date_builder;
+  ::arrow::Time64Builder 
time_builder(::arrow::time64(::arrow::TimeUnit::MICRO),
+                                      ::arrow::default_memory_pool());
+  ::arrow::TimestampBuilder timestamp_above_builder(
+      ::arrow::timestamp(::arrow::TimeUnit::MICRO), 
::arrow::default_memory_pool());
+  ::arrow::FixedSizeBinaryBuilder fixed_builder(::arrow::fixed_size_binary(4));
+  ::arrow::BinaryBuilder binary_builder;
+  ::arrow::TimestampBuilder timestamp_below_builder(
+      ::arrow::timestamp(::arrow::TimeUnit::MICRO), 
::arrow::default_memory_pool());
+
+  // Append `count` records: row 0 holds the null/NaN values, later rows vary with i
+  for (int i = 0; i < count; i++) {
+    ICEBERG_ARROW_RETURN_NOT_OK(boolean_builder.Append(i != 0));
+    ICEBERG_ARROW_RETURN_NOT_OK(int_builder.Append(3 + i));
+    ICEBERG_ARROW_RETURN_NOT_OK(i == 0 ? long_builder.AppendNull()
+                                       : long_builder.Append(i));
+    ICEBERG_ARROW_RETURN_NOT_OK(
+        i == 0 ? float_builder.Append(std::numeric_limits<float>::quiet_NaN())
+               : float_builder.Append(1.0 + i));
+    ICEBERG_ARROW_RETURN_NOT_OK(
+        i == 0 ? 
double_builder.Append(std::numeric_limits<double>::quiet_NaN())
+               : double_builder.Append(1.0 + i));
+    ICEBERG_ARROW_RETURN_NOT_OK(i == 0
+                                    ? decimal_builder.AppendNull()
+                                    : 
decimal_builder.Append(::arrow::Decimal128("100") +
+                                                             i));  // 1.00 + 
i * 0.01 at scale 2
+    ICEBERG_ARROW_RETURN_NOT_OK(string_builder.Append("AAA"));
+    ICEBERG_ARROW_RETURN_NOT_OK(date_builder.Append(1500 + i));
+    ICEBERG_ARROW_RETURN_NOT_OK(time_builder.Append(2000 + i));
+    ICEBERG_ARROW_RETURN_NOT_OK(timestamp_above_builder.Append(i + 1));
+    ICEBERG_ARROW_RETURN_NOT_OK(fixed_builder.Append("abcd"));
+    ICEBERG_ARROW_RETURN_NOT_OK(binary_builder.Append("S"));
+    ICEBERG_ARROW_RETURN_NOT_OK(timestamp_below_builder.Append((i + 1) * -1));
+  }
+
+  auto boolean_array = boolean_builder.Finish().ValueOrDie();
+  auto int_array = int_builder.Finish().ValueOrDie();
+  auto long_array = long_builder.Finish().ValueOrDie();
+  auto float_array = float_builder.Finish().ValueOrDie();
+  auto double_array = double_builder.Finish().ValueOrDie();
+  auto decimal_array = decimal_builder.Finish().ValueOrDie();
+  auto string_array = string_builder.Finish().ValueOrDie();
+  auto date_array = date_builder.Finish().ValueOrDie();
+  auto time_array = time_builder.Finish().ValueOrDie();
+  auto timestamp_above_array = timestamp_above_builder.Finish().ValueOrDie();
+  auto fixed_array = fixed_builder.Finish().ValueOrDie();
+  auto binary_array = binary_builder.Finish().ValueOrDie();
+  auto timestamp_below_array = timestamp_below_builder.Finish().ValueOrDie();
+
+  std::vector<std::shared_ptr<::arrow::Array>> field_arrays = {
+      boolean_array,         int_array,    long_array,
+      float_array,           double_array, decimal_array,
+      string_array,          date_array,   time_array,
+      timestamp_above_array, fixed_array,  binary_array,
+      timestamp_below_array};
+  return ::arrow::StructArray::Make(field_arrays, 
arrow_schema->fields()).ValueOrDie();
+}
+
+Result<std::shared_ptr<::arrow::Array>> MetricsTestBase::BuildNestedRecords(
+    int32_t count) {
+  auto leaf_struct_type = ::arrow::struct_({
+      ::arrow::field("leafLongCol", ::arrow::int64(), true),
+      ::arrow::field("leafBinaryCol", ::arrow::binary(), true),
+  });
+
+  auto nested_struct_type = ::arrow::struct_({
+      ::arrow::field("longCol", ::arrow::int64(), false),
+      ::arrow::field("leafStructCol", leaf_struct_type, false),
+      ::arrow::field("doubleCol", ::arrow::float64(), false),
+  });
+
+  auto arrow_schema = ::arrow::schema({
+      ::arrow::field("intCol", ::arrow::int32(), false),
+      ::arrow::field("nestedStructCol", nested_struct_type, false),
+  });
+
+  // Builders for each leaf column; the struct arrays are assembled after the loop
+  ::arrow::Int64Builder leaf_long_builder;
+  ::arrow::BinaryBuilder leaf_binary_builder;
+  ::arrow::Int64Builder nested_long_builder;
+  ::arrow::DoubleBuilder nested_double_builder;
+  ::arrow::Int32Builder int_builder;
+
+  for (int32_t i = 0; i < count; i++) {
+    ICEBERG_ARROW_RETURN_NOT_OK(leaf_long_builder.Append(20 + i));
+    ICEBERG_ARROW_RETURN_NOT_OK(leaf_binary_builder.Append("A"));
+
+    // Nested struct row: {longCol: 100 + i, leafStructCol: {...}, doubleCol: 
NaN}
+
+    ICEBERG_ARROW_RETURN_NOT_OK(nested_long_builder.Append(100 + i));
+    ICEBERG_ARROW_RETURN_NOT_OK(
+        
nested_double_builder.Append(std::numeric_limits<double>::quiet_NaN()));
+
+    // Top-level row: {intCol: INT32_MIN + i, nestedStructCol: {...}}
+    ICEBERG_ARROW_RETURN_NOT_OK(
+        int_builder.Append(std::numeric_limits<int32_t>::min() + i));
+  }
+
+  ICEBERG_ARROW_ASSIGN_OR_RETURN(auto leaf_long_array, 
leaf_long_builder.Finish());
+  ICEBERG_ARROW_ASSIGN_OR_RETURN(auto leaf_binary_array, 
leaf_binary_builder.Finish());
+  ICEBERG_ARROW_ASSIGN_OR_RETURN(
+      auto leaf_struct_array,
+      ::arrow::StructArray::Make({leaf_long_array, leaf_binary_array},
+                                 leaf_struct_type->fields()));
+
+  // Build nested struct: {longCol: 100 + i, leafStructCol: {...}, doubleCol: NaN}
+  ICEBERG_ARROW_ASSIGN_OR_RETURN(auto nested_long_array, 
nested_long_builder.Finish());
+  ICEBERG_ARROW_ASSIGN_OR_RETURN(auto nested_double_array,
+                                 nested_double_builder.Finish());
+  ICEBERG_ARROW_ASSIGN_OR_RETURN(
+      auto nested_struct_array,
+      ::arrow::StructArray::Make(
+          {nested_long_array, leaf_struct_array, nested_double_array},
+          nested_struct_type->fields()));
+
+  // Build top-level struct: {intCol: INT32_MIN + i, nestedStructCol: {...}}
+  ICEBERG_ARROW_ASSIGN_OR_RETURN(auto int_array, int_builder.Finish());
+  ICEBERG_ARROW_ASSIGN_OR_RETURN(
+      auto records, ::arrow::StructArray::Make({int_array, 
nested_struct_array},
+                                               arrow_schema->fields()));
+  return records;
+}
+
+}  // namespace iceberg::test
diff --git a/src/iceberg/test/metrics_test_base.h 
b/src/iceberg/test/metrics_test_base.h
new file mode 100644
index 00000000..1ad9c882
--- /dev/null
+++ b/src/iceberg/test/metrics_test_base.h
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#include <memory>
+#include <optional>
+#include <string>
+
+#include <arrow/array.h>
+
+#include "iceberg/file_io.h"
+#include "iceberg/metrics.h"
+#include "iceberg/metrics_config.h"
+#include "iceberg/result.h"
+#include "iceberg/schema.h"
+
+namespace iceberg::test {
+
+/// \brief Base test class for metrics testing, similar to Java's TestMetrics
+///
+/// This class provides common test infrastructure and helper methods for 
testing
+/// metrics collection across different file formats (Parquet, Avro, ORC).
+class MetricsTestBase {
+ protected:
+  virtual void SetUp();
+
+  /// \brief Get metrics for the given schema and records
+  virtual Result<Metrics> GetMetrics(std::shared_ptr<Schema> schema,
+                                     std::shared_ptr<::arrow::Array> records) 
= 0;
+
+  /// \brief Get metrics with custom MetricsConfig
+  virtual Result<Metrics> GetMetrics(std::shared_ptr<Schema> schema,
+                                     std::shared_ptr<MetricsConfig> config,
+                                     std::shared_ptr<::arrow::Array> records) 
= 0;
+
+  /// \brief Create an output file for testing
+  virtual std::string CreateOutputFile() = 0;
+
+  /// \brief Get the number of row groups/splits in a file
+  virtual Result<int> GetSplitCount() = 0;
+
+  /// \brief Whether the format supports small row groups for testing
+  virtual bool SupportsSmallRowGroups() const { return false; }
+
+  // Helper methods for assertions
+  void AssertCounts(int field_id, std::optional<int64_t> expected_value_count,
+                    std::optional<int64_t> expected_null_count, const Metrics& 
metrics);
+
+  void AssertCounts(int field_id, std::optional<int64_t> expected_value_count,
+                    std::optional<int64_t> expected_null_count,
+                    std::optional<int64_t> expected_nan_count, const Metrics& 
metrics);
+
+  template <typename T>
+  void AssertBounds(int field_id, std::shared_ptr<PrimitiveType> type,
+                    std::optional<T> expected_lower, std::optional<T> 
expected_upper,
+                    const Metrics& metrics);
+
+  // Helper methods for creating test data
+  std::shared_ptr<::arrow::Array> CreateRecordArrays(
+      const std::shared_ptr<::arrow::Schema>& arrow_schema, const std::string& 
json_data);
+
+  // Common test schemas
+  static std::shared_ptr<Schema> SimpleSchema();
+  static std::shared_ptr<Schema> NestedSchema();
+  static std::shared_ptr<Schema> FloatDoubleSchema();
+
+  // Test case methods - subclasses should call these from TEST_F macros
+  void MetricsForRepeatedValues();
+  void MetricsForTopLevelFields();
+  void MetricsForDecimals();
+  void MetricsForNestedStructFields();
+  void MetricsModeForNestedStructFields();
+  void MetricsForListAndMapElements();
+  void MetricsForNullColumns();
+  void MetricsForNaNColumns();
+  void ColumnBoundsWithNaNValueAtFront();
+  void ColumnBoundsWithNaNValueInMiddle();
+  void ColumnBoundsWithNaNValueAtEnd();
+  void MetricsForTopLevelWithMultipleRowGroup();
+  void MetricsForNestedStructFieldsWithMultipleRowGroup();
+  void NoneMetricsMode();
+  void CountsMetricsMode();
+  void FullMetricsMode();
+  void TruncateStringMetricsMode();
+  void TruncateBinaryMetricsMode();
+
+ private:
+  Result<std::shared_ptr<::arrow::Array>> BuildSimpleRecords(
+      std::shared_ptr<::arrow::Schema> arrow_schema, int32_t count = 1);
+  Result<std::shared_ptr<::arrow::Array>> BuildNestedRecords(int32_t count = 
1);
+
+ protected:
+  std::shared_ptr<FileIO> file_io_;
+  std::string temp_dir_;
+  std::string path_;
+};
+
+#define DEFINE_METRICS_TEST_CASE(TestClass, Case) \
+  TEST_F(TestClass, Case) { Case(); }
+
+#define DEFINE_METRICS_TESTS(TestClass)                                       \
+  DEFINE_METRICS_TEST_CASE(TestClass, MetricsForRepeatedValues)               \
+  DEFINE_METRICS_TEST_CASE(TestClass, MetricsForTopLevelFields)               \
+  DEFINE_METRICS_TEST_CASE(TestClass, MetricsForNestedStructFields)           \
+  DEFINE_METRICS_TEST_CASE(TestClass, MetricsForNullColumns)                  \
+  DEFINE_METRICS_TEST_CASE(TestClass, MetricsForNaNColumns)                   \
+  DEFINE_METRICS_TEST_CASE(TestClass, ColumnBoundsWithNaNValueAtFront)        \
+  DEFINE_METRICS_TEST_CASE(TestClass, ColumnBoundsWithNaNValueInMiddle)       \
+  DEFINE_METRICS_TEST_CASE(TestClass, ColumnBoundsWithNaNValueAtEnd)          \
+  DEFINE_METRICS_TEST_CASE(TestClass, MetricsForDecimals)                     \
+  DEFINE_METRICS_TEST_CASE(TestClass, MetricsForListAndMapElements)           \
+  DEFINE_METRICS_TEST_CASE(TestClass, MetricsModeForNestedStructFields)       \
+  DEFINE_METRICS_TEST_CASE(TestClass, NoneMetricsMode)                        \
+  DEFINE_METRICS_TEST_CASE(TestClass, CountsMetricsMode)                      \
+  DEFINE_METRICS_TEST_CASE(TestClass, FullMetricsMode)                        \
+  DEFINE_METRICS_TEST_CASE(TestClass, TruncateStringMetricsMode)              \
+  DEFINE_METRICS_TEST_CASE(TestClass, TruncateBinaryMetricsMode)              \
+  DEFINE_METRICS_TEST_CASE(TestClass, MetricsForTopLevelWithMultipleRowGroup) \
+  DEFINE_METRICS_TEST_CASE(TestClass, 
MetricsForNestedStructFieldsWithMultipleRowGroup)
+
+}  // namespace iceberg::test
diff --git a/src/iceberg/test/parquet_metrics_test.cc 
b/src/iceberg/test/parquet_metrics_test.cc
new file mode 100644
index 00000000..93c024b0
--- /dev/null
+++ b/src/iceberg/test/parquet_metrics_test.cc
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <arrow/c/bridge.h>
+#include <arrow/filesystem/filesystem.h>
+#include <gtest/gtest.h>
+#include <parquet/arrow/reader.h>
+#include <parquet/arrow/writer.h>
+#include <parquet/metadata.h>
+
+#include "iceberg/arrow/arrow_io_internal.h"
+#include "iceberg/arrow/arrow_status_internal.h"
+#include "iceberg/file_writer.h"
+#include "iceberg/parquet/parquet_register.h"
+#include "iceberg/test/metrics_test_base.h"
+#include "iceberg/util/checked_cast.h"
+
+namespace iceberg::test {
+
+class ParquetMetricsTest : public MetricsTestBase, public ::testing::Test {
+ protected:
+  static void SetUpTestSuite() { parquet::RegisterAll(); }
+
+  void SetUp() override {
+    MetricsTestBase::SetUp();
+    temp_parquet_file_ = "parquet_metrics_test.parquet";
+    writer_properties_ = WriterProperties::FromMap(
+        {{WriterProperties::kParquetCompression.key(), "uncompressed"}});
+  }
+
+  Result<Metrics> GetMetrics(std::shared_ptr<Schema> schema,
+                             std::shared_ptr<::arrow::Array> records) override 
{
+    return GetMetrics(schema, MetricsConfig::Default(), records);
+  }
+
+  Result<Metrics> GetMetrics(std::shared_ptr<Schema> schema,
+                             std::shared_ptr<MetricsConfig> config,
+                             std::shared_ptr<::arrow::Array> records) override 
{
+    ICEBERG_ASSIGN_OR_RAISE(
+        auto writer, WriterFactoryRegistry::Open(FileFormatType::kParquet,
+                                                 {.path = temp_parquet_file_,
+                                                  .schema = schema,
+                                                  .io = file_io_,
+                                                  .metadata = {},
+                                                  .metrics_config = config,
+                                                  .properties = 
writer_properties_}));
+    ArrowArray arr;
+    ICEBERG_ARROW_RETURN_NOT_OK(::arrow::ExportArray(*records, &arr));
+    ICEBERG_RETURN_UNEXPECTED(writer->Write(&arr));
+    ICEBERG_RETURN_UNEXPECTED(writer->Close());
+    return writer->metrics();
+  }
+
+  std::string CreateOutputFile() override { return temp_parquet_file_; }
+
+  Result<int> GetSplitCount() override {
+    auto io = internal::checked_cast<arrow::ArrowFileSystemFileIO&>(*file_io_);
+    auto infile = io.fs()->OpenInputFile(temp_parquet_file_).ValueOrDie();
+    auto metadata = ::parquet::ReadMetaData(infile);
+    return metadata->num_row_groups();
+  }
+
+  bool SupportsSmallRowGroups() const override { return false; }
+
+ private:
+  std::string temp_parquet_file_;
+  WriterProperties writer_properties_;
+};
+
+DEFINE_METRICS_TESTS(ParquetMetricsTest);
+
+}  // namespace iceberg::test
diff --git a/src/iceberg/test/truncate_util_test.cc 
b/src/iceberg/test/truncate_util_test.cc
index 849f67d7..f39e10ca 100644
--- a/src/iceberg/test/truncate_util_test.cc
+++ b/src/iceberg/test/truncate_util_test.cc
@@ -79,8 +79,9 @@ TEST(TruncateUtilTest, TruncateBinaryMax) {
   EXPECT_EQ(result3, Literal::Binary(test3));
 
   // Test3b: cannot truncate when first bytes are all 0xFF
-  EXPECT_THAT(TruncateUtils::TruncateLiteralMax(Literal::Binary(test3), 2),
-              IsError(ErrorKind::kInvalidArgument));
+  ICEBERG_UNWRAP_OR_FAIL(auto result3b,
+                         
TruncateUtils::TruncateLiteralMax(Literal::Binary(test3), 2));
+  EXPECT_EQ(result3b, std::nullopt);
 
   // Test4: truncate {1, 1, 0} to 2 bytes -> {1, 2}
   ICEBERG_UNWRAP_OR_FAIL(auto result4,
@@ -143,8 +144,9 @@ TEST(TruncateUtilTest, TruncateStringMax) {
 
   // Test5: Max 4-byte UTF-8 characters "\uDBFF\uDFFF\uDBFF\uDFFF"
   std::string test5 = "\xF4\x8F\xBF\xBF\xF4\x8F\xBF\xBF";  // U+10FFFF U+10FFFF
-  EXPECT_THAT(TruncateUtils::TruncateLiteralMax(Literal::String(test5), 1),
-              IsError(ErrorKind::kInvalidArgument));
+  ICEBERG_UNWRAP_OR_FAIL(auto result5_1,
+                         
TruncateUtils::TruncateLiteralMax(Literal::String(test5), 1));
+  EXPECT_EQ(result5_1, std::nullopt);
 
   // Test6: 4-byte UTF-8 character "\uD800\uDFFF\uD800\uDFFF"
   std::string test6 = "\xF0\x90\x8F\xBF\xF0\x90\x8F\xBF";  // U+103FF U+103FF
diff --git a/src/iceberg/util/truncate_util.cc 
b/src/iceberg/util/truncate_util.cc
index aba22d17..8a1d0724 100644
--- a/src/iceberg/util/truncate_util.cc
+++ b/src/iceberg/util/truncate_util.cc
@@ -24,6 +24,7 @@
 #include <utility>
 
 #include "iceberg/expression/literal.h"
+#include "iceberg/type.h"
 #include "iceberg/util/checked_cast.h"
 
 namespace iceberg {
@@ -167,23 +168,26 @@ Literal TruncateLiteralImpl<TypeId::kBinary>(const 
Literal& literal, int32_t wid
 }
 
 template <TypeId type_id>
-Result<Literal> TruncateLiteralMaxImpl(const Literal& literal, int32_t width) 
= delete;
+Result<std::optional<Literal>> TruncateLiteralMaxImpl(const Literal& literal,
+                                                      int32_t width) = delete;
 
 template <>
-Result<Literal> TruncateLiteralMaxImpl<TypeId::kString>(const Literal& literal,
-                                                        int32_t width) {
+Result<std::optional<Literal>> TruncateLiteralMaxImpl<TypeId::kString>(
+    const Literal& literal, int32_t width) {
   const auto& str = std::get<std::string>(literal.value());
-  ICEBERG_ASSIGN_OR_RAISE(std::string truncated,
-                          TruncateUtils::TruncateUTF8Max(str, width));
-  return Literal::String(std::move(truncated));
+  ICEBERG_ASSIGN_OR_RAISE(auto truncated, TruncateUtils::TruncateUTF8Max(str, 
width));
+  if (!truncated.has_value()) {
+    return std::nullopt;
+  }
+  return std::optional<Literal>(Literal::String(std::move(truncated.value())));
 }
 
 template <>
-Result<Literal> TruncateLiteralMaxImpl<TypeId::kBinary>(const Literal& literal,
-                                                        int32_t width) {
+Result<std::optional<Literal>> TruncateLiteralMaxImpl<TypeId::kBinary>(
+    const Literal& literal, int32_t width) {
   const auto& data = std::get<std::vector<uint8_t>>(literal.value());
   if (static_cast<int32_t>(data.size()) <= width) {
-    return literal;
+    return std::optional<Literal>(literal);
   }
 
   std::vector<uint8_t> truncated(data.begin(), data.begin() + width);
@@ -191,18 +195,19 @@ Result<Literal> 
TruncateLiteralMaxImpl<TypeId::kBinary>(const Literal& literal,
     if (*it < 0xFF) {
       ++(*it);
       truncated.resize(truncated.size() - std::distance(truncated.rbegin(), 
it));
-      return Literal::Binary(std::move(truncated));
+      return std::optional<Literal>(Literal::Binary(std::move(truncated)));
     }
   }
-  return InvalidArgument("Cannot truncate upper bound for binary: all bytes 
are 0xFF");
+  return std::nullopt;
 }
 
 }  // namespace
 
-Result<std::string> TruncateUtils::TruncateUTF8Max(const std::string& source, 
size_t L) {
+Result<std::optional<std::string>> TruncateUtils::TruncateUTF8Max(
+    const std::string& source, size_t L) {
   std::string truncated = TruncateUTF8(source, L);
   if (truncated == source) {
-    return truncated;
+    return std::optional<std::string>(std::move(truncated));
   }
 
   // Try incrementing code points from the end
@@ -231,13 +236,12 @@ Result<std::string> TruncateUtils::TruncateUTF8Max(const 
std::string& source, si
       if (next_code_point <= kUtf8MaxCodePoint) {
         truncated.resize(cp_start);
         AppendUtf8CodePoint(next_code_point, truncated);
-        return truncated;
+        return std::optional<std::string>(std::move(truncated));
       }
     }
     last_cp_start = cp_start;
   }
-  return InvalidArgument(
-      "Cannot truncate upper bound for string: all code points are 0x10FFFF");
+  return std::nullopt;
 }
 
 Decimal TruncateUtils::TruncateDecimal(const Decimal& decimal, int32_t width) {
@@ -274,10 +278,11 @@ Result<Literal> TruncateUtils::TruncateLiteral(const 
Literal& literal, int32_t w
   case TYPE_ID:                                \
     return TruncateLiteralMaxImpl<TYPE_ID>(literal, width);
 
-Result<Literal> TruncateUtils::TruncateLiteralMax(const Literal& literal, 
int32_t width) {
+Result<std::optional<Literal>> TruncateUtils::TruncateLiteralMax(const 
Literal& literal,
+                                                                 int32_t 
width) {
   if (literal.IsNull()) [[unlikely]] {
     // Return null as is
-    return literal;
+    return std::optional<Literal>(literal);
   }
 
   if (literal.IsAboveMax() || literal.IsBelowMin()) [[unlikely]] {
@@ -293,4 +298,26 @@ Result<Literal> TruncateUtils::TruncateLiteralMax(const 
Literal& literal, int32_
   }
 }
 
+Result<Literal> TruncateUtils::TruncateLowerBound(const PrimitiveType& type,
+                                                  const Literal& value, 
int32_t length) {
+  switch (type.type_id()) {
+    case TypeId::kString:
+    case TypeId::kBinary:
+      return TruncateLiteral(value, length);
+    default:
+      return value;
+  }
+}
+
+Result<std::optional<Literal>> TruncateUtils::TruncateUpperBound(
+    const PrimitiveType& type, const Literal& value, int32_t length) {
+  switch (type.type_id()) {
+    case TypeId::kString:
+    case TypeId::kBinary:
+      return TruncateLiteralMax(value, length);
+    default:
+      return std::optional<Literal>(value);
+  }
+}
+
 }  // namespace iceberg
diff --git a/src/iceberg/util/truncate_util.h b/src/iceberg/util/truncate_util.h
index 1a1824a2..7fee86eb 100644
--- a/src/iceberg/util/truncate_util.h
+++ b/src/iceberg/util/truncate_util.h
@@ -20,6 +20,7 @@
 #pragma once
 
 #include <cstdint>
+#include <optional>
 #include <string>
 #include <utility>
 
@@ -71,9 +72,10 @@ class ICEBERG_EXPORT TruncateUtils {
   /// \param source The input string to truncate.
   /// \param L The maximum number of code points allowed in the output string.
   /// \return A Result containing the original string (if no truncation is
-  /// needed), or the smallest string greater than the truncated prefix, or an
-  /// error if no such value exists or the input is invalid UTF-8.
-  static Result<std::string> TruncateUTF8Max(const std::string& source, size_t 
L);
+  /// needed), the smallest string greater than the truncated prefix, or 
nullopt if no
+  /// safe upper bound can be represented. Invalid UTF-8 is returned as an 
error.
+  static Result<std::optional<std::string>> TruncateUTF8Max(const std::string& 
source,
+                                                            size_t L);
 
   /// \brief Truncate an integer v, either int32_t or int64_t, to v - (v % W).
   ///
@@ -109,10 +111,37 @@ class ICEBERG_EXPORT TruncateUtils {
   ///
   /// \param value The input Literal maximum value to truncate.
   /// \param width The width to truncate to.
-  /// \return A Result containing either the original Literal (if no truncation is needed)
-  /// or the smallest Literal greater than the truncated prefix, or an error if no such
-  /// value exists or cannot be represented.
-  static Result<Literal> TruncateLiteralMax(const Literal& value, int32_t width);
+  /// \return A Result containing either the original Literal (if no truncation is
+  /// needed), the smallest Literal greater than the truncated prefix, or nullopt if no
+  /// safe upper bound can be represented.
+  static Result<std::optional<Literal>> TruncateLiteralMax(const Literal& value,
+                                                           int32_t width);
+
+  /// \brief Truncate the lower bound of a string or binary value.
+  ///
+  /// For string/binary types, truncates to the given length. For other types, returns the
+  /// value unchanged.
+  ///
+  /// \param type The Iceberg primitive type.
+  /// \param value The lower bound literal value.
+  /// \param width The width to truncate to.
+  /// \return The truncated lower bound literal.
+  static Result<Literal> TruncateLowerBound(const PrimitiveType& type,
+                                            const Literal& value, int32_t width);
+
+  /// \brief Truncate the upper bound of a string or binary value.
+  ///
+  /// For string/binary types, truncates to the smallest value greater than the truncated
+  /// prefix. For other types, returns the value unchanged.
+  ///
+  /// \param type The Iceberg primitive type.
+  /// \param value The upper bound literal value.
+  /// \param width The width to truncate to.
+  /// \return The truncated upper bound literal, or nullopt if no safe upper bound can be
+  /// represented.
+  static Result<std::optional<Literal>> TruncateUpperBound(const PrimitiveType& type,
+                                                           const Literal& value,
+                                                           int32_t width);
 };
 
 }  // namespace iceberg


Reply via email to