This is an automated email from the ASF dual-hosted git repository.

wgtmac pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-cpp.git


The following commit(s) were added to refs/heads/main by this push:
     new bc48b949 fix(parquet): fix parquet writer metrics conversion (#681)
bc48b949 is described below

commit bc48b949dfca6c02dea75b7d50de1e2dfca6485a
Author: Gang Wu <[email protected]>
AuthorDate: Tue May 26 20:33:21 2026 +0800

    fix(parquet): fix parquet writer metrics conversion (#681)
    
    This is a followup of https://github.com/apache/iceberg-cpp/pull/651 to
    address all comments
---
 src/iceberg/file_writer.h                          |   3 +
 src/iceberg/metrics.h                              |   2 +-
 src/iceberg/metrics_config.cc                      |  27 +-
 src/iceberg/metrics_config.h                       |  10 +-
 src/iceberg/parquet/parquet_metrics.cc             | 272 ++++++++++++++++-----
 ...arquet_metrics.h => parquet_metrics_internal.h} |   5 +-
 src/iceberg/parquet/parquet_writer.cc              |  28 ++-
 src/iceberg/test/metrics_test_base.cc              |  47 ++--
 src/iceberg/test/metrics_test_base.h               |   4 +
 src/iceberg/test/parquet_metrics_test.cc           |  97 +++++++-
 src/iceberg/util/truncate_util.cc                  |  14 +-
 11 files changed, 387 insertions(+), 122 deletions(-)

diff --git a/src/iceberg/file_writer.h b/src/iceberg/file_writer.h
index 3c890453..4b1045a4 100644
--- a/src/iceberg/file_writer.h
+++ b/src/iceberg/file_writer.h
@@ -60,6 +60,9 @@ class ICEBERG_EXPORT WriterProperties : public 
ConfigBase<WriterProperties> {
                                                        "zstd"};
   inline static Entry<std::string> kParquetCompressionLevel{
       "write.parquet.compression-level", ""};
+  /// \brief Maximum number of rows in each Parquet row group.
+  inline static Entry<int64_t> 
kParquetMaxRowGroupRows{"write.parquet.max-row-group-rows",
+                                                       1024 * 1024};
 
   /// TODO(gangwu): add table properties with write.avro|parquet|orc.*
 
diff --git a/src/iceberg/metrics.h b/src/iceberg/metrics.h
index 083cd041..8cfbbdf8 100644
--- a/src/iceberg/metrics.h
+++ b/src/iceberg/metrics.h
@@ -36,7 +36,7 @@ namespace iceberg {
 /// lower/upper bounds for a specific field identified by its field_id.
 struct ICEBERG_EXPORT FieldMetrics {
   /// \brief The field ID this metrics belongs to.
-  int32_t field_id;
+  int32_t field_id = -1;
 
   /// \brief The total number of values (including nulls) for this field.
   /// A negative value indicates the count is unknown.
diff --git a/src/iceberg/metrics_config.cc b/src/iceberg/metrics_config.cc
index ea20d47e..95d1a1fe 100644
--- a/src/iceberg/metrics_config.cc
+++ b/src/iceberg/metrics_config.cc
@@ -125,9 +125,8 @@ const std::shared_ptr<MetricsConfig>& 
MetricsConfig::Default() {
 
 Result<std::shared_ptr<MetricsConfig>> MetricsConfig::Make(const Table& table) 
{
   ICEBERG_ASSIGN_OR_RAISE(auto schema, table.schema());
-  auto sort_order = table.sort_order();
-  return MakeInternal(table.properties(), *schema,
-                      *sort_order.value_or(SortOrder::Unsorted()));
+  auto order = table.sort_order().value_or(SortOrder::Unsorted());
+  return MakeInternal(table.properties(), schema.get(), order.get());
 }
 
 Result<std::shared_ptr<MetricsConfig>> MetricsConfig::Make(
@@ -135,11 +134,11 @@ Result<std::shared_ptr<MetricsConfig>> 
MetricsConfig::Make(
   // Create a minimal TableProperties wrapper for the properties
   TableProperties props = TableProperties::FromMap(std::move(properties));
 
-  return MakeInternal(props, Schema({}), *SortOrder::Unsorted());
+  return MakeInternal(props, /*schema=*/nullptr, /*order=*/nullptr);
 }
 
 Result<std::shared_ptr<MetricsConfig>> MetricsConfig::MakeInternal(
-    const TableProperties& props, const Schema& schema, const SortOrder& 
order) {
+    const TableProperties& props, const Schema* schema, const SortOrder* 
order) {
   ColumnModeMap column_modes;
 
   MetricsMode default_mode = kDefaultMetricsMode;
@@ -148,16 +147,16 @@ Result<std::shared_ptr<MetricsConfig>> 
MetricsConfig::MakeInternal(
         props.Get(TableProperties::kDefaultWriteMetricsMode);
     ICEBERG_ASSIGN_OR_RAISE(default_mode,
                             ParseMode(configured_metrics_mode, 
kDefaultMetricsMode));
-  } else {
+  } else if (schema != nullptr) {
     int32_t max_inferred_columns = MaxInferredColumns(props);
     GetProjectedIdsVisitor visitor(/*include_struct_ids=*/true);
-    ICEBERG_RETURN_UNEXPECTED(visitor.Visit(schema));
+    ICEBERG_RETURN_UNEXPECTED(visitor.Visit(*schema));
     auto projected_columns = static_cast<int32_t>(visitor.Finish().size());
     if (max_inferred_columns < projected_columns) {
       ICEBERG_ASSIGN_OR_RAISE(auto limit_field_ids,
-                              LimitFieldIds(schema, max_inferred_columns));
+                              LimitFieldIds(*schema, max_inferred_columns));
       for (auto id : limit_field_ids) {
-        ICEBERG_ASSIGN_OR_RAISE(auto column_name, 
schema.FindColumnNameById(id));
+        ICEBERG_ASSIGN_OR_RAISE(auto column_name, 
schema->FindColumnNameById(id));
         ICEBERG_CHECK(column_name.has_value(), "Field id {} not found in 
schema", id);
         column_modes[std::string(column_name.value())] = kDefaultMetricsMode;
       }
@@ -167,10 +166,12 @@ Result<std::shared_ptr<MetricsConfig>> 
MetricsConfig::MakeInternal(
   }
 
   // First set sorted column with sorted column default (can be overridden by 
user)
-  auto sorted_col_default_mode = SortedColumnDefaultMode(default_mode);
-  auto sorted_columns = SortOrder::OrderPreservingSortedColumns(schema, order);
-  for (const auto& sorted_column : sorted_columns) {
-    column_modes[std::string(sorted_column)] = sorted_col_default_mode;
+  if (schema != nullptr && order != nullptr) {
+    auto sorted_col_default_mode = SortedColumnDefaultMode(default_mode);
+    auto sorted_columns = SortOrder::OrderPreservingSortedColumns(*schema, 
*order);
+    for (const auto& sorted_column : sorted_columns) {
+      column_modes[std::string(sorted_column)] = sorted_col_default_mode;
+    }
   }
 
   // Handle user overrides of defaults
diff --git a/src/iceberg/metrics_config.h b/src/iceberg/metrics_config.h
index a5e51ee6..bba7307d 100644
--- a/src/iceberg/metrics_config.h
+++ b/src/iceberg/metrics_config.h
@@ -101,12 +101,14 @@ class ICEBERG_EXPORT MetricsConfig {
   ///
   /// \param props will be read for metrics overrides 
(write.metadata.metrics.column.*)
   /// and default(write.metadata.metrics.default)
-  /// \param schema table schema
-  /// \param order sort order columns, will be promoted to truncate(16)
+  /// \param schema table schema, or nullptr when only properties are available
+  /// \param order table sort order, or nullptr when unavailable. If provided, 
sorted
+  /// columns use at least the default truncate metrics mode (`truncate(16)`) 
when
+  /// the default mode is `none` or `counts`; explicit column overrides still 
win.
   /// \return metrics configuration
   static Result<std::shared_ptr<MetricsConfig>> MakeInternal(const 
TableProperties& props,
-                                                             const Schema& 
schema,
-                                                             const SortOrder& 
order);
+                                                             const Schema* 
schema,
+                                                             const SortOrder* 
order);
 
   ColumnModeMap column_modes_;
   MetricsMode default_mode_;
diff --git a/src/iceberg/parquet/parquet_metrics.cc 
b/src/iceberg/parquet/parquet_metrics.cc
index 09b727a1..32dc9fec 100644
--- a/src/iceberg/parquet/parquet_metrics.cc
+++ b/src/iceberg/parquet/parquet_metrics.cc
@@ -17,13 +17,15 @@
  * under the License.
  */
 
-#include "iceberg/parquet/parquet_metrics.h"
-
+#include <cstdint>
 #include <limits>
 #include <optional>
 #include <ranges>
+#include <span>
 #include <string>
 #include <unordered_map>
+#include <utility>
+#include <vector>
 
 #include <parquet/column_reader.h>
 #include <parquet/schema.h>
@@ -31,12 +33,14 @@
 #include <parquet/types.h>
 
 #include "iceberg/expression/literal.h"
+#include "iceberg/parquet/parquet_metrics_internal.h"
 #include "iceberg/result.h"
 #include "iceberg/schema.h"
 #include "iceberg/type.h"
 #include "iceberg/util/checked_cast.h"
-#include "iceberg/util/conversions.h"
+#include "iceberg/util/decimal.h"
 #include "iceberg/util/truncate_util.h"
+#include "iceberg/util/uuid.h"
 #include "iceberg/util/visit_type.h"
 
 namespace iceberg::parquet {
@@ -67,6 +71,163 @@ std::optional<int32_t> FindColumnIndex(const 
::parquet::SchemaDescriptor& parque
   return it != columns.end() ? std::optional(*it) : std::nullopt;
 }
 
+int64_t CollectColumnSize(const ::parquet::FileMetaData& metadata, int32_t 
column_idx) {
+  int64_t size = 0;
+  for (int rg = 0; rg < metadata.num_row_groups(); ++rg) {
+    size += 
metadata.RowGroup(rg)->ColumnChunk(column_idx)->total_compressed_size();
+  }
+  return size;
+}
+
+template <typename StatsType, typename Converter>
+Result<Literal> TypedStatsLiteral(const ::parquet::Statistics& stats, bool 
is_min,
+                                  Converter&& converter) {
+  const auto& typed_stats = internal::checked_cast<const StatsType&>(stats);
+  return converter(is_min ? typed_stats.min() : typed_stats.max());
+}
+
+std::vector<uint8_t> BytesFromByteArray(const ::parquet::ByteArray& value) {
+  return std::vector<uint8_t>{value.ptr, value.ptr + value.len};
+}
+
+std::vector<uint8_t> BytesFromFLBA(const ::parquet::FixedLenByteArray& value,
+                                   int32_t length) {
+  return std::vector<uint8_t>{value.ptr, value.ptr + length};
+}
+
+Literal DecimalLiteral(int128_t value, const PrimitiveType& iceberg_type) {
+  const auto& decimal_type = internal::checked_cast<const 
DecimalType&>(iceberg_type);
+  return Literal::Decimal(value, decimal_type.precision(), 
decimal_type.scale());
+}
+
+Result<Literal> DecimalLiteralFromBytes(std::span<const uint8_t> bytes,
+                                        const PrimitiveType& iceberg_type) {
+  ICEBERG_ASSIGN_OR_RAISE(auto decimal,
+                          Decimal::FromBigEndian(bytes.data(), bytes.size()));
+  return DecimalLiteral(decimal.value(), iceberg_type);
+}
+
+Result<Literal> BinaryStatsLiteral(std::vector<uint8_t> bytes,
+                                   const PrimitiveType& iceberg_type) {
+  switch (iceberg_type.type_id()) {
+    case TypeId::kString:
+      return Literal::String(std::string(bytes.begin(), bytes.end()));
+    case TypeId::kBinary:
+      return Literal::Binary(std::move(bytes));
+    case TypeId::kFixed:
+      return Literal::Fixed(std::move(bytes));
+    case TypeId::kUuid: {
+      ICEBERG_ASSIGN_OR_RAISE(auto uuid, Uuid::FromBytes(bytes));
+      return Literal::UUID(std::move(uuid));
+    }
+    case TypeId::kDecimal:
+      return DecimalLiteralFromBytes(bytes, iceberg_type);
+    default:
+      return InvalidArgument(
+          "Cannot convert Parquet binary statistics to Iceberg type {}",
+          iceberg_type.ToString());
+  }
+}
+
+Result<Literal> Int32StatsLiteral(int32_t value, const PrimitiveType& 
iceberg_type) {
+  switch (iceberg_type.type_id()) {
+    case TypeId::kInt:
+      return Literal::Int(value);
+    case TypeId::kLong:
+      return Literal::Long(value);
+    case TypeId::kDate:
+      return Literal::Date(value);
+    case TypeId::kDecimal:
+      return DecimalLiteral(value, iceberg_type);
+    default:
+      return InvalidArgument("Cannot convert Parquet INT32 statistics to 
Iceberg type {}",
+                             iceberg_type.ToString());
+  }
+}
+
+Result<Literal> Int64StatsLiteral(int64_t value, const PrimitiveType& 
iceberg_type) {
+  switch (iceberg_type.type_id()) {
+    case TypeId::kLong:
+      return Literal::Long(value);
+    case TypeId::kTime:
+      return Literal::Time(value);
+    case TypeId::kTimestamp:
+      return Literal::Timestamp(value);
+    case TypeId::kTimestampTz:
+      return Literal::TimestampTz(value);
+    case TypeId::kTimestampNs:
+      return Literal::TimestampNs(value);
+    case TypeId::kTimestampTzNs:
+      return Literal::TimestampTzNs(value);
+    case TypeId::kDecimal:
+      return DecimalLiteral(value, iceberg_type);
+    default:
+      return InvalidArgument("Cannot convert Parquet INT64 statistics to 
Iceberg type {}",
+                             iceberg_type.ToString());
+  }
+}
+
+Result<Literal> FloatStatsLiteral(float value, const PrimitiveType& 
iceberg_type) {
+  switch (iceberg_type.type_id()) {
+    case TypeId::kFloat:
+      return Literal::Float(value);
+    case TypeId::kDouble:
+      return Literal::Double(value);
+    default:
+      return InvalidArgument("Cannot convert Parquet FLOAT statistics to 
Iceberg type {}",
+                             iceberg_type.ToString());
+  }
+}
+
+bool IsFloatingType(const PrimitiveType& type) {
+  return type.type_id() == TypeId::kFloat || type.type_id() == TypeId::kDouble;
+}
+
+bool NeedsBoundTruncation(const PrimitiveType& type) {
+  return type.type_id() == TypeId::kString || type.type_id() == 
TypeId::kBinary;
+}
+
+Result<Literal> StatsValueToLiteral(const ::parquet::ColumnDescriptor& column,
+                                    const PrimitiveType& iceberg_type,
+                                    const ::parquet::Statistics& stats, bool 
is_min) {
+  switch (column.physical_type()) {
+    case ::parquet::Type::BOOLEAN:
+      return TypedStatsLiteral<::parquet::BoolStatistics>(
+          stats, is_min, [](bool value) { return Literal::Boolean(value); });
+    case ::parquet::Type::INT32:
+      return TypedStatsLiteral<::parquet::Int32Statistics>(
+          stats, is_min,
+          [&](int32_t value) { return Int32StatsLiteral(value, iceberg_type); 
});
+    case ::parquet::Type::INT64:
+      return TypedStatsLiteral<::parquet::Int64Statistics>(
+          stats, is_min,
+          [&](int64_t value) { return Int64StatsLiteral(value, iceberg_type); 
});
+    case ::parquet::Type::FLOAT:
+      return TypedStatsLiteral<::parquet::FloatStatistics>(
+          stats, is_min,
+          [&](float value) { return FloatStatsLiteral(value, iceberg_type); });
+    case ::parquet::Type::DOUBLE:
+      return TypedStatsLiteral<::parquet::DoubleStatistics>(
+          stats, is_min, [](double value) { return Literal::Double(value); });
+    case ::parquet::Type::BYTE_ARRAY:
+      return TypedStatsLiteral<::parquet::ByteArrayStatistics>(
+          stats, is_min, [&](const ::parquet::ByteArray& value) {
+            return BinaryStatsLiteral(BytesFromByteArray(value), iceberg_type);
+          });
+    case ::parquet::Type::FIXED_LEN_BYTE_ARRAY:
+      return TypedStatsLiteral<::parquet::FLBAStatistics>(
+          stats, is_min, [&](const ::parquet::FixedLenByteArray& value) {
+            return BinaryStatsLiteral(BytesFromFLBA(value, 
column.type_length()),
+                                      iceberg_type);
+          });
+    case ::parquet::Type::INT96:
+    case ::parquet::Type::UNDEFINED:
+      return NotSupported("Cannot convert Parquet statistics for physical type 
{}",
+                          static_cast<int>(column.physical_type()));
+  }
+  std::unreachable();
+}
+
 /// \brief Collect counts (value count and null count) from footer statistics.
 /// \param field_id The Iceberg field ID.
 /// \param metadata The Parquet file metadata.
@@ -105,6 +266,7 @@ Result<std::optional<FieldMetrics>> CollectBounds(
     int32_t field_id, std::shared_ptr<PrimitiveType> iceberg_type,
     const ::parquet::FileMetaData& metadata, int32_t column_idx,
     int32_t truncate_length) {
+  auto column_desc = metadata.schema()->Column(column_idx);
   int64_t null_count = 0;
   int64_t value_count = 0;
   std::optional<Literal> lower_bound;
@@ -122,28 +284,24 @@ Result<std::optional<FieldMetrics>> CollectBounds(
     value_count += column_chunk->num_values();
 
     if (stats->HasMinMax()) {
-      auto min_bytes = stats->EncodeMin();
-      auto min_span = std::span<const uint8_t>(
-          reinterpret_cast<const uint8_t*>(min_bytes.data()), 
min_bytes.size());
       ICEBERG_ASSIGN_OR_RAISE(auto min_value,
-                              Conversions::FromBytes(iceberg_type, min_span));
+                              StatsValueToLiteral(*column_desc, *iceberg_type, 
*stats,
+                                                  /*is_min=*/true));
       if (!lower_bound.has_value() || min_value < lower_bound.value()) {
         lower_bound = std::move(min_value);
       }
 
-      auto max_bytes = stats->EncodeMax();
-      auto max_span = std::span<const uint8_t>(
-          reinterpret_cast<const uint8_t*>(max_bytes.data()), 
max_bytes.size());
       ICEBERG_ASSIGN_OR_RAISE(auto max_value,
-                              Conversions::FromBytes(iceberg_type, max_span));
+                              StatsValueToLiteral(*column_desc, *iceberg_type, 
*stats,
+                                                  /*is_min=*/false));
       if (!upper_bound.has_value() || max_value > upper_bound.value()) {
         upper_bound = std::move(max_value);
       }
     }
   }
 
-  if (!lower_bound.has_value() || !upper_bound.has_value() || 
lower_bound->IsNaN() ||
-      upper_bound->IsNaN()) {
+  if (!lower_bound.has_value() || !upper_bound.has_value() ||
+      (IsFloatingType(*iceberg_type) && (lower_bound->IsNaN() || 
upper_bound->IsNaN()))) {
     return FieldMetrics{
         .field_id = field_id,
         .value_count = value_count,
@@ -151,19 +309,21 @@ Result<std::optional<FieldMetrics>> CollectBounds(
     };
   }
 
-  ICEBERG_ASSIGN_OR_RAISE(auto truncated_lower,
-                          TruncateUtils::TruncateLowerBound(
-                              *iceberg_type, lower_bound.value(), 
truncate_length));
-  ICEBERG_ASSIGN_OR_RAISE(auto truncated_upper,
-                          TruncateUtils::TruncateUpperBound(
-                              *iceberg_type, upper_bound.value(), 
truncate_length));
+  if (NeedsBoundTruncation(*iceberg_type)) {
+    ICEBERG_ASSIGN_OR_RAISE(
+        lower_bound, TruncateUtils::TruncateLowerBound(*iceberg_type, 
lower_bound.value(),
+                                                       truncate_length));
+    ICEBERG_ASSIGN_OR_RAISE(
+        upper_bound, TruncateUtils::TruncateUpperBound(*iceberg_type, 
upper_bound.value(),
+                                                       truncate_length));
+  }
 
   return FieldMetrics{
       .field_id = field_id,
       .value_count = value_count,
       .null_value_count = null_count,
-      .lower_bound = std::move(truncated_lower),
-      .upper_bound = std::move(truncated_upper),
+      .lower_bound = std::move(lower_bound),
+      .upper_bound = std::move(upper_bound),
   };
 }
 
@@ -187,19 +347,27 @@ Result<std::optional<FieldMetrics>> 
MetricsFromFieldMetrics(
                       .null_value_count = fm.null_value_count,
                       .nan_value_count = fm.nan_value_count};
 
-  if (truncate_length > 0) {
-    if (fm.lower_bound.has_value()) {
-      ICEBERG_ASSIGN_OR_RAISE(
-          auto lower, TruncateUtils::TruncateLowerBound(
-                          *primitive_type, fm.lower_bound.value(), 
truncate_length));
-      result.lower_bound = std::move(lower);
-    }
-    if (fm.upper_bound.has_value()) {
-      ICEBERG_ASSIGN_OR_RAISE(
-          auto upper, TruncateUtils::TruncateUpperBound(
-                          *primitive_type, fm.upper_bound.value(), 
truncate_length));
-      result.upper_bound = std::move(upper);
-    }
+  if (truncate_length <= 0) {
+    return result;
+  }
+
+  if (!NeedsBoundTruncation(*primitive_type)) {
+    result.lower_bound = fm.lower_bound;
+    result.upper_bound = fm.upper_bound;
+    return result;
+  }
+
+  if (fm.lower_bound.has_value()) {
+    ICEBERG_ASSIGN_OR_RAISE(
+        result.lower_bound,
+        TruncateUtils::TruncateLowerBound(*primitive_type, 
fm.lower_bound.value(),
+                                          truncate_length));
+  }
+  if (fm.upper_bound.has_value()) {
+    ICEBERG_ASSIGN_OR_RAISE(
+        result.upper_bound,
+        TruncateUtils::TruncateUpperBound(*primitive_type, 
fm.upper_bound.value(),
+                                          truncate_length));
   }
 
   return result;
@@ -279,6 +447,10 @@ class CollectMetricsVisitor {
     int32_t truncate_length = mode.TruncateLength();
     const auto& primitive_type =
         internal::checked_pointer_cast<PrimitiveType>(field.type());
+    auto column_idx = FindColumnIndex(parquet_schema_, field_id);
+    if (column_idx.has_value()) {
+      metrics_.column_sizes[field_id] = CollectColumnSize(metadata_, 
column_idx.value());
+    }
 
     ICEBERG_ASSIGN_OR_RAISE(auto field_metrics,
                             MetricsFromFieldMetrics(field_id, field_metrics_,
@@ -330,36 +502,10 @@ Result<Metrics> ParquetMetrics::GetMetrics(
     const std::unordered_map<int32_t, FieldMetrics>& field_metrics) {
   Metrics metrics;
 
-  // Collect row count and column sizes
-  int64_t row_count = 0;
-  for (int rg = 0; rg < metadata.num_row_groups(); ++rg) {
-    auto row_group = metadata.RowGroup(rg);
-    row_count += row_group->num_rows();
-    for (int col = 0; col < row_group->num_columns(); ++col) {
-      auto column_chunk = row_group->ColumnChunk(col);
-      auto field_id_opt = GetFieldId(*parquet_schema.Column(col));
-      if (!field_id_opt.has_value()) {
-        continue;
-      }
-      int32_t field_id = field_id_opt.value();
-
-      ICEBERG_ASSIGN_OR_RAISE(auto field_name, 
schema.FindColumnNameById(field_id));
-      if (!field_name.has_value()) {
-        continue;
-      }
-
-      MetricsMode mode = metrics_config.ColumnMode(field_name.value());
-      if (mode.kind != MetricsMode::Kind::kNone) {
-        metrics.column_sizes[field_id] =
-            metrics.column_sizes.contains(field_id)
-                ? metrics.column_sizes[field_id] + 
column_chunk->total_compressed_size()
-                : column_chunk->total_compressed_size();
-      }
-    }
-  }
-  metrics.row_count = row_count;
+  metrics.row_count = metadata.num_rows();
 
-  // Collect metrics for all primitive fields
+  // Apply MetricsConfig while visiting schema fields, then collect footer 
metrics only
+  // for fields whose mode is not `none`.
   CollectMetricsVisitor visitor(parquet_schema, metrics_config, metadata, 
field_metrics,
                                 metrics);
   ICEBERG_RETURN_UNEXPECTED(visitor.VisitStruct(schema, ""));
diff --git a/src/iceberg/parquet/parquet_metrics.h 
b/src/iceberg/parquet/parquet_metrics_internal.h
similarity index 95%
rename from src/iceberg/parquet/parquet_metrics.h
rename to src/iceberg/parquet/parquet_metrics_internal.h
index eb916241..c78c114a 100644
--- a/src/iceberg/parquet/parquet_metrics.h
+++ b/src/iceberg/parquet/parquet_metrics_internal.h
@@ -19,14 +19,13 @@
 
 #pragma once
 
-/// \file iceberg/parquet/parquet_metrics.h
+/// \file iceberg/parquet/parquet_metrics_internal.h
 /// \brief Utilities for extracting metrics from Parquet files.
 
 #include <unordered_map>
 
 #include <parquet/metadata.h>
 
-#include "iceberg/iceberg_bundle_export.h"
 #include "iceberg/metrics.h"
 #include "iceberg/metrics_config.h"
 #include "iceberg/result.h"
@@ -35,7 +34,7 @@
 namespace iceberg::parquet {
 
 /// \brief Utility class for computing metrics from Parquet files.
-class ICEBERG_BUNDLE_EXPORT ParquetMetrics {
+class ParquetMetrics {
  public:
   ParquetMetrics() = delete;
 
diff --git a/src/iceberg/parquet/parquet_writer.cc 
b/src/iceberg/parquet/parquet_writer.cc
index e91a2a6c..da794cc3 100644
--- a/src/iceberg/parquet/parquet_writer.cc
+++ b/src/iceberg/parquet/parquet_writer.cc
@@ -21,6 +21,7 @@
 
 #include <memory>
 #include <string_view>
+#include <vector>
 
 #include <arrow/c/bridge.h>
 #include <arrow/record_batch.h>
@@ -33,7 +34,7 @@
 
 #include "iceberg/arrow/arrow_io_internal.h"
 #include "iceberg/arrow/arrow_status_internal.h"
-#include "iceberg/parquet/parquet_metrics.h"
+#include "iceberg/parquet/parquet_metrics_internal.h"
 #include "iceberg/schema_internal.h"
 #include "iceberg/util/macros.h"
 
@@ -94,6 +95,11 @@ class ParquetWriter::Impl {
 
     auto properties_builder = ::parquet::WriterProperties::Builder();
     properties_builder.compression(compression);
+    auto max_row_group_rows =
+        options.properties.Get(WriterProperties::kParquetMaxRowGroupRows);
+    ICEBERG_PRECHECK(max_row_group_rows > 0,
+                     "Parquet max row group rows must be greater than 0");
+    properties_builder.max_row_group_length(max_row_group_rows);
     if (compression_level.has_value()) {
       properties_builder.compression_level(compression_level.value());
     }
@@ -142,12 +148,11 @@ class ParquetWriter::Impl {
     }
 
     ICEBERG_ARROW_RETURN_NOT_OK(writer_->Close());
-    auto& metadata = writer_->metadata();
-    split_offsets_.reserve(metadata->num_row_groups());
-    for (int i = 0; i < metadata->num_row_groups(); ++i) {
-      split_offsets_.push_back(metadata->RowGroup(i)->file_offset());
-    }
     metadata_ = writer_->metadata();
+    split_offsets_.reserve(metadata_->num_row_groups());
+    for (int i = 0; i < metadata_->num_row_groups(); ++i) {
+      split_offsets_.push_back(metadata_->RowGroup(i)->file_offset());
+    }
     writer_.reset();
 
     ICEBERG_ARROW_ASSIGN_OR_RETURN(total_bytes_, output_stream_->Tell());
@@ -171,15 +176,12 @@ class ParquetWriter::Impl {
   std::vector<int64_t> split_offsets() const { return split_offsets_; }
 
   Result<Metrics> metrics() {
-    if (writer_) {
-      return Invalid("Cannot return metrics for unclosed writer");
-    }
-    if (!metadata_) {
-      return Metrics();
-    }
+    ICEBERG_PRECHECK(writer_ == nullptr, "Cannot return metrics for unclosed 
writer");
+    ICEBERG_PRECHECK(metadata_ != nullptr,
+                     "Cannot return metrics because Parquet metadata is not 
available");
     // TODO(WZhuo): collect write-side FieldMetrics to support NaN value 
counts.
     return ParquetMetrics::GetMetrics(*schema_, *parquet_schema_, 
*metrics_config_,
-                                      *metadata_, {});
+                                      *metadata_);
   }
 
  private:
diff --git a/src/iceberg/test/metrics_test_base.cc 
b/src/iceberg/test/metrics_test_base.cc
index cc5f7cd6..7913a720 100644
--- a/src/iceberg/test/metrics_test_base.cc
+++ b/src/iceberg/test/metrics_test_base.cc
@@ -22,6 +22,7 @@
 #include <arrow/builder.h>
 #include <arrow/c/bridge.h>
 #include <arrow/json/from_string.h>
+#include <gmock/gmock.h>
 
 #include "iceberg/arrow/arrow_io_internal.h"
 #include "iceberg/arrow/arrow_status_internal.h"
@@ -80,6 +81,17 @@ void MetricsTestBase::AssertCounts(int field_id,
   }
 }
 
+void MetricsTestBase::AssertColumnSizeFields(std::vector<int32_t> 
expected_field_ids,
+                                             const Metrics& metrics) {
+  std::vector<int32_t> actual_field_ids;
+  actual_field_ids.reserve(metrics.column_sizes.size());
+  for (const auto& [field_id, size] : metrics.column_sizes) {
+    EXPECT_GT(size, 0) << "Field " << field_id << " should have a positive 
size";
+    actual_field_ids.push_back(field_id);
+  }
+  EXPECT_THAT(actual_field_ids, 
testing::UnorderedElementsAreArray(expected_field_ids));
+}
+
 template <typename T>
 void MetricsTestBase::AssertBounds(int field_id, 
std::shared_ptr<PrimitiveType> type,
                                    std::optional<T> expected_lower,
@@ -91,6 +103,8 @@ void MetricsTestBase::AssertBounds(int field_id, 
std::shared_ptr<PrimitiveType>
     const auto& literal = metrics.lower_bounds.at(field_id);
     ASSERT_FALSE(literal.IsNull())
         << "Field " << field_id << " lower bound literal should not be null";
+    EXPECT_EQ(*literal.type(), *type)
+        << "Field " << field_id << " lower bound literal type mismatch";
     EXPECT_EQ(std::get<T>(literal.value()), expected_lower.value())
         << "Field " << field_id << " lower bound mismatch";
   } else {
@@ -103,6 +117,8 @@ void MetricsTestBase::AssertBounds(int field_id, 
std::shared_ptr<PrimitiveType>
     const auto& literal = metrics.upper_bounds.at(field_id);
     ASSERT_FALSE(literal.IsNull())
         << "Field " << field_id << " upper bound literal should not be null";
+    EXPECT_EQ(*literal.type(), *type)
+        << "Field " << field_id << " upper bound literal type mismatch";
     EXPECT_EQ(std::get<T>(literal.value()), expected_upper.value())
         << "Field " << field_id << " upper bound mismatch";
   } else {
@@ -208,12 +224,12 @@ void MetricsTestBase::MetricsForRepeatedValues() {
 
   ASSERT_TRUE(metrics.row_count.has_value()) << "row_count should be set";
   EXPECT_EQ(*metrics.row_count, 2);
+  AssertColumnSizeFields({1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13}, metrics);
 
   AssertCounts(1, 2, 0, metrics);
   AssertCounts(2, 2, 0, metrics);
   AssertCounts(3, 2, 1, metrics);
-  // TODO(WZhuo) Assert NaN metrics
-  AssertCounts(4, 2, 0, metrics);  // floatCol has 2 NaN values
+  AssertCounts(4, 2, 0, metrics);
   AssertCounts(5, 2, 0, metrics);
   AssertCounts(6, 2, 1, metrics);
   AssertCounts(7, 2, 0, metrics);
@@ -317,17 +333,13 @@ void MetricsTestBase::MetricsForDecimals() {
   EXPECT_EQ(*metrics.row_count, 1);
 
   AssertCounts(1, 1, 0, metrics);
-  // For decimals, bounds exist but we just check they're present
-  EXPECT_TRUE(metrics.lower_bounds.contains(1));
-  EXPECT_TRUE(metrics.upper_bounds.contains(1));
+  AssertBounds<Decimal>(1, decimal(4, 2), Decimal(255), Decimal(255), metrics);
 
   AssertCounts(2, 1, 0, metrics);
-  EXPECT_TRUE(metrics.lower_bounds.contains(2));
-  EXPECT_TRUE(metrics.upper_bounds.contains(2));
+  AssertBounds<Decimal>(2, decimal(14, 2), Decimal(475), Decimal(475), 
metrics);
 
   AssertCounts(3, 1, 0, metrics);
-  EXPECT_TRUE(metrics.lower_bounds.contains(3));
-  EXPECT_TRUE(metrics.upper_bounds.contains(3));
+  AssertBounds<Decimal>(3, decimal(22, 2), Decimal(580), Decimal(580), 
metrics);
 }
 
 void MetricsTestBase::MetricsForNestedStructFields() {
@@ -338,6 +350,7 @@ void MetricsTestBase::MetricsForNestedStructFields() {
 
   ASSERT_TRUE(metrics.row_count.has_value()) << "row_count should be set";
   EXPECT_EQ(*metrics.row_count, 1);
+  AssertColumnSizeFields({1, 3, 5, 6, 7}, metrics);
 
   AssertCounts(1, 1, 0, metrics);
   AssertBounds<int32_t>(1, int32(), std::numeric_limits<int32_t>::min(),
@@ -353,7 +366,6 @@ void MetricsTestBase::MetricsForNestedStructFields() {
   AssertBounds<std::vector<uint8_t>>(6, binary(), std::vector<uint8_t>{'A'},
                                      std::vector<uint8_t>{'A'}, metrics);
 
-  // TODO(WZhuo) Assert NaN metrics
   AssertCounts(7, 1L, 0L, metrics);
   AssertBounds<double>(7, float64(), std::nullopt, std::nullopt, metrics);
 }
@@ -373,6 +385,7 @@ void MetricsTestBase::MetricsModeForNestedStructFields() {
 
   ASSERT_TRUE(metrics.row_count.has_value()) << "row_count should be set";
   EXPECT_EQ(*metrics.row_count, 1);
+  AssertColumnSizeFields({3}, metrics);
 
   // Only field 3 (nestedStructCol.longCol) should have bounds
   EXPECT_EQ(metrics.lower_bounds.size(), 1);
@@ -460,6 +473,7 @@ void MetricsTestBase::MetricsForListAndMapElements() {
 
   ASSERT_TRUE(metrics.row_count.has_value()) << "row_count should be set";
   EXPECT_EQ(*metrics.row_count, 1);
+  AssertColumnSizeFields({}, metrics);
 
   // For list and map elements, metrics should not be collected
   // Field IDs: 1 (leafIntCol), 2 (leafStringCol), 4 (list element), 6 (map 
key), 7 (map
@@ -468,6 +482,7 @@ void MetricsTestBase::MetricsForListAndMapElements() {
   AssertCounts(2, std::nullopt, std::nullopt, metrics);
   AssertCounts(4, std::nullopt, std::nullopt, metrics);
   AssertCounts(6, std::nullopt, std::nullopt, metrics);
+  AssertCounts(7, std::nullopt, std::nullopt, metrics);
 
   AssertBounds<int32_t>(1, int32(), std::nullopt, std::nullopt, metrics);
   AssertBounds<std::string>(2, string(), std::nullopt, std::nullopt, metrics);
@@ -526,7 +541,6 @@ void MetricsTestBase::MetricsForNaNColumns() {
 
   ASSERT_TRUE(metrics.row_count.has_value()) << "row_count should be set";
   EXPECT_EQ(*metrics.row_count, 2);
-  // TODO(WZhuo) Assert NaN metrics
   AssertCounts(1, 2, 0, metrics);
   AssertCounts(2, 2, 0, metrics);
 
@@ -565,7 +579,6 @@ void MetricsTestBase::ColumnBoundsWithNaNValueAtFront() {
 
   ASSERT_TRUE(metrics.row_count.has_value()) << "row_count should be set";
   EXPECT_EQ(*metrics.row_count, 3);
-  // TODO(WZhuo) Assert NaN metrics
   AssertCounts(1, 3, 0, metrics);
   AssertCounts(2, 3, 0, metrics);
 
@@ -664,6 +677,8 @@ void 
MetricsTestBase::MetricsForTopLevelWithMultipleRowGroup() {
   if (SupportsSmallRowGroups()) {
     ICEBERG_UNWRAP_OR_FAIL(auto split_count, GetSplitCount());
     EXPECT_EQ(split_count, 3);
+  } else {
+    FAIL() << "This test must force multiple row groups";
   }
 
   ASSERT_TRUE(metrics.row_count.has_value()) << "row_count should be set";
@@ -694,6 +709,8 @@ void 
MetricsTestBase::MetricsForNestedStructFieldsWithMultipleRowGroup() {
   if (SupportsSmallRowGroups()) {
     ICEBERG_UNWRAP_OR_FAIL(auto split_count, GetSplitCount());
     EXPECT_EQ(split_count, 3);
+  } else {
+    FAIL() << "This test must force multiple row groups";
   }
   ASSERT_TRUE(metrics.row_count.has_value()) << "row_count should be set";
   EXPECT_EQ(*metrics.row_count, 201);
@@ -732,7 +749,7 @@ void MetricsTestBase::NoneMetricsMode() {
   EXPECT_EQ(*metrics.row_count, 1);
 
   // In None mode, column_sizes should be empty
-  EXPECT_TRUE(metrics.column_sizes.empty());
+  AssertColumnSizeFields({}, metrics);
 
   // All counts should be null
   AssertCounts(1, std::nullopt, std::nullopt, metrics);
@@ -761,7 +778,7 @@ void MetricsTestBase::CountsMetricsMode() {
   EXPECT_EQ(*metrics.row_count, 1);
 
   // In Counts mode, column_sizes should not be empty
-  EXPECT_FALSE(metrics.column_sizes.empty());
+  AssertColumnSizeFields({1, 3, 5, 6, 7}, metrics);
 
   // Counts should be present but bounds should be null
   AssertCounts(1, 1, 0, metrics);
@@ -790,7 +807,7 @@ void MetricsTestBase::FullMetricsMode() {
   EXPECT_EQ(*metrics.row_count, 1);
 
   // In Full mode, column_sizes should not be empty
-  EXPECT_FALSE(metrics.column_sizes.empty());
+  AssertColumnSizeFields({1, 3, 5, 6, 7}, metrics);
 
   // Both counts and bounds should be present
   AssertCounts(1, 1, 0, metrics);
diff --git a/src/iceberg/test/metrics_test_base.h 
b/src/iceberg/test/metrics_test_base.h
index 1ad9c882..07b3b62f 100644
--- a/src/iceberg/test/metrics_test_base.h
+++ b/src/iceberg/test/metrics_test_base.h
@@ -22,6 +22,7 @@
 #include <memory>
 #include <optional>
 #include <string>
+#include <vector>
 
 #include <arrow/array.h>
 
@@ -67,6 +68,9 @@ class MetricsTestBase {
                     std::optional<int64_t> expected_null_count,
                     std::optional<int64_t> expected_nan_count, const Metrics& 
metrics);
 
+  void AssertColumnSizeFields(std::vector<int32_t> expected_field_ids,
+                              const Metrics& metrics);
+
   template <typename T>
   void AssertBounds(int field_id, std::shared_ptr<PrimitiveType> type,
                     std::optional<T> expected_lower, std::optional<T> 
expected_upper,
diff --git a/src/iceberg/test/parquet_metrics_test.cc 
b/src/iceberg/test/parquet_metrics_test.cc
index 93c024b0..8286efe0 100644
--- a/src/iceberg/test/parquet_metrics_test.cc
+++ b/src/iceberg/test/parquet_metrics_test.cc
@@ -17,6 +17,7 @@
  * under the License.
  */
 
+#include <arrow/builder.h>
 #include <arrow/c/bridge.h>
 #include <arrow/filesystem/filesystem.h>
 #include <gtest/gtest.h>
@@ -27,7 +28,9 @@
 #include "iceberg/arrow/arrow_io_internal.h"
 #include "iceberg/arrow/arrow_status_internal.h"
 #include "iceberg/file_writer.h"
+#include "iceberg/parquet/parquet_metrics_internal.h"
 #include "iceberg/parquet/parquet_register.h"
+#include "iceberg/test/matchers.h"
 #include "iceberg/test/metrics_test_base.h"
 #include "iceberg/util/checked_cast.h"
 
@@ -40,8 +43,10 @@ class ParquetMetricsTest : public MetricsTestBase, public 
::testing::Test {
   void SetUp() override {
     MetricsTestBase::SetUp();
     temp_parquet_file_ = "parquet_metrics_test.parquet";
-    writer_properties_ = WriterProperties::FromMap(
-        {{WriterProperties::kParquetCompression.key(), "uncompressed"}});
+    writer_properties_ = WriterProperties::FromMap({
+        {WriterProperties::kParquetCompression.key(), "uncompressed"},
+        {WriterProperties::kParquetMaxRowGroupRows.key(), "100"},
+    });
   }
 
   Result<Metrics> GetMetrics(std::shared_ptr<Schema> schema,
@@ -76,7 +81,31 @@ class ParquetMetricsTest : public MetricsTestBase, public 
::testing::Test {
     return metadata->num_row_groups();
   }
 
-  bool SupportsSmallRowGroups() const override { return false; }
+  Result<Metrics> GetMetricsWithFieldMetrics(
+      std::shared_ptr<Schema> schema, std::shared_ptr<MetricsConfig> config,
+      std::shared_ptr<::arrow::Array> records,
+      const std::unordered_map<int32_t, FieldMetrics>& field_metrics) {
+    ICEBERG_ASSIGN_OR_RAISE(
+        auto writer, WriterFactoryRegistry::Open(FileFormatType::kParquet,
+                                                 {.path = temp_parquet_file_,
+                                                  .schema = schema,
+                                                  .io = file_io_,
+                                                  .metadata = {},
+                                                  .metrics_config = config,
+                                                  .properties = 
writer_properties_}));
+    ArrowArray arr;
+    ICEBERG_ARROW_RETURN_NOT_OK(::arrow::ExportArray(*records, &arr));
+    ICEBERG_RETURN_UNEXPECTED(writer->Write(&arr));
+    ICEBERG_RETURN_UNEXPECTED(writer->Close());
+
+    auto io = internal::checked_cast<arrow::ArrowFileSystemFileIO&>(*file_io_);
+    auto infile = io.fs()->OpenInputFile(temp_parquet_file_).ValueOrDie();
+    auto metadata = ::parquet::ReadMetaData(infile);
+    return parquet::ParquetMetrics::GetMetrics(*schema, *metadata->schema(), 
*config,
+                                               *metadata, field_metrics);
+  }
+
+  bool SupportsSmallRowGroups() const override { return true; }
 
  private:
   std::string temp_parquet_file_;
@@ -85,4 +114,66 @@ class ParquetMetricsTest : public MetricsTestBase, public 
::testing::Test {
 
 DEFINE_METRICS_TESTS(ParquetMetricsTest);
 
+TEST_F(ParquetMetricsTest, FieldMetricsOverrideFooterStats) {
+  auto schema = std::make_shared<Schema>(std::vector<SchemaField>{
+      SchemaField::MakeOptional(1, "floatCol", float32()),
+      SchemaField::MakeOptional(2, "strCol", string()),
+  });
+  auto arrow_schema = ::arrow::schema({
+      ::arrow::field("floatCol", ::arrow::float32(), true),
+      ::arrow::field("strCol", ::arrow::utf8(), true),
+  });
+  auto records = CreateRecordArrays(arrow_schema, R"([
+    {"floatCol": 1.0, "strCol": "footer-value"}
+  ])");
+  std::unordered_map<std::string, std::string> properties = {
+      {"write.metadata.metrics.default", "truncate(3)"}};
+  ICEBERG_UNWRAP_OR_FAIL(auto config, MetricsConfig::Make(properties));
+
+  std::unordered_map<int32_t, FieldMetrics> field_metrics{
+      {1, FieldMetrics{.field_id = 1,
+                       .value_count = 10,
+                       .null_value_count = 2,
+                       .lower_bound = Literal::Float(5.0F),
+                       .upper_bound = Literal::Float(9.0F)}},
+      {2, FieldMetrics{.field_id = 2,
+                       .value_count = 10,
+                       .null_value_count = 1,
+                       .lower_bound = Literal::String("abcdef"),
+                       .upper_bound = Literal::String("abcxyz")}},
+  };
+
+  ICEBERG_UNWRAP_OR_FAIL(
+      auto metrics, GetMetricsWithFieldMetrics(schema, config, records, 
field_metrics));
+
+  AssertCounts(1, 10, 2, metrics);
+  AssertBounds<float>(1, float32(), 5.0F, 9.0F, metrics);
+  AssertCounts(2, 10, 1, metrics);
+  AssertBounds<std::string>(2, string(), std::string("abc"), 
std::string("abd"), metrics);
+}
+
+TEST_F(ParquetMetricsTest, UnrepresentableTruncatedUpperBoundIsOmitted) {
+  auto schema = std::make_shared<Schema>(std::vector<SchemaField>{
+      SchemaField::MakeRequired(1, "binCol", binary()),
+  });
+  auto arrow_schema = ::arrow::schema({
+      ::arrow::field("binCol", ::arrow::binary(), false),
+  });
+
+  ::arrow::BinaryBuilder builder;
+  std::vector<uint8_t> data = {0xFF, 0xFF, 0x01};
+  ASSERT_TRUE(builder.Append(data.data(), data.size()).ok());
+  auto array = builder.Finish().ValueOrDie();
+  auto records = ::arrow::StructArray::Make({array}, 
arrow_schema->fields()).ValueOrDie();
+  std::unordered_map<std::string, std::string> properties = {
+      {"write.metadata.metrics.default", "truncate(2)"}};
+  ICEBERG_UNWRAP_OR_FAIL(auto config, MetricsConfig::Make(properties));
+
+  ICEBERG_UNWRAP_OR_FAIL(auto metrics, GetMetrics(schema, config, records));
+
+  AssertCounts(1, 1, 0, metrics);
+  AssertBounds<std::vector<uint8_t>>(1, binary(), std::vector<uint8_t>({0xFF, 
0xFF}),
+                                     std::nullopt, metrics);
+}
+
 }  // namespace iceberg::test
diff --git a/src/iceberg/util/truncate_util.cc 
b/src/iceberg/util/truncate_util.cc
index 8a1d0724..1778000f 100644
--- a/src/iceberg/util/truncate_util.cc
+++ b/src/iceberg/util/truncate_util.cc
@@ -179,7 +179,7 @@ Result<std::optional<Literal>> 
TruncateLiteralMaxImpl<TypeId::kString>(
   if (!truncated.has_value()) {
     return std::nullopt;
   }
-  return std::optional<Literal>(Literal::String(std::move(truncated.value())));
+  return Literal::String(std::move(*truncated));
 }
 
 template <>
@@ -187,7 +187,7 @@ Result<std::optional<Literal>> 
TruncateLiteralMaxImpl<TypeId::kBinary>(
     const Literal& literal, int32_t width) {
   const auto& data = std::get<std::vector<uint8_t>>(literal.value());
   if (static_cast<int32_t>(data.size()) <= width) {
-    return std::optional<Literal>(literal);
+    return literal;
   }
 
   std::vector<uint8_t> truncated(data.begin(), data.begin() + width);
@@ -195,7 +195,7 @@ Result<std::optional<Literal>> 
TruncateLiteralMaxImpl<TypeId::kBinary>(
     if (*it < 0xFF) {
       ++(*it);
       truncated.resize(truncated.size() - std::distance(truncated.rbegin(), 
it));
-      return std::optional<Literal>(Literal::Binary(std::move(truncated)));
+      return Literal::Binary(std::move(truncated));
     }
   }
   return std::nullopt;
@@ -207,7 +207,7 @@ Result<std::optional<std::string>> 
TruncateUtils::TruncateUTF8Max(
     const std::string& source, size_t L) {
   std::string truncated = TruncateUTF8(source, L);
   if (truncated == source) {
-    return std::optional<std::string>(std::move(truncated));
+    return truncated;
   }
 
   // Try incrementing code points from the end
@@ -236,7 +236,7 @@ Result<std::optional<std::string>> 
TruncateUtils::TruncateUTF8Max(
       if (next_code_point <= kUtf8MaxCodePoint) {
         truncated.resize(cp_start);
         AppendUtf8CodePoint(next_code_point, truncated);
-        return std::optional<std::string>(std::move(truncated));
+        return truncated;
       }
     }
     last_cp_start = cp_start;
@@ -282,7 +282,7 @@ Result<std::optional<Literal>> 
TruncateUtils::TruncateLiteralMax(const Literal&
                                                                  int32_t 
width) {
   if (literal.IsNull()) [[unlikely]] {
     // Return null as is
-    return std::optional<Literal>(literal);
+    return literal;
   }
 
   if (literal.IsAboveMax() || literal.IsBelowMin()) [[unlikely]] {
@@ -316,7 +316,7 @@ Result<std::optional<Literal>> 
TruncateUtils::TruncateUpperBound(
     case TypeId::kBinary:
       return TruncateLiteralMax(value, length);
     default:
-      return std::optional<Literal>(value);
+      return value;
   }
 }
 


Reply via email to