This is an automated email from the ASF dual-hosted git repository.
wgtmac pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-cpp.git
The following commit(s) were added to refs/heads/main by this push:
new 97ea8708 feat: metrics for parquet writer (#651)
97ea8708 is described below
commit 97ea8708d216a554b550d86510437fb91027cb20
Author: wzhuo <[email protected]>
AuthorDate: Tue May 26 12:55:45 2026 +0800
feat: metrics for parquet writer (#651)
---
src/iceberg/CMakeLists.txt | 1 +
src/iceberg/file_writer.h | 3 +
src/iceberg/metrics.h | 29 +
src/iceberg/metrics_config.cc | 22 +
src/iceberg/metrics_config.h | 11 +
src/iceberg/parquet/parquet_metrics.cc | 370 +++++++++++
src/iceberg/parquet/parquet_metrics.h | 64 ++
src/iceberg/parquet/parquet_writer.cc | 40 +-
src/iceberg/test/CMakeLists.txt | 2 +
src/iceberg/test/metrics_test_base.cc | 1020 ++++++++++++++++++++++++++++++
src/iceberg/test/metrics_test_base.h | 138 ++++
src/iceberg/test/parquet_metrics_test.cc | 88 +++
src/iceberg/test/truncate_util_test.cc | 10 +-
src/iceberg/util/truncate_util.cc | 63 +-
src/iceberg/util/truncate_util.h | 43 +-
15 files changed, 1865 insertions(+), 39 deletions(-)
diff --git a/src/iceberg/CMakeLists.txt b/src/iceberg/CMakeLists.txt
index 2b02b999..672cf54a 100644
--- a/src/iceberg/CMakeLists.txt
+++ b/src/iceberg/CMakeLists.txt
@@ -239,6 +239,7 @@ if(ICEBERG_BUILD_BUNDLE)
avro/avro_schema_util.cc
avro/avro_stream_internal.cc
parquet/parquet_data_util.cc
+ parquet/parquet_metrics.cc
parquet/parquet_reader.cc
parquet/parquet_register.cc
parquet/parquet_schema_util.cc
diff --git a/src/iceberg/file_writer.h b/src/iceberg/file_writer.h
index a49b5228..3c890453 100644
--- a/src/iceberg/file_writer.h
+++ b/src/iceberg/file_writer.h
@@ -30,6 +30,7 @@
#include "iceberg/arrow_c_data.h"
#include "iceberg/file_format.h"
#include "iceberg/metrics.h"
+#include "iceberg/metrics_config.h"
#include "iceberg/result.h"
#include "iceberg/type_fwd.h"
#include "iceberg/util/config.h"
@@ -77,6 +78,8 @@ struct ICEBERG_EXPORT WriterOptions {
std::shared_ptr<class FileIO> io;
/// \brief Metadata to write to the file.
std::unordered_map<std::string, std::string> metadata;
+ /// \brief Metrics configuration.
+ std::shared_ptr<MetricsConfig> metrics_config = MetricsConfig::Default();
/// \brief Format-specific or implementation-specific properties.
WriterProperties properties;
};
diff --git a/src/iceberg/metrics.h b/src/iceberg/metrics.h
index b476a475..083cd041 100644
--- a/src/iceberg/metrics.h
+++ b/src/iceberg/metrics.h
@@ -30,6 +30,35 @@
namespace iceberg {
+/// \brief Field-level metrics for a single column.
+///
+/// Carries the value, null and NaN counts together with optional lower/upper
+/// bounds for the field identified by `field_id`.
+struct ICEBERG_EXPORT FieldMetrics {
+  /// \brief The ID of the field these metrics belong to.
+  /// Defaults to -1 so that a default-initialized FieldMetrics never holds an
+  /// indeterminate ID.
+  int32_t field_id = -1;
+
+  /// \brief The total number of values (including nulls) for this field.
+  /// A negative value indicates the count is unknown.
+  int64_t value_count = -1;
+
+  /// \brief The number of null values for this field.
+  /// A negative value indicates the count is unknown.
+  int64_t null_value_count = -1;
+
+  /// \brief The number of NaN values for this field.
+  /// A negative value indicates the count is unknown.
+  int64_t nan_value_count = -1;
+
+  /// \brief The lower bound value as a Literal.
+  /// Empty if no lower bound is available.
+  std::optional<Literal> lower_bound = std::nullopt;
+
+  /// \brief The upper bound value as a Literal.
+  /// Empty if no upper bound is available.
+  std::optional<Literal> upper_bound = std::nullopt;
+};
+
/// \brief Iceberg file format metrics
struct ICEBERG_EXPORT Metrics {
std::optional<int64_t> row_count;
diff --git a/src/iceberg/metrics_config.cc b/src/iceberg/metrics_config.cc
index e378640e..ea20d47e 100644
--- a/src/iceberg/metrics_config.cc
+++ b/src/iceberg/metrics_config.cc
@@ -19,6 +19,7 @@
#include "iceberg/metrics_config.h"
+#include <limits>
#include <string>
#include <unordered_map>
@@ -100,6 +101,19 @@ Result<MetricsMode> MetricsMode::FromString(std::string_view mode) {
return InvalidArgument("Invalid metrics mode: {}", mode);
}
+/// \brief Map this metrics mode to the truncation length used for bounds.
+///
+/// \return 0 for None/Counts (no bounds are kept), the configured length for
+/// Truncate, and the largest int32_t for Full (bounds are kept untruncated).
+int32_t MetricsMode::TruncateLength() const {
+  switch (kind) {
+    case Kind::kNone:
+    case Kind::kCounts:
+      return 0;
+    case Kind::kTruncate: {
+      // `kind` and `length` are independent public members, so `length` may still
+      // hold std::monostate here. std::get would throw std::bad_variant_access in
+      // that case; treat a missing length as "no bounds" instead.
+      const int32_t* configured_length = std::get_if<int32_t>(&length);
+      return configured_length != nullptr ? *configured_length : 0;
+    }
+    case Kind::kFull:
+      return std::numeric_limits<int32_t>::max();
+  }
+  // Unreachable for valid enumerators; keeps compilers that cannot prove the
+  // switch exhaustive from warning about a missing return.
+  return 0;
+}
+
MetricsConfig::MetricsConfig(ColumnModeMap column_modes, MetricsMode
default_mode)
: column_modes_(std::move(column_modes)), default_mode_(default_mode) {}
@@ -116,6 +130,14 @@ Result<std::shared_ptr<MetricsConfig>> MetricsConfig::Make(const Table& table) {
*sort_order.value_or(SortOrder::Unsorted()));
}
+/// \brief Build a MetricsConfig from a raw property map.
+///
+/// The map is wrapped in a TableProperties and handed to MakeInternal together
+/// with an empty schema and the unsorted sort order.
+Result<std::shared_ptr<MetricsConfig>> MetricsConfig::Make(
+    std::unordered_map<std::string, std::string> properties) {
+  const TableProperties table_props = TableProperties::FromMap(std::move(properties));
+  const Schema empty_schema({});
+
+  return MakeInternal(table_props, empty_schema, *SortOrder::Unsorted());
+}
+
Result<std::shared_ptr<MetricsConfig>> MetricsConfig::MakeInternal(
const TableProperties& props, const Schema& schema, const SortOrder&
order) {
ColumnModeMap column_modes;
diff --git a/src/iceberg/metrics_config.h b/src/iceberg/metrics_config.h
index 7a49e906..a5e51ee6 100644
--- a/src/iceberg/metrics_config.h
+++ b/src/iceberg/metrics_config.h
@@ -52,6 +52,11 @@ struct ICEBERG_EXPORT MetricsMode {
Kind kind;
std::variant<std::monostate, int32_t> length;
+
+ /// \brief Get the truncate length implied by this MetricsMode.
+ ///
+ /// \return 0 for None/Counts modes, the configured length for Truncate mode,
+ /// or std::numeric_limits<int32_t>::max() for Full mode.
+ int32_t TruncateLength() const;
};
/// \brief Configuration for collecting column metrics for an Iceberg table.
@@ -63,6 +68,12 @@ class ICEBERG_EXPORT MetricsConfig {
/// \brief Creates a metrics config from a table.
static Result<std::shared_ptr<MetricsConfig>> Make(const Table& table);
+ /// \brief Creates a metrics config from properties (for testing)
+ ///
+ /// The properties are wrapped in a TableProperties and combined with an empty
+ /// schema and the unsorted sort order.
+ /// \param properties Map of property key-value pairs (taken by value)
+ /// \return A shared pointer to the created MetricsConfig, or the error produced
+ /// while building it
+ static Result<std::shared_ptr<MetricsConfig>> Make(
+ std::unordered_map<std::string, std::string> properties);
+
/// \brief Get `limit` num of primitive field ids from schema
static Result<std::unordered_set<int32_t>> LimitFieldIds(const Schema&
schema,
int32_t limit);
diff --git a/src/iceberg/parquet/parquet_metrics.cc b/src/iceberg/parquet/parquet_metrics.cc
new file mode 100644
index 00000000..09b727a1
--- /dev/null
+++ b/src/iceberg/parquet/parquet_metrics.cc
@@ -0,0 +1,370 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "iceberg/parquet/parquet_metrics.h"
+
+#include <limits>
+#include <optional>
+#include <ranges>
+#include <string>
+#include <unordered_map>
+
+#include <parquet/column_reader.h>
+#include <parquet/schema.h>
+#include <parquet/statistics.h>
+#include <parquet/types.h>
+
+#include "iceberg/expression/literal.h"
+#include "iceberg/result.h"
+#include "iceberg/schema.h"
+#include "iceberg/type.h"
+#include "iceberg/util/checked_cast.h"
+#include "iceberg/util/conversions.h"
+#include "iceberg/util/truncate_util.h"
+#include "iceberg/util/visit_type.h"
+
+namespace iceberg::parquet {
+
+namespace {
+
+/// \brief Read the Iceberg field ID attached to a Parquet column.
+/// \return The field ID, or nullopt when the column's schema node is null, is
+/// not primitive, or carries a negative field ID.
+std::optional<int32_t> GetFieldId(const ::parquet::ColumnDescriptor& column) {
+  const auto& leaf = column.schema_node();
+  const bool has_field_id =
+      leaf != nullptr && leaf->is_primitive() && leaf->field_id() >= 0;
+  if (!has_field_id) {
+    return std::nullopt;
+  }
+  return leaf->field_id();
+}
+
+/// \brief Locate the Parquet column that carries the given Iceberg field ID.
+/// \return The index of the first matching column, or nullopt if none matches.
+std::optional<int32_t> FindColumnIndex(const ::parquet::SchemaDescriptor& parquet_schema,
+                                       int32_t field_id) {
+  const int num_columns = parquet_schema.num_columns();
+  for (int idx = 0; idx < num_columns; ++idx) {
+    const auto candidate = GetFieldId(*parquet_schema.Column(idx));
+    // An empty optional never compares equal to a value, so columns without a
+    // field ID are skipped here.
+    if (candidate == field_id) {
+      return idx;
+    }
+  }
+  return std::nullopt;
+}
+
+/// \brief Sum value and null counts for one column across all row groups.
+/// \param field_id The Iceberg field ID.
+/// \param metadata The Parquet file metadata.
+/// \param column_idx The column index in the Parquet schema.
+/// \return FieldMetrics holding only the value and null counts, or nullopt if
+/// any row group lacks statistics or a null count for the column.
+std::optional<FieldMetrics> CollectCounts(int32_t field_id,
+                                          const ::parquet::FileMetaData& metadata,
+                                          int32_t column_idx) {
+  FieldMetrics totals{.field_id = field_id, .value_count = 0, .null_value_count = 0};
+
+  const int num_row_groups = metadata.num_row_groups();
+  for (int i = 0; i < num_row_groups; ++i) {
+    auto row_group = metadata.RowGroup(i);
+    auto chunk = row_group->ColumnChunk(column_idx);
+    auto chunk_stats = chunk->statistics();
+    // Without a null count for every chunk the totals would be incomplete.
+    if (chunk_stats == nullptr || !chunk_stats->HasNullCount()) {
+      return std::nullopt;
+    }
+
+    totals.null_value_count += chunk_stats->null_count();
+    totals.value_count += chunk->num_values();
+  }
+
+  return totals;
+}
+
+/// \brief Gather counts and lower/upper bounds for one column from the footer.
+/// \param field_id The Iceberg field ID.
+/// \param iceberg_type The Iceberg primitive type used to decode the statistics.
+/// \param metadata The Parquet file metadata.
+/// \param column_idx The column index in the Parquet schema.
+/// \param truncate_length The length passed to the bound truncation helpers.
+/// \return FieldMetrics with counts and, when usable, truncated bounds; nullopt
+/// if any row group lacks statistics or a null count; an error if decoding or
+/// truncation fails.
+Result<std::optional<FieldMetrics>> CollectBounds(
+    int32_t field_id, std::shared_ptr<PrimitiveType> iceberg_type,
+    const ::parquet::FileMetaData& metadata, int32_t column_idx,
+    int32_t truncate_length) {
+  // Turns an encoded Parquet statistic into a Literal of the Iceberg type.
+  const auto decode = [&iceberg_type](const auto& encoded) {
+    return Conversions::FromBytes(
+        iceberg_type,
+        std::span<const uint8_t>(reinterpret_cast<const uint8_t*>(encoded.data()),
+                                 encoded.size()));
+  };
+
+  FieldMetrics totals{.field_id = field_id, .value_count = 0, .null_value_count = 0};
+  std::optional<Literal> lowest;
+  std::optional<Literal> highest;
+
+  const int32_t num_row_groups = metadata.num_row_groups();
+  for (int32_t i = 0; i < num_row_groups; ++i) {
+    auto row_group = metadata.RowGroup(i);
+    auto chunk = row_group->ColumnChunk(column_idx);
+    auto chunk_stats = chunk->statistics();
+    if (chunk_stats == nullptr || !chunk_stats->HasNullCount()) {
+      return std::nullopt;
+    }
+
+    totals.null_value_count += chunk_stats->null_count();
+    totals.value_count += chunk->num_values();
+
+    // Chunks without min/max still contribute to the counts above.
+    if (!chunk_stats->HasMinMax()) {
+      continue;
+    }
+
+    ICEBERG_ASSIGN_OR_RAISE(auto chunk_min, decode(chunk_stats->EncodeMin()));
+    if (!lowest.has_value() || chunk_min < lowest.value()) {
+      lowest = std::move(chunk_min);
+    }
+
+    ICEBERG_ASSIGN_OR_RAISE(auto chunk_max, decode(chunk_stats->EncodeMax()));
+    if (!highest.has_value() || chunk_max > highest.value()) {
+      highest = std::move(chunk_max);
+    }
+  }
+
+  // Bounds are reported only when both exist and neither is NaN.
+  const bool bounds_usable = lowest.has_value() && highest.has_value() &&
+                             !lowest->IsNaN() && !highest->IsNaN();
+  if (bounds_usable) {
+    ICEBERG_ASSIGN_OR_RAISE(auto truncated_lower,
+                            TruncateUtils::TruncateLowerBound(
+                                *iceberg_type, lowest.value(), truncate_length));
+    ICEBERG_ASSIGN_OR_RAISE(auto truncated_upper,
+                            TruncateUtils::TruncateUpperBound(
+                                *iceberg_type, highest.value(), truncate_length));
+    totals.lower_bound = std::move(truncated_lower);
+    totals.upper_bound = std::move(truncated_upper);
+  }
+
+  return totals;
+}
+
+/// \brief Look up pre-computed metrics for a field and truncate their bounds.
+/// \param field_id The field ID to look up.
+/// \param field_metrics The map of pre-computed field metrics.
+/// \param primitive_type The primitive type passed to the truncation helpers.
+/// \param truncate_length The truncation length; bounds are dropped when it is
+/// not positive.
+/// \return nullopt if the map has no entry for the field; otherwise the counts
+/// copied from the entry plus truncated bounds where applicable.
+Result<std::optional<FieldMetrics>> MetricsFromFieldMetrics(
+    int32_t field_id, const std::unordered_map<int32_t, FieldMetrics>& field_metrics,
+    std::shared_ptr<PrimitiveType> primitive_type, int32_t truncate_length) {
+  const auto entry = field_metrics.find(field_id);
+  if (entry == field_metrics.end()) {
+    return std::nullopt;
+  }
+
+  const FieldMetrics& source = entry->second;
+  FieldMetrics processed{.field_id = source.field_id,
+                         .value_count = source.value_count,
+                         .null_value_count = source.null_value_count,
+                         .nan_value_count = source.nan_value_count};
+
+  // A non-positive length means only the counts are reported.
+  if (truncate_length <= 0) {
+    return processed;
+  }
+
+  if (source.lower_bound.has_value()) {
+    ICEBERG_ASSIGN_OR_RAISE(
+        auto lower, TruncateUtils::TruncateLowerBound(
+                        *primitive_type, source.lower_bound.value(), truncate_length));
+    processed.lower_bound = std::move(lower);
+  }
+  if (source.upper_bound.has_value()) {
+    ICEBERG_ASSIGN_OR_RAISE(
+        auto upper, TruncateUtils::TruncateUpperBound(
+                        *primitive_type, source.upper_bound.value(), truncate_length));
+    processed.upper_bound = std::move(upper);
+  }
+
+  return processed;
+}
+
+/// \brief Derive metrics for one primitive field from the Parquet footer.
+///
+/// \return nullopt when no Parquet column carries the field ID or the column is
+/// stored as INT96; counts only when truncate_length is not positive; counts
+/// and bounds otherwise.
+Result<std::optional<FieldMetrics>> MetricsFromFooter(
+    int32_t field_id, std::shared_ptr<PrimitiveType> iceberg_type,
+    const ::parquet::SchemaDescriptor& parquet_schema,
+    const ::parquet::FileMetaData& metadata, int32_t truncate_length) {
+  const auto column_idx = FindColumnIndex(parquet_schema, field_id);
+  if (!column_idx.has_value()) {
+    return std::nullopt;
+  }
+  const int32_t idx = column_idx.value();
+
+  // No metrics are produced for INT96 columns.
+  const bool is_int96 =
+      parquet_schema.Column(idx)->physical_type() == ::parquet::Type::INT96;
+  if (is_int96) {
+    return std::nullopt;
+  }
+
+  if (truncate_length > 0) {
+    return CollectBounds(field_id, iceberg_type, metadata, idx, truncate_length);
+  }
+  return CollectCounts(field_id, metadata, idx);
+}
+
+/// \brief Walks an Iceberg schema and records metrics for its primitive fields.
+///
+/// Struct fields are visited recursively with dot-joined names. The list, map
+/// and primitive visit hooks do nothing, so fields nested in lists or maps get
+/// no metrics.
+class CollectMetricsVisitor {
+ public:
+  CollectMetricsVisitor(const ::parquet::SchemaDescriptor& parquet_schema,
+                        const MetricsConfig& metrics_config,
+                        const ::parquet::FileMetaData& metadata,
+                        const std::unordered_map<int32_t, FieldMetrics>& field_metrics,
+                        Metrics& metrics)
+      : parquet_schema_(parquet_schema),
+        metrics_config_(metrics_config),
+        metadata_(metadata),
+        field_metrics_(field_metrics),
+        metrics_(metrics) {}
+
+  /// \brief Visit every field of a struct, qualifying names with `prefix`.
+  Status VisitStruct(const StructType& type, const std::string& prefix) {
+    for (const auto& child : type.fields()) {
+      std::string qualified_name(child.name());
+      if (!prefix.empty()) {
+        qualified_name = prefix + "." + qualified_name;
+      }
+      ICEBERG_RETURN_UNEXPECTED(VisitField(child, qualified_name));
+    }
+    return {};
+  }
+
+  /// \brief Lists are not descended into.
+  Status VisitList(const ListType& /*type*/, const std::string& /*prefix*/) {
+    return {};
+  }
+
+  /// \brief Maps are not descended into.
+  Status VisitMap(const MapType& /*type*/, const std::string& /*prefix*/) {
+    return {};
+  }
+
+  /// \brief Primitives are handled in VisitField; this hook does nothing.
+  Status VisitPrimitive(const PrimitiveType& /*type*/, const std::string& /*prefix*/) {
+    return {};
+  }
+
+ private:
+  /// \brief Route a field to primitive processing or to the nested-type visitor.
+  Status VisitField(const SchemaField& field, const std::string& full_name) {
+    if (field.type()->is_primitive()) {
+      return ProcessPrimitiveField(field, full_name);
+    }
+    if (field.type()->is_nested()) {
+      return VisitTypeCategory(*field.type(), this, full_name);
+    }
+    return {};
+  }
+
+  /// \brief Record metrics for one primitive field according to its mode.
+  ///
+  /// Pre-computed field metrics win over footer statistics when both exist.
+  Status ProcessPrimitiveField(const SchemaField& field, const std::string& full_name) {
+    const MetricsMode mode = metrics_config_.ColumnMode(full_name);
+    if (mode.kind == MetricsMode::Kind::kNone) {
+      return {};
+    }
+
+    const int32_t field_id = field.field_id();
+    const int32_t truncate_length = mode.TruncateLength();
+    const auto primitive_type =
+        internal::checked_pointer_cast<PrimitiveType>(field.type());
+
+    ICEBERG_ASSIGN_OR_RAISE(auto precomputed,
+                            MetricsFromFieldMetrics(field_id, field_metrics_,
+                                                    primitive_type, truncate_length));
+    if (precomputed.has_value()) {
+      ApplyFieldMetrics(field_id, std::move(precomputed.value()));
+      return {};
+    }
+
+    ICEBERG_ASSIGN_OR_RAISE(auto from_footer,
+                            MetricsFromFooter(field_id, primitive_type, parquet_schema_,
+                                              metadata_, truncate_length));
+    if (from_footer.has_value()) {
+      ApplyFieldMetrics(field_id, std::move(from_footer.value()));
+    }
+    return {};
+  }
+
+  /// \brief Copy the known parts of `fm` into the output Metrics.
+  ///
+  /// Negative counts and empty bounds are treated as unknown and skipped.
+  void ApplyFieldMetrics(int32_t field_id, FieldMetrics&& fm) {
+    if (fm.value_count >= 0) {
+      metrics_.value_counts[field_id] = fm.value_count;
+    }
+    if (fm.null_value_count >= 0) {
+      metrics_.null_value_counts[field_id] = fm.null_value_count;
+    }
+    if (fm.nan_value_count >= 0) {
+      metrics_.nan_value_counts[field_id] = fm.nan_value_count;
+    }
+    if (fm.lower_bound.has_value()) {
+      metrics_.lower_bounds.emplace(field_id, std::move(fm.lower_bound.value()));
+    }
+    if (fm.upper_bound.has_value()) {
+      metrics_.upper_bounds.emplace(field_id, std::move(fm.upper_bound.value()));
+    }
+  }
+
+  const ::parquet::SchemaDescriptor& parquet_schema_;
+  const MetricsConfig& metrics_config_;
+  const ::parquet::FileMetaData& metadata_;
+  const std::unordered_map<int32_t, FieldMetrics>& field_metrics_;
+  Metrics& metrics_;
+};
+
+} // namespace
+
+/// \brief Compute file-level metrics from Parquet footer metadata.
+///
+/// The row count and per-field compressed column sizes are summed over all row
+/// groups first. Value/null/NaN counts and bounds are then collected for the
+/// primitive fields reachable from `schema`.
+///
+/// \param schema The Iceberg schema used to resolve field IDs to column names.
+/// \param parquet_schema The Parquet schema descriptor of the file.
+/// \param metrics_config Per-column metrics modes; columns in None mode are skipped.
+/// \param metadata The Parquet file metadata.
+/// \param field_metrics Pre-computed per-field metrics, preferred over footer stats.
+/// \return The collected Metrics, or the first error hit while resolving names or
+/// collecting field metrics.
+Result<Metrics> ParquetMetrics::GetMetrics(
+    const Schema& schema, const ::parquet::SchemaDescriptor& parquet_schema,
+    const MetricsConfig& metrics_config, const ::parquet::FileMetaData& metadata,
+    const std::unordered_map<int32_t, FieldMetrics>& field_metrics) {
+  Metrics metrics;
+
+  // Collect row count and column sizes
+  int64_t row_count = 0;
+  for (int rg = 0; rg < metadata.num_row_groups(); ++rg) {
+    auto row_group = metadata.RowGroup(rg);
+    row_count += row_group->num_rows();
+    for (int col = 0; col < row_group->num_columns(); ++col) {
+      auto column_chunk = row_group->ColumnChunk(col);
+      auto field_id_opt = GetFieldId(*parquet_schema.Column(col));
+      if (!field_id_opt.has_value()) {
+        continue;
+      }
+      const int32_t field_id = field_id_opt.value();
+
+      ICEBERG_ASSIGN_OR_RAISE(auto field_name, schema.FindColumnNameById(field_id));
+      if (!field_name.has_value()) {
+        continue;
+      }
+
+      const MetricsMode mode = metrics_config.ColumnMode(field_name.value());
+      if (mode.kind == MetricsMode::Kind::kNone) {
+        continue;
+      }
+
+      // operator[] value-initializes a missing entry to zero, so one lookup both
+      // creates and accumulates the size.
+      metrics.column_sizes[field_id] += column_chunk->total_compressed_size();
+    }
+  }
+  metrics.row_count = row_count;
+
+  // Collect metrics for all primitive fields
+  CollectMetricsVisitor visitor(parquet_schema, metrics_config, metadata, field_metrics,
+                                metrics);
+  ICEBERG_RETURN_UNEXPECTED(visitor.VisitStruct(schema, ""));
+
+  return metrics;
+}
+
+} // namespace iceberg::parquet
diff --git a/src/iceberg/parquet/parquet_metrics.h
b/src/iceberg/parquet/parquet_metrics.h
new file mode 100644
index 00000000..eb916241
--- /dev/null
+++ b/src/iceberg/parquet/parquet_metrics.h
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+/// \file iceberg/parquet/parquet_metrics.h
+/// \brief Utilities for extracting metrics from Parquet files.
+
+#include <unordered_map>
+
+#include <parquet/metadata.h>
+
+#include "iceberg/iceberg_bundle_export.h"
+#include "iceberg/metrics.h"
+#include "iceberg/metrics_config.h"
+#include "iceberg/result.h"
+#include "iceberg/schema.h"
+
+namespace iceberg::parquet {
+
+/// \brief Static helpers that derive Iceberg metrics from Parquet metadata.
+class ICEBERG_BUNDLE_EXPORT ParquetMetrics {
+ public:
+  ParquetMetrics() = delete;
+
+  /// \brief Compute file-level metrics from Parquet file metadata.
+  ///
+  /// The result covers row count, column sizes, value counts, null value counts
+  /// and lower/upper bounds. NaN value counts are not currently collected from
+  /// Parquet metadata. The MetricsConfig decides which columns get metrics and
+  /// at what granularity (counts only, truncated bounds, or full bounds).
+  ///
+  /// \param schema The Iceberg schema for the table.
+  /// \param parquet_schema The Parquet schema descriptor.
+  /// \param metrics_config The configuration specifying how to collect metrics.
+  /// \param metadata The Parquet file metadata containing row group statistics.
+  /// \param field_metrics Optional per-field metrics computed during write.
+  /// If provided, these take precedence over footer statistics.
+  /// \return Result containing the computed Metrics or an error.
+  static Result<Metrics> GetMetrics(
+      const Schema& schema, const ::parquet::SchemaDescriptor& parquet_schema,
+      const MetricsConfig& metrics_config, const ::parquet::FileMetaData& metadata,
+      const std::unordered_map<int32_t, FieldMetrics>& field_metrics = {});
+};
+
+} // namespace iceberg::parquet
diff --git a/src/iceberg/parquet/parquet_writer.cc
b/src/iceberg/parquet/parquet_writer.cc
index c70d3310..e91a2a6c 100644
--- a/src/iceberg/parquet/parquet_writer.cc
+++ b/src/iceberg/parquet/parquet_writer.cc
@@ -33,6 +33,7 @@
#include "iceberg/arrow/arrow_io_internal.h"
#include "iceberg/arrow/arrow_status_internal.h"
+#include "iceberg/parquet/parquet_metrics.h"
#include "iceberg/schema_internal.h"
#include "iceberg/util/macros.h"
@@ -86,6 +87,8 @@ Result<std::optional<int32_t>> ParseCodecLevel(const WriterProperties& propertie
class ParquetWriter::Impl {
public:
Status Open(const WriterOptions& options) {
+ schema_ = options.schema;
+
ICEBERG_ASSIGN_OR_RAISE(auto compression,
ParseCompression(options.properties));
ICEBERG_ASSIGN_OR_RAISE(auto compression_level,
ParseCodecLevel(options.properties));
@@ -98,15 +101,14 @@ class ParquetWriter::Impl {
auto arrow_writer_properties =
::parquet::default_arrow_writer_properties();
ArrowSchema c_schema;
- ICEBERG_RETURN_UNEXPECTED(ToArrowSchema(*options.schema, &c_schema));
+ ICEBERG_RETURN_UNEXPECTED(ToArrowSchema(*schema_, &c_schema));
ICEBERG_ARROW_ASSIGN_OR_RETURN(arrow_schema_,
::arrow::ImportSchema(&c_schema));
- std::shared_ptr<::parquet::SchemaDescriptor> schema_descriptor;
ICEBERG_ARROW_RETURN_NOT_OK(
::parquet::arrow::ToParquetSchema(arrow_schema_.get(),
*writer_properties,
- *arrow_writer_properties,
&schema_descriptor));
+ *arrow_writer_properties,
&parquet_schema_));
auto schema_node = std::static_pointer_cast<::parquet::schema::GroupNode>(
- schema_descriptor->schema_root());
+ parquet_schema_->schema_root());
ICEBERG_RETURN_UNEXPECTED(CheckCompressionAvailable(
options.properties.Get(WriterProperties::kParquetCompression),
compression));
@@ -119,6 +121,8 @@ class ParquetWriter::Impl {
::parquet::arrow::FileWriter::Make(pool_, std::move(file_writer),
arrow_schema_,
std::move(arrow_writer_properties),
&writer_));
+ metrics_config_ = options.metrics_config;
+
return {};
}
@@ -143,6 +147,7 @@ class ParquetWriter::Impl {
for (int i = 0; i < metadata->num_row_groups(); ++i) {
split_offsets_.push_back(metadata->RowGroup(i)->file_offset());
}
+ metadata_ = writer_->metadata();
writer_.reset();
ICEBERG_ARROW_ASSIGN_OR_RETURN(total_bytes_, output_stream_->Tell());
@@ -165,15 +170,35 @@ class ParquetWriter::Impl {
std::vector<int64_t> split_offsets() const { return split_offsets_; }
+  /// \brief Compute the metrics of the written file.
+  ///
+  /// \return Invalid while the writer is still open, empty Metrics when no file
+  /// metadata was captured, otherwise the metrics derived from that metadata.
+  Result<Metrics> metrics() {
+    if (writer_) {
+      return Invalid("Cannot return metrics for unclosed writer");
+    }
+    if (!metadata_) {
+      return Metrics();
+    }
+    // WriterOptions::metrics_config is a plain shared_ptr that callers can set to
+    // null; fall back to the default config rather than dereference a null pointer.
+    std::shared_ptr<MetricsConfig> config = metrics_config_;
+    if (!config) {
+      config = MetricsConfig::Default();
+    }
+    // TODO(WZhuo): collect write-side FieldMetrics to support NaN value counts.
+    return ParquetMetrics::GetMetrics(*schema_, *parquet_schema_, *config, *metadata_,
+                                      {});
+  }
+
private:
// TODO(gangwu): make memory pool configurable
::arrow::MemoryPool* pool_ = ::arrow::default_memory_pool();
+ // Schema to write from the Iceberg table.
+ std::shared_ptr<Schema> schema_;
// Schema to write from the Parquet file.
std::shared_ptr<::arrow::Schema> arrow_schema_;
+ // Parquet schema descriptor generated from the Arrow schema.
+ std::shared_ptr<::parquet::SchemaDescriptor> parquet_schema_;
+ // Metrics config for collecting metrics during write.
+ std::shared_ptr<MetricsConfig> metrics_config_;
// The output stream to write Parquet file.
std::shared_ptr<::arrow::io::OutputStream> output_stream_;
// Parquet file writer to write ArrowArray.
std::unique_ptr<::parquet::arrow::FileWriter> writer_;
+ // Store the metadata if writer has been closed.
+ std::shared_ptr<::parquet::FileMetaData> metadata_;
// Total length of the written Parquet file.
int64_t total_bytes_{0};
// Row group start offsets in the Parquet file.
@@ -191,12 +216,7 @@ Status ParquetWriter::Write(ArrowArray* array) { return
impl_->Write(array); }
Status ParquetWriter::Close() { return impl_->Close(); }
-Result<Metrics> ParquetWriter::metrics() {
- if (!impl_->Closed()) {
- return Invalid("ParquetWriter is not closed");
- }
- return {};
-}
+// Forwards to the implementation, which reports an error while the underlying
+// writer is still open.
+Result<Metrics> ParquetWriter::metrics() { return impl_->metrics(); }
Result<int64_t> ParquetWriter::length() { return impl_->length(); }
diff --git a/src/iceberg/test/CMakeLists.txt b/src/iceberg/test/CMakeLists.txt
index 7b546267..b415154d 100644
--- a/src/iceberg/test/CMakeLists.txt
+++ b/src/iceberg/test/CMakeLists.txt
@@ -192,7 +192,9 @@ if(ICEBERG_BUILD_BUNDLE)
add_iceberg_test(parquet_test
USE_BUNDLE
SOURCES
+ metrics_test_base.cc
parquet_data_test.cc
+ parquet_metrics_test.cc
parquet_schema_test.cc
parquet_test.cc)
diff --git a/src/iceberg/test/metrics_test_base.cc
b/src/iceberg/test/metrics_test_base.cc
new file mode 100644
index 00000000..cc5f7cd6
--- /dev/null
+++ b/src/iceberg/test/metrics_test_base.cc
@@ -0,0 +1,1020 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "iceberg/test/metrics_test_base.h"
+
+#include <variant>
+
+#include <arrow/builder.h>
+#include <arrow/c/bridge.h>
+#include <arrow/json/from_string.h>
+
+#include "iceberg/arrow/arrow_io_internal.h"
+#include "iceberg/arrow/arrow_status_internal.h"
+#include "iceberg/schema_internal.h"
+#include "iceberg/test/matchers.h"
+#include "iceberg/type.h"
+#include "iceberg/util/decimal.h"
+
+namespace iceberg::test {
+
+// Gives each test a mock FileIO and a fixed directory name.
+void MetricsTestBase::SetUp() {
+  temp_dir_ = "metrics_test";
+  file_io_ = arrow::ArrowFileSystemFileIO::MakeMockFileIO();
+}
+
+// Checks the value count and null count recorded for `field_id`.
+//
+// An engaged expectation must equal the stored entry; std::nullopt means the
+// corresponding map must not contain the field at all.
+void MetricsTestBase::AssertCounts(int field_id,
+                                   std::optional<int64_t> expected_value_count,
+                                   std::optional<int64_t> expected_null_count,
+                                   const Metrics& metrics) {
+  if (!expected_value_count.has_value()) {
+    EXPECT_FALSE(metrics.value_counts.contains(field_id))
+        << "Field " << field_id << " should not have value count";
+  } else {
+    // ASSERT rather than EXPECT: at() below must not run for a missing key.
+    ASSERT_TRUE(metrics.value_counts.contains(field_id))
+        << "Field " << field_id << " should have value count";
+    EXPECT_EQ(metrics.value_counts.at(field_id), expected_value_count.value())
+        << "Field " << field_id << " value count mismatch";
+  }
+
+  if (!expected_null_count.has_value()) {
+    EXPECT_FALSE(metrics.null_value_counts.contains(field_id))
+        << "Field " << field_id << " should not have null count";
+  } else {
+    ASSERT_TRUE(metrics.null_value_counts.contains(field_id))
+        << "Field " << field_id << " should have null count";
+    EXPECT_EQ(metrics.null_value_counts.at(field_id), expected_null_count.value())
+        << "Field " << field_id << " null count mismatch";
+  }
+}
+
+// Same as the four-argument overload, and additionally checks the NaN count:
+// an engaged expectation must equal the stored entry, std::nullopt means the
+// field must be absent from `nan_value_counts`.
+void MetricsTestBase::AssertCounts(int field_id,
+                                   std::optional<int64_t> expected_value_count,
+                                   std::optional<int64_t> expected_null_count,
+                                   std::optional<int64_t> expected_nan_count,
+                                   const Metrics& metrics) {
+  // Value and null counts are delegated to the shorter overload.
+  AssertCounts(field_id, expected_value_count, expected_null_count, metrics);
+
+  if (!expected_nan_count.has_value()) {
+    EXPECT_FALSE(metrics.nan_value_counts.contains(field_id))
+        << "Field " << field_id << " should not have NaN count";
+  } else {
+    ASSERT_TRUE(metrics.nan_value_counts.contains(field_id))
+        << "Field " << field_id << " should have NaN count";
+    EXPECT_EQ(metrics.nan_value_counts.at(field_id), expected_nan_count.value())
+        << "Field " << field_id << " NaN count mismatch";
+  }
+}
+
+// Verifies the lower and upper bounds recorded for `field_id`.
+//
+// For each side, an engaged expectation requires a non-null bound literal that
+// holds a T equal to the expected value; std::nullopt requires that no bound
+// is recorded for the field. `type` is not consulted by this function.
+template <typename T>
+void MetricsTestBase::AssertBounds(int field_id, std::shared_ptr<PrimitiveType> type,
+                                   std::optional<T> expected_lower,
+                                   std::optional<T> expected_upper,
+                                   const Metrics& metrics) {
+  if (expected_lower.has_value()) {
+    ASSERT_TRUE(metrics.lower_bounds.contains(field_id))
+        << "Field " << field_id << " should have lower bound";
+    const auto& literal = metrics.lower_bounds.at(field_id);
+    ASSERT_FALSE(literal.IsNull())
+        << "Field " << field_id << " lower bound literal should not be null";
+    // Guard std::get<T>: a literal holding another alternative would throw
+    // std::bad_variant_access instead of reporting a test failure.
+    ASSERT_TRUE(std::holds_alternative<T>(literal.value()))
+        << "Field " << field_id << " lower bound literal has an unexpected type";
+    EXPECT_EQ(std::get<T>(literal.value()), expected_lower.value())
+        << "Field " << field_id << " lower bound mismatch";
+  } else {
+    EXPECT_FALSE(metrics.lower_bounds.contains(field_id))
+        << "Field " << field_id << " should not have lower bound";
+  }
+
+  if (expected_upper.has_value()) {
+    ASSERT_TRUE(metrics.upper_bounds.contains(field_id))
+        << "Field " << field_id << " should have upper bound";
+    const auto& literal = metrics.upper_bounds.at(field_id);
+    ASSERT_FALSE(literal.IsNull())
+        << "Field " << field_id << " upper bound literal should not be null";
+    ASSERT_TRUE(std::holds_alternative<T>(literal.value()))
+        << "Field " << field_id << " upper bound literal has an unexpected type";
+    EXPECT_EQ(std::get<T>(literal.value()), expected_upper.value())
+        << "Field " << field_id << " upper bound mismatch";
+  } else {
+    EXPECT_FALSE(metrics.upper_bounds.contains(field_id))
+        << "Field " << field_id << " should not have upper bound";
+  }
+}
+
+// Explicit instantiations of AssertBounds, whose template body is defined in this file.
+template void MetricsTestBase::AssertBounds<bool>(int,
std::shared_ptr<PrimitiveType>,
+ std::optional<bool>,
+ std::optional<bool>, const
Metrics&);
+template void MetricsTestBase::AssertBounds<int32_t>(int,
std::shared_ptr<PrimitiveType>,
+ std::optional<int32_t>,
+ std::optional<int32_t>,
+ const Metrics&);
+template void MetricsTestBase::AssertBounds<int64_t>(int,
std::shared_ptr<PrimitiveType>,
+ std::optional<int64_t>,
+ std::optional<int64_t>,
+ const Metrics&);
+template void MetricsTestBase::AssertBounds<float>(int,
std::shared_ptr<PrimitiveType>,
+ std::optional<float>,
+ std::optional<float>, const
Metrics&);
+template void MetricsTestBase::AssertBounds<double>(int,
std::shared_ptr<PrimitiveType>,
+ std::optional<double>,
+ std::optional<double>,
+ const Metrics&);
+template void MetricsTestBase::AssertBounds<std::string>(int,
+
std::shared_ptr<PrimitiveType>,
+
std::optional<std::string>,
+
std::optional<std::string>,
+ const Metrics&);
+template void MetricsTestBase::AssertBounds<std::vector<uint8_t>>(
+ int, std::shared_ptr<PrimitiveType>, std::optional<std::vector<uint8_t>>,
+ std::optional<std::vector<uint8_t>>, const Metrics&);
+
+template void MetricsTestBase::AssertBounds<Decimal>(int,
std::shared_ptr<PrimitiveType>,
+ std::optional<Decimal>,
+ std::optional<Decimal>,
+ const Metrics&);
+
+// Parses `json_data` into an Arrow array whose struct type is built from the
+// fields of `arrow_schema`. Uses ValueOrDie(), so a parse error aborts rather
+// than being returned to the caller.
+std::shared_ptr<::arrow::Array> MetricsTestBase::CreateRecordArrays(
+    const std::shared_ptr<::arrow::Schema>& arrow_schema, const std::string& json_data) {
+  return ::arrow::json::ArrayFromJSONString(::arrow::struct_(arrow_schema->fields()),
+                                            json_data)
+      .ValueOrDie();
+}
+
+// Flat schema with thirteen primitive columns (field ids 1-13), mixing
+// required and optional fields.
+std::shared_ptr<Schema> MetricsTestBase::SimpleSchema() {
+  using Field = SchemaField;
+  return std::make_shared<Schema>(std::vector<Field>{
+      Field::MakeOptional(1, "booleanCol", boolean()),
+      Field::MakeRequired(2, "intCol", int32()),
+      Field::MakeOptional(3, "longCol", int64()),
+      Field::MakeRequired(4, "floatCol", float32()),
+      Field::MakeOptional(5, "doubleCol", float64()),
+      Field::MakeOptional(6, "decimalCol", decimal(10, 2)),
+      Field::MakeRequired(7, "stringCol", string()),
+      Field::MakeOptional(8, "dateCol", date()),
+      Field::MakeRequired(9, "timeCol", time()),
+      Field::MakeRequired(10, "timestampColAboveEpoch", timestamp()),
+      Field::MakeRequired(11, "fixedCol", fixed(4)),
+      Field::MakeRequired(12, "binaryCol", binary()),
+      Field::MakeRequired(13, "timestampColBelowEpoch", timestamp()),
+  });
+}
+
+// Schema with two levels of struct nesting:
+//   intCol(1), nestedStructCol(2){ longCol(3), leafStructCol(4){ leafLongCol(5),
+//   leafBinaryCol(6) }, doubleCol(7) }
+std::shared_ptr<Schema> MetricsTestBase::NestedSchema() {
+  using Field = SchemaField;
+
+  // Innermost struct: both members are optional.
+  auto inner = struct_({
+      Field::MakeOptional(5, "leafLongCol", int64()),
+      Field::MakeOptional(6, "leafBinaryCol", binary()),
+  });
+
+  // Middle struct wrapping the innermost one.
+  auto outer = struct_({
+      Field::MakeRequired(3, "longCol", int64()),
+      Field::MakeRequired(4, "leafStructCol", inner),
+      Field::MakeRequired(7, "doubleCol", float64()),
+  });
+
+  return std::make_shared<Schema>(std::vector<Field>{
+      Field::MakeRequired(1, "intCol", int32()),
+      Field::MakeRequired(2, "nestedStructCol", outer),
+  });
+}
+
+// Two optional floating-point columns: floatCol(1) and doubleCol(2).
+std::shared_ptr<Schema> MetricsTestBase::FloatDoubleSchema() {
+  using Field = SchemaField;
+  return std::make_shared<Schema>(std::vector<Field>{
+      Field::MakeOptional(1, "floatCol", float32()),
+      Field::MakeOptional(2, "doubleCol", float64()),
+  });
+}
+
+// Converts an Iceberg schema into an Arrow schema: the schema is first exported
+// into an ArrowSchema struct and that struct is then imported by Arrow.
+// `schema` is dereferenced unconditionally.
+Result<std::shared_ptr<::arrow::Schema>> ToArrowSchema(std::shared_ptr<Schema> schema) {
+  ArrowSchema exported;
+  ICEBERG_RETURN_UNEXPECTED(ToArrowSchema(*schema, &exported));
+  std::shared_ptr<::arrow::Schema> imported;
+  ICEBERG_ARROW_ASSIGN_OR_RETURN(imported, ::arrow::ImportSchema(&exported));
+  return imported;
+}
+
+// Writes two simple records and checks the row count plus the value/null
+// counts of all thirteen columns.
+void MetricsTestBase::MetricsForRepeatedValues() {
+  auto schema = SimpleSchema();
+  ICEBERG_UNWRAP_OR_FAIL(auto arrow_schema, ToArrowSchema(schema));
+  ICEBERG_UNWRAP_OR_FAIL(auto records, BuildSimpleRecords(arrow_schema, 2));
+  ICEBERG_UNWRAP_OR_FAIL(auto metrics, GetMetrics(schema, records));
+
+  ASSERT_TRUE(metrics.row_count.has_value()) << "row_count should be set";
+  EXPECT_EQ(*metrics.row_count, 2);
+
+  // Every column is expected to report 2 values; only the null count varies.
+  struct ExpectedNulls {
+    int field_id;
+    int64_t null_count;
+  };
+  // TODO(WZhuo) Assert NaN metrics
+  constexpr ExpectedNulls kExpected[] = {
+      {1, 0},  {2, 0},  {3, 1},
+      {4, 0},  // floatCol has 2 NaN values
+      {5, 0},  {6, 1},  {7, 0},  {8, 0},  {9, 0},
+      {10, 0}, {11, 0}, {12, 0}, {13, 0},
+  };
+  for (const auto& [field_id, null_count] : kExpected) {
+    AssertCounts(field_id, 2, null_count, metrics);
+  }
+}
+
+// Writes two records covering every column of SimpleSchema() and checks the
+// counts and lower/upper bounds reported for each top-level field.
+void MetricsTestBase::MetricsForTopLevelFields() {
+  auto schema = SimpleSchema();
+  ICEBERG_UNWRAP_OR_FAIL(auto arrow_schema, ToArrowSchema(schema));
+
+  // The second record leaves longCol, doubleCol, decimalCol and dateCol null.
+  auto records = CreateRecordArrays(arrow_schema, R"([
+    {"booleanCol": true, "intCol": 3, "longCol": 5, "floatCol": 2.0, "doubleCol": 2.0,
+     "decimalCol": "3.50", "stringCol": "AAA", "dateCol": 1500, "timeCol": 2000,
+     "timestampColAboveEpoch": 0, "fixedCol": "abcd", "binaryCol": "S", "timestampColBelowEpoch": -1900300},
+    {"booleanCol": false, "intCol": -2147483648, "longCol": null, "floatCol": 1.0, "doubleCol": null,
+     "decimalCol": null, "stringCol": "ZZZ", "dateCol": null, "timeCol": 3000,
+     "timestampColAboveEpoch": 900, "fixedCol": "abcd", "binaryCol": "W", "timestampColBelowEpoch": -7000}
+  ])");
+
+  ICEBERG_UNWRAP_OR_FAIL(auto metrics, GetMetrics(schema, records));
+
+  ASSERT_TRUE(metrics.row_count.has_value()) << "row_count should be set";
+  EXPECT_EQ(*metrics.row_count, 2);
+
+  constexpr int32_t kInt32Min = std::numeric_limits<int32_t>::min();
+  const std::vector<uint8_t> fixed_val = {'a', 'b', 'c', 'd'};
+
+  // booleanCol
+  AssertCounts(1, 2, 0, metrics);
+  AssertBounds<bool>(1, boolean(), false, true, metrics);
+  // intCol
+  AssertCounts(2, 2, 0, metrics);
+  AssertBounds<int32_t>(2, int32(), kInt32Min, 3, metrics);
+  // longCol
+  AssertCounts(3, 2, 1, metrics);
+  AssertBounds<int64_t>(3, int64(), 5, 5, metrics);
+  // floatCol
+  AssertCounts(4, 2, 0, metrics);
+  AssertBounds<float>(4, float32(), 1.0F, 2.0F, metrics);
+  // doubleCol
+  AssertCounts(5, 2, 1, metrics);
+  AssertBounds<double>(5, float64(), 2.0, 2.0, metrics);
+  // decimalCol
+  AssertCounts(6, 2, 1, metrics);
+  AssertBounds<Decimal>(6, std::make_shared<DecimalType>(10, 2), Decimal(350),
+                        Decimal(350), metrics);
+  // stringCol
+  AssertCounts(7, 2, 0, metrics);
+  AssertBounds<std::string>(7, string(), std::string("AAA"), std::string("ZZZ"),
+                            metrics);
+  // dateCol
+  AssertCounts(8, 2, 1, metrics);
+  AssertBounds<int32_t>(8, date(), 1500, 1500, metrics);
+  // timeCol
+  AssertCounts(9, 2, 0, metrics);
+  AssertBounds<int64_t>(9, time(), 2000, 3000, metrics);
+  // timestampColAboveEpoch
+  AssertCounts(10, 2, 0, metrics);
+  AssertBounds<int64_t>(10, timestamp(), 0, 900, metrics);
+  // fixedCol
+  AssertCounts(11, 2, 0, metrics);
+  AssertBounds<std::vector<uint8_t>>(11, fixed(4), fixed_val, fixed_val, metrics);
+  // binaryCol
+  AssertCounts(12, 2, 0, metrics);
+  AssertBounds<std::vector<uint8_t>>(12, binary(), std::vector<uint8_t>{'S'},
+                                     std::vector<uint8_t>{'W'}, metrics);
+  // timestampColBelowEpoch
+  AssertCounts(13, 2, 0, metrics);
+  AssertBounds<int64_t>(13, timestamp(), -1900300, -7000, metrics);
+}
+
+// Writes a single row with three required decimal columns of precision 4, 14
+// and 22, and checks that counts and bounds are reported for each of them.
+void MetricsTestBase::MetricsForDecimals() {
+  auto schema = std::make_shared<Schema>(std::vector<SchemaField>{
+      SchemaField::MakeRequired(1, "decimalAsInt32", decimal(4, 2)),
+      SchemaField::MakeRequired(2, "decimalAsInt64", decimal(14, 2)),
+      SchemaField::MakeRequired(3, "decimalAsFixed", decimal(22, 2)),
+  });
+
+  auto arrow_schema = ::arrow::schema({
+      ::arrow::field("decimalAsInt32", ::arrow::decimal128(4, 2), false),
+      ::arrow::field("decimalAsInt64", ::arrow::decimal128(14, 2), false),
+      ::arrow::field("decimalAsFixed", ::arrow::decimal128(22, 2), false),
+  });
+
+  // One value per column; the literals are unscaled, i.e. 2.55, 4.75 and 5.80
+  // at scale 2.
+  ::arrow::Decimal128Builder builder1(::arrow::decimal128(4, 2));
+  ASSERT_TRUE(builder1.Append(::arrow::Decimal128("255")).ok());
+  auto array1 = builder1.Finish().ValueOrDie();
+
+  ::arrow::Decimal128Builder builder2(::arrow::decimal128(14, 2));
+  ASSERT_TRUE(builder2.Append(::arrow::Decimal128("475")).ok());
+  auto array2 = builder2.Finish().ValueOrDie();
+
+  ::arrow::Decimal128Builder builder3(::arrow::decimal128(22, 2));
+  ASSERT_TRUE(builder3.Append(::arrow::Decimal128("580")).ok());
+  auto array3 = builder3.Finish().ValueOrDie();
+
+  std::vector<std::shared_ptr<::arrow::Array>> columns = {array1, array2, array3};
+  auto records = ::arrow::StructArray::Make(columns, arrow_schema->fields()).ValueOrDie();
+
+  ICEBERG_UNWRAP_OR_FAIL(auto metrics, GetMetrics(schema, records));
+
+  ASSERT_TRUE(metrics.row_count.has_value()) << "row_count should be set";
+  EXPECT_EQ(*metrics.row_count, 1);
+
+  // Only the presence of the bounds is checked here, not their values.
+  AssertCounts(1, 1, 0, metrics);
+  EXPECT_TRUE(metrics.lower_bounds.contains(1));
+  EXPECT_TRUE(metrics.upper_bounds.contains(1));
+
+  AssertCounts(2, 1, 0, metrics);
+  EXPECT_TRUE(metrics.lower_bounds.contains(2));
+  EXPECT_TRUE(metrics.upper_bounds.contains(2));
+
+  AssertCounts(3, 1, 0, metrics);
+  EXPECT_TRUE(metrics.lower_bounds.contains(3));
+  EXPECT_TRUE(metrics.upper_bounds.contains(3));
+}
+
+// Writes one nested record and checks counts and bounds for the top-level
+// column and for the primitive leaves inside the nested structs.
+void MetricsTestBase::MetricsForNestedStructFields() {
+  auto schema = NestedSchema();
+
+  ICEBERG_UNWRAP_OR_FAIL(auto records, BuildNestedRecords());
+  ICEBERG_UNWRAP_OR_FAIL(auto metrics, GetMetrics(schema, records));
+
+  ASSERT_TRUE(metrics.row_count.has_value()) << "row_count should be set";
+  EXPECT_EQ(*metrics.row_count, 1);
+
+  constexpr int32_t kInt32Min = std::numeric_limits<int32_t>::min();
+  const std::vector<uint8_t> expected_binary{'A'};
+
+  // intCol
+  AssertCounts(1, 1, 0, metrics);
+  AssertBounds<int32_t>(1, int32(), kInt32Min, kInt32Min, metrics);
+
+  // nestedStructCol.longCol
+  AssertCounts(3, 1, 0, metrics);
+  AssertBounds<int64_t>(3, int64(), 100, 100, metrics);
+
+  // nestedStructCol.leafStructCol.leafLongCol
+  AssertCounts(5, 1, 0, metrics);
+  AssertBounds<int64_t>(5, int64(), 20, 20, metrics);
+
+  // nestedStructCol.leafStructCol.leafBinaryCol
+  AssertCounts(6, 1, 0, metrics);
+  AssertBounds<std::vector<uint8_t>>(6, binary(), expected_binary, expected_binary,
+                                     metrics);
+
+  // nestedStructCol.doubleCol: no bounds are expected.
+  // TODO(WZhuo) Assert NaN metrics
+  AssertCounts(7, 1, 0, metrics);
+  AssertBounds<double>(7, float64(), std::nullopt, std::nullopt, metrics);
+}
+
+// Checks that a per-column metrics mode applies to a nested column: the
+// default mode is "none" and only nestedStructCol.longCol is set to "full".
+void MetricsTestBase::MetricsModeForNestedStructFields() {
+  std::unordered_map<std::string, std::string> properties;
+  properties.emplace("write.metadata.metrics.default", "none");
+  properties.emplace("write.metadata.metrics.column.nestedStructCol.longCol", "full");
+
+  auto schema = NestedSchema();
+  ICEBERG_UNWRAP_OR_FAIL(auto config, MetricsConfig::Make(properties));
+  ICEBERG_UNWRAP_OR_FAIL(auto records, BuildNestedRecords());
+  ICEBERG_UNWRAP_OR_FAIL(auto metrics, GetMetrics(schema, config, records));
+
+  ASSERT_TRUE(metrics.row_count.has_value()) << "row_count should be set";
+  EXPECT_EQ(*metrics.row_count, 1);
+
+  // Field 3 (nestedStructCol.longCol) must be the only column with bounds.
+  EXPECT_EQ(metrics.lower_bounds.size(), 1);
+  EXPECT_EQ(metrics.upper_bounds.size(), 1);
+  AssertBounds<int64_t>(3, int64(), 100, 100, metrics);
+}
+
+// Writes one row holding a list column and a map column and checks that no
+// counts or bounds are reported for list elements, map keys or the fields of
+// the map value struct.
+void MetricsTestBase::MetricsForListAndMapElements() {
+  // Struct type used as the map value.
+  auto leaf_struct = struct_({
+      SchemaField::MakeRequired(1, "leafIntCol", int32()),
+      SchemaField::MakeOptional(2, "leafStringCol", string()),
+  });
+
+  auto list_type = list(SchemaField::MakeRequired(4, "element", int32()));
+  auto map_type = map(SchemaField::MakeRequired(6, "key", string()),
+                      SchemaField::MakeRequired(7, "value", leaf_struct));
+
+  auto schema = std::make_shared<Schema>(std::vector<SchemaField>{
+      SchemaField::MakeOptional(3, "intListCol", list_type),
+      SchemaField::MakeOptional(5, "mapCol", map_type),
+  });
+
+  // Arrow-side schema for the same two columns.
+  auto arrow_leaf_struct = ::arrow::struct_({
+      ::arrow::field("leafIntCol", ::arrow::int32(), false),
+      ::arrow::field("leafStringCol", ::arrow::utf8(), true),
+  });
+
+  auto arrow_schema = ::arrow::schema({
+      ::arrow::field("intListCol",
+                     ::arrow::list(::arrow::field("element", ::arrow::int32(), false)),
+                     true),
+      ::arrow::field("mapCol", ::arrow::map(::arrow::utf8(), arrow_leaf_struct), true),
+  });
+
+  // List column: a single row holding [10, 11, 12]. The elements are appended
+  // through the list builder's own value builder.
+  ::arrow::ListBuilder list_builder(::arrow::default_memory_pool(),
+                                    std::make_shared<::arrow::Int32Builder>());
+  ASSERT_TRUE(list_builder.Append().ok());
+  auto list_value_builder =
+      static_cast<::arrow::Int32Builder*>(list_builder.value_builder());
+  ASSERT_TRUE(list_value_builder->Append(10).ok());
+  ASSERT_TRUE(list_value_builder->Append(11).ok());
+  ASSERT_TRUE(list_value_builder->Append(12).ok());
+  auto list_array = list_builder.Finish().ValueOrDie();
+
+  // Map column: {"4" -> {leafIntCol: 1, leafStringCol: "BBB"}}.
+  // MapArray needs offsets, keys, and items (struct values).
+  ::arrow::Int32Builder offset_builder;
+  ASSERT_TRUE(offset_builder.Append(0).ok());  // Start offset
+  ASSERT_TRUE(offset_builder.Append(1).ok());  // End offset (1 entry)
+  auto offsets = offset_builder.Finish().ValueOrDie();
+
+  ::arrow::StringBuilder key_builder;
+  ASSERT_TRUE(key_builder.Append("4").ok());
+  auto keys = key_builder.Finish().ValueOrDie();
+
+  ::arrow::Int32Builder struct_int_builder;
+  ::arrow::StringBuilder struct_str_builder;
+  ASSERT_TRUE(struct_int_builder.Append(1).ok());
+  ASSERT_TRUE(struct_str_builder.Append("BBB").ok());
+  auto struct_int_array = struct_int_builder.Finish().ValueOrDie();
+  auto struct_str_array = struct_str_builder.Finish().ValueOrDie();
+  auto items =
+      ::arrow::StructArray::Make({struct_int_array, struct_str_array},
+                                 {::arrow::field("leafIntCol", ::arrow::int32(), false),
+                                  ::arrow::field("leafStringCol", ::arrow::utf8(), true)})
+          .ValueOrDie();
+
+  auto map_array = ::arrow::MapArray::FromArrays(offsets, keys, items).ValueOrDie();
+
+  std::vector<std::shared_ptr<::arrow::Array>> field_arrays = {list_array, map_array};
+  auto records =
+      ::arrow::StructArray::Make(field_arrays, arrow_schema->fields()).ValueOrDie();
+
+  ICEBERG_UNWRAP_OR_FAIL(auto metrics, GetMetrics(schema, records));
+
+  ASSERT_TRUE(metrics.row_count.has_value()) << "row_count should be set";
+  EXPECT_EQ(*metrics.row_count, 1);
+
+  // For list and map elements, metrics should not be collected.
+  // Field IDs: 1 (leafIntCol), 2 (leafStringCol), 4 (list element), 6 (map key),
+  // 7 (map value)
+  AssertCounts(1, std::nullopt, std::nullopt, metrics);
+  AssertCounts(2, std::nullopt, std::nullopt, metrics);
+  AssertCounts(4, std::nullopt, std::nullopt, metrics);
+  AssertCounts(6, std::nullopt, std::nullopt, metrics);
+
+  AssertBounds<int32_t>(1, int32(), std::nullopt, std::nullopt, metrics);
+  AssertBounds<std::string>(2, string(), std::nullopt, std::nullopt, metrics);
+  AssertBounds<int32_t>(4, int32(), std::nullopt, std::nullopt, metrics);
+  AssertBounds<std::string>(6, string(), std::nullopt, std::nullopt, metrics);
+  ASSERT_FALSE(metrics.lower_bounds.contains(7));
+  ASSERT_FALSE(metrics.upper_bounds.contains(7));
+}
+
+// Writes two rows whose only column is null in both and checks that the null
+// count equals the value count and that no bounds are reported.
+void MetricsTestBase::MetricsForNullColumns() {
+  using Field = SchemaField;
+  auto schema =
+      std::make_shared<Schema>(std::vector<Field>{Field::MakeOptional(1, "intCol", int32())});
+
+  auto arrow_schema = ::arrow::schema({::arrow::field("intCol", ::arrow::int32(), true)});
+
+  auto records = CreateRecordArrays(arrow_schema, R"([
+    {"intCol": null},
+    {"intCol": null}
+  ])");
+
+  ICEBERG_UNWRAP_OR_FAIL(auto metrics, GetMetrics(schema, records));
+
+  ASSERT_TRUE(metrics.row_count.has_value()) << "row_count should be set";
+  EXPECT_EQ(*metrics.row_count, 2);
+
+  // Both rows count as values and as nulls.
+  AssertCounts(1, 2, 2, metrics);
+  AssertBounds<int32_t>(1, int32(), std::nullopt, std::nullopt, metrics);
+}
+
+// Writes two rows in which both floating-point columns are NaN and checks that
+// counts are reported while no bounds are.
+void MetricsTestBase::MetricsForNaNColumns() {
+  auto schema = FloatDoubleSchema();
+
+  auto arrow_schema = ::arrow::schema({
+      ::arrow::field("floatCol", ::arrow::float32(), true),
+      ::arrow::field("doubleCol", ::arrow::float64(), true),
+  });
+
+  ::arrow::FloatBuilder float_builder;
+  ::arrow::DoubleBuilder double_builder;
+
+  // Two rows, each holding NaN in both columns.
+  for (int row = 0; row < 2; ++row) {
+    ASSERT_TRUE(float_builder.Append(std::numeric_limits<float>::quiet_NaN()).ok());
+    ASSERT_TRUE(double_builder.Append(std::numeric_limits<double>::quiet_NaN()).ok());
+  }
+
+  auto float_array = float_builder.Finish().ValueOrDie();
+  auto double_array = double_builder.Finish().ValueOrDie();
+
+  std::vector<std::shared_ptr<::arrow::Array>> columns = {float_array, double_array};
+  auto records = ::arrow::StructArray::Make(columns, arrow_schema->fields()).ValueOrDie();
+
+  ICEBERG_UNWRAP_OR_FAIL(auto metrics, GetMetrics(schema, records));
+
+  ASSERT_TRUE(metrics.row_count.has_value()) << "row_count should be set";
+  EXPECT_EQ(*metrics.row_count, 2);
+
+  // TODO(WZhuo) Assert NaN metrics
+  AssertCounts(1, 2, 0, metrics);
+  AssertCounts(2, 2, 0, metrics);
+
+  // When all values are NaN, bounds should not be set
+  AssertBounds<float>(1, float32(), std::nullopt, std::nullopt, metrics);
+  AssertBounds<double>(2, float64(), std::nullopt, std::nullopt, metrics);
+}
+
+// Writes three rows whose first value is NaN in both floating-point columns
+// and checks the counts. Bounds are verified only when a lower bound for
+// field 1 was produced.
+void MetricsTestBase::ColumnBoundsWithNaNValueAtFront() {
+  auto schema = FloatDoubleSchema();
+
+  // floatCol: NaN, 1.2, 5.6
+  ::arrow::FloatBuilder float_builder;
+  ASSERT_TRUE(float_builder.Append(std::numeric_limits<float>::quiet_NaN()).ok());
+  ASSERT_TRUE(float_builder.Append(1.2F).ok());
+  ASSERT_TRUE(float_builder.Append(5.6F).ok());
+  auto float_array = float_builder.Finish().ValueOrDie();
+
+  // doubleCol: NaN, 3.4, 7.8
+  ::arrow::DoubleBuilder double_builder;
+  ASSERT_TRUE(double_builder.Append(std::numeric_limits<double>::quiet_NaN()).ok());
+  ASSERT_TRUE(double_builder.Append(3.4).ok());
+  ASSERT_TRUE(double_builder.Append(7.8).ok());
+  auto double_array = double_builder.Finish().ValueOrDie();
+
+  auto arrow_schema = ::arrow::schema({
+      ::arrow::field("floatCol", ::arrow::float32(), true),
+      ::arrow::field("doubleCol", ::arrow::float64(), true),
+  });
+  std::vector<std::shared_ptr<::arrow::Array>> columns = {float_array, double_array};
+  auto records = ::arrow::StructArray::Make(columns, arrow_schema->fields()).ValueOrDie();
+
+  ICEBERG_UNWRAP_OR_FAIL(auto metrics, GetMetrics(schema, records));
+
+  ASSERT_TRUE(metrics.row_count.has_value()) << "row_count should be set";
+  EXPECT_EQ(*metrics.row_count, 3);
+  // TODO(WZhuo) Assert NaN metrics
+  AssertCounts(1, 3, 0, metrics);
+  AssertCounts(2, 3, 0, metrics);
+
+  // Nothing more to verify when no lower bound was recorded for field 1.
+  if (!metrics.lower_bounds.contains(1)) {
+    return;
+  }
+  // Bounds should be computed from non-NaN values
+  AssertBounds<float>(1, float32(), 1.2F, 5.6F, metrics);
+  AssertBounds<double>(2, float64(), 3.4, 7.8, metrics);
+}
+
+// Writes three rows whose middle value is NaN in both floating-point columns
+// and checks the counts. Bounds are verified only when a lower bound for
+// field 1 was produced.
+void MetricsTestBase::ColumnBoundsWithNaNValueInMiddle() {
+  auto schema = FloatDoubleSchema();
+
+  // floatCol: 1.2, NaN, 5.6
+  ::arrow::FloatBuilder float_builder;
+  ASSERT_TRUE(float_builder.Append(1.2F).ok());
+  ASSERT_TRUE(float_builder.Append(std::numeric_limits<float>::quiet_NaN()).ok());
+  ASSERT_TRUE(float_builder.Append(5.6F).ok());
+  auto float_array = float_builder.Finish().ValueOrDie();
+
+  // doubleCol: 3.4, NaN, 7.8
+  ::arrow::DoubleBuilder double_builder;
+  ASSERT_TRUE(double_builder.Append(3.4).ok());
+  ASSERT_TRUE(double_builder.Append(std::numeric_limits<double>::quiet_NaN()).ok());
+  ASSERT_TRUE(double_builder.Append(7.8).ok());
+  auto double_array = double_builder.Finish().ValueOrDie();
+
+  auto arrow_schema = ::arrow::schema({
+      ::arrow::field("floatCol", ::arrow::float32(), true),
+      ::arrow::field("doubleCol", ::arrow::float64(), true),
+  });
+  std::vector<std::shared_ptr<::arrow::Array>> columns = {float_array, double_array};
+  auto records = ::arrow::StructArray::Make(columns, arrow_schema->fields()).ValueOrDie();
+
+  ICEBERG_UNWRAP_OR_FAIL(auto metrics, GetMetrics(schema, records));
+
+  ASSERT_TRUE(metrics.row_count.has_value()) << "row_count should be set";
+  EXPECT_EQ(*metrics.row_count, 3);
+  AssertCounts(1, 3, 0, metrics);
+  AssertCounts(2, 3, 0, metrics);
+
+  // Nothing more to verify when no lower bound was recorded for field 1.
+  if (!metrics.lower_bounds.contains(1)) {
+    return;
+  }
+  AssertBounds<float>(1, float32(), 1.2F, 5.6F, metrics);
+  AssertBounds<double>(2, float64(), 3.4, 7.8, metrics);
+}
+
+// Writes three rows whose last value is NaN in both floating-point columns
+// and checks the counts. Bounds are verified only when a lower bound for
+// field 1 was produced.
+void MetricsTestBase::ColumnBoundsWithNaNValueAtEnd() {
+  auto schema = FloatDoubleSchema();
+
+  // floatCol: 1.2, 5.6, NaN
+  ::arrow::FloatBuilder float_builder;
+  ASSERT_TRUE(float_builder.Append(1.2F).ok());
+  ASSERT_TRUE(float_builder.Append(5.6F).ok());
+  ASSERT_TRUE(float_builder.Append(std::numeric_limits<float>::quiet_NaN()).ok());
+  auto float_array = float_builder.Finish().ValueOrDie();
+
+  // doubleCol: 3.4, 7.8, NaN
+  ::arrow::DoubleBuilder double_builder;
+  ASSERT_TRUE(double_builder.Append(3.4).ok());
+  ASSERT_TRUE(double_builder.Append(7.8).ok());
+  ASSERT_TRUE(double_builder.Append(std::numeric_limits<double>::quiet_NaN()).ok());
+  auto double_array = double_builder.Finish().ValueOrDie();
+
+  auto arrow_schema = ::arrow::schema({
+      ::arrow::field("floatCol", ::arrow::float32(), true),
+      ::arrow::field("doubleCol", ::arrow::float64(), true),
+  });
+  std::vector<std::shared_ptr<::arrow::Array>> columns = {float_array, double_array};
+  auto records = ::arrow::StructArray::Make(columns, arrow_schema->fields()).ValueOrDie();
+
+  ICEBERG_UNWRAP_OR_FAIL(auto metrics, GetMetrics(schema, records));
+
+  ASSERT_TRUE(metrics.row_count.has_value()) << "row_count should be set";
+  EXPECT_EQ(*metrics.row_count, 3);
+  AssertCounts(1, 3, 0, metrics);
+  AssertCounts(2, 3, 0, metrics);
+
+  // Nothing more to verify when no lower bound was recorded for field 1.
+  if (!metrics.lower_bounds.contains(1)) {
+    return;
+  }
+  AssertBounds<float>(1, float32(), 1.2F, 5.6F, metrics);
+  AssertBounds<double>(2, float64(), 3.4, 7.8, metrics);
+}
+
+// Writes 201 simple records and checks the counts and bounds of the first six
+// columns. When the format under test supports small row groups, the file is
+// also expected to be split into 3 parts.
+void MetricsTestBase::MetricsForTopLevelWithMultipleRowGroup() {
+  constexpr int kRowCount = 201;
+
+  auto schema = SimpleSchema();
+  ICEBERG_UNWRAP_OR_FAIL(auto arrow_schema, ToArrowSchema(schema));
+  ICEBERG_UNWRAP_OR_FAIL(auto records, BuildSimpleRecords(arrow_schema, kRowCount));
+  ICEBERG_UNWRAP_OR_FAIL(auto metrics, GetMetrics(schema, records));
+
+  if (SupportsSmallRowGroups()) {
+    ICEBERG_UNWRAP_OR_FAIL(auto split_count, GetSplitCount());
+    EXPECT_EQ(split_count, 3);
+  }
+
+  ASSERT_TRUE(metrics.row_count.has_value()) << "row_count should be set";
+  EXPECT_EQ(*metrics.row_count, 201);
+
+  // booleanCol
+  AssertCounts(1, kRowCount, 0, metrics);
+  AssertBounds<bool>(1, boolean(), false, true, metrics);
+  // intCol
+  AssertCounts(2, kRowCount, 0, metrics);
+  AssertBounds<int32_t>(2, int32(), 3, 203, metrics);
+  // longCol
+  AssertCounts(3, kRowCount, 1, metrics);
+  AssertBounds<int64_t>(3, int64(), 1, 200, metrics);
+  // floatCol
+  AssertCounts(4, kRowCount, 0, metrics);
+  AssertBounds<float>(4, float32(), 2.0F, 201.0F, metrics);
+  // doubleCol
+  AssertCounts(5, kRowCount, 0, metrics);
+  AssertBounds<double>(5, float64(), 2.0, 201.0, metrics);
+  // decimalCol
+  AssertCounts(6, kRowCount, 1, metrics);
+  AssertBounds<Decimal>(6, std::make_shared<DecimalType>(10, 2), Decimal(101),
+                        Decimal(300), metrics);
+}
+
+// Writes 201 nested records and checks the counts and bounds of the top-level
+// column and of the nested leaves. When the format under test supports small
+// row groups, the file is also expected to be split into 3 parts.
+void MetricsTestBase::MetricsForNestedStructFieldsWithMultipleRowGroup() {
+  auto schema = NestedSchema();
+  // BuildNestedRecords() takes only the row count, so no Arrow schema is
+  // converted here.
+  ICEBERG_UNWRAP_OR_FAIL(auto records, BuildNestedRecords(201));
+  ICEBERG_UNWRAP_OR_FAIL(auto metrics, GetMetrics(schema, records));
+
+  if (SupportsSmallRowGroups()) {
+    ICEBERG_UNWRAP_OR_FAIL(auto split_count, GetSplitCount());
+    EXPECT_EQ(split_count, 3);
+  }
+  ASSERT_TRUE(metrics.row_count.has_value()) << "row_count should be set";
+  EXPECT_EQ(*metrics.row_count, 201);
+
+  // Verify metrics for top-level field
+  AssertCounts(1, 201, 0, metrics);
+  AssertBounds<int32_t>(1, int32(), std::numeric_limits<int32_t>::min(),
+                        std::numeric_limits<int32_t>::min() + 200, metrics);
+
+  // Verify metrics for nested struct fields
+  AssertCounts(3, 201, 0, metrics);
+  AssertBounds<int64_t>(3, int64(), 100, 100 + 200, metrics);
+
+  AssertCounts(5, 201, 0, metrics);
+  AssertBounds<int64_t>(5, int64(), 20, 20 + 200, metrics);
+
+  AssertCounts(6, 201, 0, metrics);
+  AssertBounds<std::vector<uint8_t>>(6, binary(), std::vector<uint8_t>{'A'},
+                                     std::vector<uint8_t>{'A'}, metrics);
+
+  // doubleCol: counts are reported but no bounds are expected.
+  AssertCounts(7, 201, 0, metrics);
+  AssertBounds<double>(7, float64(), std::nullopt, std::nullopt, metrics);
+}
+
+// With the default metrics mode set to "none", only the row count is expected:
+// no column sizes, counts or bounds for any field.
+void MetricsTestBase::NoneMetricsMode() {
+  auto schema = NestedSchema();
+
+  std::unordered_map<std::string, std::string> properties = {
+      {"write.metadata.metrics.default", "none"}};
+
+  ICEBERG_UNWRAP_OR_FAIL(auto config, MetricsConfig::Make(properties));
+  ICEBERG_UNWRAP_OR_FAIL(auto records, BuildNestedRecords());
+  ICEBERG_UNWRAP_OR_FAIL(auto metrics, GetMetrics(schema, config, records));
+
+  ASSERT_TRUE(metrics.row_count.has_value()) << "row_count should be set";
+  EXPECT_EQ(*metrics.row_count, 1);
+
+  // In None mode, column_sizes should be empty
+  EXPECT_TRUE(metrics.column_sizes.empty());
+
+  // No counts and no bounds for any leaf field.
+  AssertCounts(1, std::nullopt, std::nullopt, metrics);
+  AssertBounds<int32_t>(1, int32(), std::nullopt, std::nullopt, metrics);
+  AssertCounts(3, std::nullopt, std::nullopt, metrics);
+  AssertBounds<int64_t>(3, int64(), std::nullopt, std::nullopt, metrics);
+  AssertCounts(5, std::nullopt, std::nullopt, metrics);
+  AssertBounds<int64_t>(5, int64(), std::nullopt, std::nullopt, metrics);
+  AssertCounts(6, std::nullopt, std::nullopt, metrics);
+  // Field 6 is binary, so use the byte-vector instantiation, matching
+  // FullMetricsMode().
+  AssertBounds<std::vector<uint8_t>>(6, binary(), std::nullopt, std::nullopt, metrics);
+  AssertCounts(7, std::nullopt, std::nullopt, metrics);
+  AssertBounds<double>(7, float64(), std::nullopt, std::nullopt, metrics);
+}
+
+// With the default metrics mode set to "counts", column sizes and counts are
+// expected for every leaf field, but no bounds.
+void MetricsTestBase::CountsMetricsMode() {
+  auto schema = NestedSchema();
+
+  std::unordered_map<std::string, std::string> properties = {
+      {"write.metadata.metrics.default", "counts"}};
+
+  ICEBERG_UNWRAP_OR_FAIL(auto config, MetricsConfig::Make(properties));
+  ICEBERG_UNWRAP_OR_FAIL(auto records, BuildNestedRecords());
+  ICEBERG_UNWRAP_OR_FAIL(auto metrics, GetMetrics(schema, config, records));
+
+  ASSERT_TRUE(metrics.row_count.has_value()) << "row_count should be set";
+  EXPECT_EQ(*metrics.row_count, 1);
+
+  // In Counts mode, column_sizes should not be empty
+  EXPECT_FALSE(metrics.column_sizes.empty());
+
+  // Counts should be present but bounds should be null
+  AssertCounts(1, 1, 0, metrics);
+  AssertBounds<int32_t>(1, int32(), std::nullopt, std::nullopt, metrics);
+  AssertCounts(3, 1, 0, metrics);
+  AssertBounds<int64_t>(3, int64(), std::nullopt, std::nullopt, metrics);
+  AssertCounts(5, 1, 0, metrics);
+  AssertBounds<int64_t>(5, int64(), std::nullopt, std::nullopt, metrics);
+  AssertCounts(6, 1, 0, metrics);
+  // Field 6 is binary, so use the byte-vector instantiation, matching
+  // FullMetricsMode().
+  AssertBounds<std::vector<uint8_t>>(6, binary(), std::nullopt, std::nullopt, metrics);
+  AssertCounts(7, 1, 0, metrics);
+  AssertBounds<double>(7, float64(), std::nullopt, std::nullopt, metrics);
+}
+
+// With the default metrics mode set to "full", column sizes, counts and
+// bounds are expected; for doubleCol (field 7) no bounds are expected.
+void MetricsTestBase::FullMetricsMode() {
+  std::unordered_map<std::string, std::string> properties;
+  properties.emplace("write.metadata.metrics.default", "full");
+
+  auto schema = NestedSchema();
+  ICEBERG_UNWRAP_OR_FAIL(auto config, MetricsConfig::Make(properties));
+  ICEBERG_UNWRAP_OR_FAIL(auto records, BuildNestedRecords());
+  ICEBERG_UNWRAP_OR_FAIL(auto metrics, GetMetrics(schema, config, records));
+
+  ASSERT_TRUE(metrics.row_count.has_value()) << "row_count should be set";
+  EXPECT_EQ(*metrics.row_count, 1);
+
+  // In Full mode, column_sizes should not be empty
+  EXPECT_FALSE(metrics.column_sizes.empty());
+
+  constexpr int32_t kInt32Min = std::numeric_limits<int32_t>::min();
+  const std::vector<uint8_t> expected_binary{'A'};
+
+  // Both counts and bounds should be present
+  AssertCounts(1, 1, 0, metrics);
+  AssertBounds<int32_t>(1, int32(), kInt32Min, kInt32Min, metrics);
+  AssertCounts(3, 1, 0, metrics);
+  AssertBounds<int64_t>(3, int64(), 100, 100, metrics);
+  AssertCounts(5, 1, 0, metrics);
+  AssertBounds<int64_t>(5, int64(), 20, 20, metrics);
+  AssertCounts(6, 1, 0, metrics);
+  AssertBounds<std::vector<uint8_t>>(6, binary(), expected_binary, expected_binary,
+                                     metrics);
+  AssertCounts(7, 1, 0, metrics);
+  AssertBounds<double>(7, float64(), std::nullopt, std::nullopt, metrics);
+}
+
+// Exercises the "truncate(10)" default metrics mode on a required string column
+// whose single value is longer than 10 characters.
+void MetricsTestBase::TruncateStringMetricsMode() {
+  auto iceberg_schema = std::make_shared<Schema>(std::vector<SchemaField>{
+      SchemaField::MakeRequired(1, "str_to_truncate", string()),
+  });
+
+  auto arrow_schema = ::arrow::schema({
+      ::arrow::field("str_to_truncate", ::arrow::utf8(), false),
+  });
+
+  auto string_records = CreateRecordArrays(arrow_schema, R"([
+ {"str_to_truncate": "Lorem ipsum dolor sit amet"}
+ ])");
+
+  std::unordered_map<std::string, std::string> table_properties = {
+      {"write.metadata.metrics.default", "truncate(10)"}};
+
+  ICEBERG_UNWRAP_OR_FAIL(auto metrics_config, MetricsConfig::Make(table_properties));
+  ICEBERG_UNWRAP_OR_FAIL(auto metrics,
+                         GetMetrics(iceberg_schema, metrics_config, string_records));
+
+  ASSERT_TRUE(metrics.row_count.has_value()) << "row_count should be set";
+  EXPECT_EQ(*metrics.row_count, 1);
+
+  // Per-column sizes must have been recorded.
+  EXPECT_FALSE(metrics.column_sizes.empty());
+
+  AssertCounts(1, 1, 0, metrics);
+
+  // Both bounds are cut to 10 characters:
+  //   lower = the first 10 characters as-is,
+  //   upper = the first 10 characters with the last one incremented ('u' -> 'v').
+  AssertBounds<std::string>(1, string(), std::string("Lorem ipsu"),
+                            std::string("Lorem ipsv"), metrics);
+}
+
+// Exercises the "truncate(5)" default metrics mode on a required binary column
+// whose single value is 12 bytes long.
+void MetricsTestBase::TruncateBinaryMetricsMode() {
+  auto iceberg_schema = std::make_shared<Schema>(std::vector<SchemaField>{
+      SchemaField::MakeRequired(1, "bin_to_truncate", binary()),
+  });
+
+  auto arrow_schema = ::arrow::schema({
+      ::arrow::field("bin_to_truncate", ::arrow::binary(), false),
+  });
+
+  // The single input value; only its first five bytes matter for the expected bounds.
+  const std::vector<uint8_t> payload = {0x1, 0x2, 0x3, 0x4,  0x5, 0x6,
+                                        0x7, 0x8, 0x9, 0x10, 0xA, 0xB};
+  ::arrow::BinaryBuilder binary_builder;
+  ASSERT_TRUE(binary_builder.Append(payload.data(), payload.size()).ok());
+  auto binary_column = binary_builder.Finish().ValueOrDie();
+
+  // Wrap the lone column into a one-row struct array matching the Arrow schema.
+  std::vector<std::shared_ptr<::arrow::Array>> columns = {binary_column};
+  auto binary_records =
+      ::arrow::StructArray::Make(columns, arrow_schema->fields()).ValueOrDie();
+
+  std::unordered_map<std::string, std::string> table_properties = {
+      {"write.metadata.metrics.default", "truncate(5)"}};
+
+  ICEBERG_UNWRAP_OR_FAIL(auto metrics_config, MetricsConfig::Make(table_properties));
+  ICEBERG_UNWRAP_OR_FAIL(auto metrics,
+                         GetMetrics(iceberg_schema, metrics_config, binary_records));
+
+  ASSERT_TRUE(metrics.row_count.has_value()) << "row_count should be set";
+  EXPECT_EQ(*metrics.row_count, 1);
+
+  // Per-column sizes must have been recorded.
+  EXPECT_FALSE(metrics.column_sizes.empty());
+
+  AssertCounts(1, 1, 0, metrics);
+
+  // Both bounds are cut to 5 bytes:
+  //   lower = {0x1, 0x2, 0x3, 0x4, 0x5}
+  //   upper = {0x1, 0x2, 0x3, 0x4, 0x6}  (last byte incremented)
+  const std::vector<uint8_t> lower_bytes{0x1, 0x2, 0x3, 0x4, 0x5};
+  const std::vector<uint8_t> upper_bytes{0x1, 0x2, 0x3, 0x4, 0x6};
+  AssertBounds<std::vector<uint8_t>>(1, binary(), lower_bytes, upper_bytes, metrics);
+}
+
+// Builds `count` rows of flat test data across 13 columns (boolean, int, long, float,
+// double, decimal(10, 2), string, date, time, timestamp, fixed(4), binary, timestamp)
+// and wraps them in a struct array whose fields are taken from `arrow_schema`.
+//
+// Row 0 carries the edge cases: a false boolean, null long/decimal values and NaN
+// float/double values. Later rows hold values derived from the row index `i`; only
+// the string, fixed and binary columns repeat the same value in every row.
+//
+// Any Arrow failure is returned through the Result rather than aborting the process.
+Result<std::shared_ptr<::arrow::Array>> MetricsTestBase::BuildSimpleRecords(
+    std::shared_ptr<::arrow::Schema> arrow_schema, int32_t count) {
+  ::arrow::BooleanBuilder boolean_builder;
+  ::arrow::Int32Builder int_builder;
+  ::arrow::Int64Builder long_builder;
+  ::arrow::FloatBuilder float_builder;
+  ::arrow::DoubleBuilder double_builder;
+  ::arrow::Decimal128Builder decimal_builder(::arrow::decimal128(10, 2));
+  ::arrow::StringBuilder string_builder;
+  ::arrow::Date32Builder date_builder;
+  ::arrow::Time64Builder time_builder(::arrow::time64(::arrow::TimeUnit::MICRO),
+                                      ::arrow::default_memory_pool());
+  ::arrow::TimestampBuilder timestamp_above_builder(
+      ::arrow::timestamp(::arrow::TimeUnit::MICRO), ::arrow::default_memory_pool());
+  ::arrow::FixedSizeBinaryBuilder fixed_builder(::arrow::fixed_size_binary(4));
+  ::arrow::BinaryBuilder binary_builder;
+  ::arrow::TimestampBuilder timestamp_below_builder(
+      ::arrow::timestamp(::arrow::TimeUnit::MICRO), ::arrow::default_memory_pool());
+
+  // Append one row per iteration; most values depend on the row index.
+  for (int32_t i = 0; i < count; ++i) {
+    ICEBERG_ARROW_RETURN_NOT_OK(boolean_builder.Append(i != 0));
+    ICEBERG_ARROW_RETURN_NOT_OK(int_builder.Append(3 + i));
+    ICEBERG_ARROW_RETURN_NOT_OK(i == 0 ? long_builder.AppendNull()
+                                       : long_builder.Append(i));
+    ICEBERG_ARROW_RETURN_NOT_OK(
+        i == 0 ? float_builder.Append(std::numeric_limits<float>::quiet_NaN())
+               : float_builder.Append(1.0 + i));
+    ICEBERG_ARROW_RETURN_NOT_OK(
+        i == 0 ? double_builder.Append(std::numeric_limits<double>::quiet_NaN())
+               : double_builder.Append(1.0 + i));
+    // Unscaled 100 is 1.00 at scale 2; the row index is added to the unscaled value.
+    ICEBERG_ARROW_RETURN_NOT_OK(
+        i == 0 ? decimal_builder.AppendNull()
+               : decimal_builder.Append(::arrow::Decimal128("100") + i));
+    ICEBERG_ARROW_RETURN_NOT_OK(string_builder.Append("AAA"));
+    ICEBERG_ARROW_RETURN_NOT_OK(date_builder.Append(1500 + i));
+    ICEBERG_ARROW_RETURN_NOT_OK(time_builder.Append(2000 + i));
+    ICEBERG_ARROW_RETURN_NOT_OK(timestamp_above_builder.Append(i + 1));
+    ICEBERG_ARROW_RETURN_NOT_OK(fixed_builder.Append("abcd"));
+    ICEBERG_ARROW_RETURN_NOT_OK(binary_builder.Append("S"));
+    ICEBERG_ARROW_RETURN_NOT_OK(timestamp_below_builder.Append((i + 1) * -1));
+  }
+
+  // Finish every builder, propagating failures like BuildNestedRecords does.
+  ICEBERG_ARROW_ASSIGN_OR_RETURN(auto boolean_array, boolean_builder.Finish());
+  ICEBERG_ARROW_ASSIGN_OR_RETURN(auto int_array, int_builder.Finish());
+  ICEBERG_ARROW_ASSIGN_OR_RETURN(auto long_array, long_builder.Finish());
+  ICEBERG_ARROW_ASSIGN_OR_RETURN(auto float_array, float_builder.Finish());
+  ICEBERG_ARROW_ASSIGN_OR_RETURN(auto double_array, double_builder.Finish());
+  ICEBERG_ARROW_ASSIGN_OR_RETURN(auto decimal_array, decimal_builder.Finish());
+  ICEBERG_ARROW_ASSIGN_OR_RETURN(auto string_array, string_builder.Finish());
+  ICEBERG_ARROW_ASSIGN_OR_RETURN(auto date_array, date_builder.Finish());
+  ICEBERG_ARROW_ASSIGN_OR_RETURN(auto time_array, time_builder.Finish());
+  ICEBERG_ARROW_ASSIGN_OR_RETURN(auto timestamp_above_array,
+                                 timestamp_above_builder.Finish());
+  ICEBERG_ARROW_ASSIGN_OR_RETURN(auto fixed_array, fixed_builder.Finish());
+  ICEBERG_ARROW_ASSIGN_OR_RETURN(auto binary_array, binary_builder.Finish());
+  ICEBERG_ARROW_ASSIGN_OR_RETURN(auto timestamp_below_array,
+                                 timestamp_below_builder.Finish());
+
+  // Column order must line up with the field order of `arrow_schema`.
+  std::vector<std::shared_ptr<::arrow::Array>> field_arrays = {
+      boolean_array,         int_array,    long_array,
+      float_array,           double_array, decimal_array,
+      string_array,          date_array,   time_array,
+      timestamp_above_array, fixed_array,  binary_array,
+      timestamp_below_array};
+  ICEBERG_ARROW_ASSIGN_OR_RETURN(
+      auto records, ::arrow::StructArray::Make(field_arrays, arrow_schema->fields()));
+  return records;
+}
+
+// Builds `count` rows of nested test data shaped as
+//   {intCol, nestedStructCol: {longCol, leafStructCol: {leafLongCol, leafBinaryCol},
+//                              doubleCol}}
+// Row `i` holds intCol = INT32_MIN + i, longCol = 100 + i, leafLongCol = 20 + i,
+// leafBinaryCol = "A" and doubleCol = NaN. Arrow failures are returned as errors.
+Result<std::shared_ptr<::arrow::Array>> MetricsTestBase::BuildNestedRecords(
+    int32_t count) {
+  // Innermost struct: both leaves are declared nullable.
+  auto leaf_type = ::arrow::struct_({
+      ::arrow::field("leafLongCol", ::arrow::int64(), true),
+      ::arrow::field("leafBinaryCol", ::arrow::binary(), true),
+  });
+
+  // Middle struct wrapping the leaf struct between a long and a double column.
+  auto nested_type = ::arrow::struct_({
+      ::arrow::field("longCol", ::arrow::int64(), false),
+      ::arrow::field("leafStructCol", leaf_type, false),
+      ::arrow::field("doubleCol", ::arrow::float64(), false),
+  });
+
+  // Top-level record layout.
+  auto record_schema = ::arrow::schema({
+      ::arrow::field("intCol", ::arrow::int32(), false),
+      ::arrow::field("nestedStructCol", nested_type, false),
+  });
+
+  ::arrow::Int64Builder leaf_long_values;
+  ::arrow::BinaryBuilder leaf_binary_values;
+  ::arrow::Int64Builder long_values;
+  ::arrow::DoubleBuilder double_values;
+  ::arrow::Int32Builder int_values;
+
+  constexpr int32_t kIntColBase = std::numeric_limits<int32_t>::min();
+  for (int32_t row = 0; row < count; ++row) {
+    // Leaf struct values: {leafLongCol: 20 + row, leafBinaryCol: "A"}
+    ICEBERG_ARROW_RETURN_NOT_OK(leaf_long_values.Append(20 + row));
+    ICEBERG_ARROW_RETURN_NOT_OK(leaf_binary_values.Append("A"));
+
+    // Nested struct values: {longCol: 100 + row, doubleCol: NaN}
+    ICEBERG_ARROW_RETURN_NOT_OK(long_values.Append(100 + row));
+    ICEBERG_ARROW_RETURN_NOT_OK(
+        double_values.Append(std::numeric_limits<double>::quiet_NaN()));
+
+    // Top-level value: intCol starts at the smallest int32 and grows with the row.
+    ICEBERG_ARROW_RETURN_NOT_OK(int_values.Append(kIntColBase + row));
+  }
+
+  // Assemble from the inside out: leaf struct first.
+  ICEBERG_ARROW_ASSIGN_OR_RETURN(auto leaf_long_column, leaf_long_values.Finish());
+  ICEBERG_ARROW_ASSIGN_OR_RETURN(auto leaf_binary_column, leaf_binary_values.Finish());
+  ICEBERG_ARROW_ASSIGN_OR_RETURN(
+      auto leaf_struct_column,
+      ::arrow::StructArray::Make({leaf_long_column, leaf_binary_column},
+                                 leaf_type->fields()));
+
+  // Then the nested struct that embeds the leaf struct.
+  ICEBERG_ARROW_ASSIGN_OR_RETURN(auto long_column, long_values.Finish());
+  ICEBERG_ARROW_ASSIGN_OR_RETURN(auto double_column, double_values.Finish());
+  ICEBERG_ARROW_ASSIGN_OR_RETURN(
+      auto nested_struct_column,
+      ::arrow::StructArray::Make({long_column, leaf_struct_column, double_column},
+                                 nested_type->fields()));
+
+  // Finally the top-level record array.
+  ICEBERG_ARROW_ASSIGN_OR_RETURN(auto int_column, int_values.Finish());
+  ICEBERG_ARROW_ASSIGN_OR_RETURN(
+      auto nested_records,
+      ::arrow::StructArray::Make({int_column, nested_struct_column},
+                                 record_schema->fields()));
+  return nested_records;
+}
+
+} // namespace iceberg::test
diff --git a/src/iceberg/test/metrics_test_base.h
b/src/iceberg/test/metrics_test_base.h
new file mode 100644
index 00000000..1ad9c882
--- /dev/null
+++ b/src/iceberg/test/metrics_test_base.h
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#include <memory>
+#include <optional>
+#include <string>
+
+#include <arrow/array.h>
+
+#include "iceberg/file_io.h"
+#include "iceberg/metrics.h"
+#include "iceberg/metrics_config.h"
+#include "iceberg/result.h"
+#include "iceberg/schema.h"
+
+namespace iceberg::test {
+
+/// \brief Base test class for metrics testing, similar to Java's TestMetrics
+///
+/// This class provides common test infrastructure and helper methods for testing
+/// metrics collection across different file formats (Parquet, Avro, ORC).
+///
+/// Format-specific fixtures derive from this class, implement the pure virtual
+/// hooks below and register the shared test cases with DEFINE_METRICS_TESTS.
+class MetricsTestBase {
+ public:
+  /// The class declares virtual functions and is meant to be derived from, so the
+  /// destructor is virtual to keep destruction through a base pointer well-defined.
+  virtual ~MetricsTestBase() = default;
+
+ protected:
+  /// \brief Per-test initialization; format-specific fixtures call this from their
+  /// own SetUp().
+  virtual void SetUp();
+
+  /// \brief Get metrics for the given schema and records
+  virtual Result<Metrics> GetMetrics(std::shared_ptr<Schema> schema,
+                                     std::shared_ptr<::arrow::Array> records) = 0;
+
+  /// \brief Get metrics with custom MetricsConfig
+  virtual Result<Metrics> GetMetrics(std::shared_ptr<Schema> schema,
+                                     std::shared_ptr<MetricsConfig> config,
+                                     std::shared_ptr<::arrow::Array> records) = 0;
+
+  /// \brief Create an output file for testing
+  virtual std::string CreateOutputFile() = 0;
+
+  /// \brief Get the number of row groups/splits in a file
+  virtual Result<int> GetSplitCount() = 0;
+
+  /// \brief Whether the format supports small row groups for testing
+  virtual bool SupportsSmallRowGroups() const { return false; }
+
+  // Helper methods for assertions
+
+  /// \brief Check the value and null counts recorded for `field_id`.
+  void AssertCounts(int field_id, std::optional<int64_t> expected_value_count,
+                    std::optional<int64_t> expected_null_count, const Metrics& metrics);
+
+  /// \brief Check the value, null and NaN counts recorded for `field_id`.
+  void AssertCounts(int field_id, std::optional<int64_t> expected_value_count,
+                    std::optional<int64_t> expected_null_count,
+                    std::optional<int64_t> expected_nan_count, const Metrics& metrics);
+
+  /// \brief Check the lower/upper bounds recorded for `field_id`; callers pass
+  /// std::nullopt for a bound that is expected to be absent.
+  template <typename T>
+  void AssertBounds(int field_id, std::shared_ptr<PrimitiveType> type,
+                    std::optional<T> expected_lower, std::optional<T> expected_upper,
+                    const Metrics& metrics);
+
+  // Helper methods for creating test data
+
+  /// \brief Create a record array for `arrow_schema` from a JSON document.
+  std::shared_ptr<::arrow::Array> CreateRecordArrays(
+      const std::shared_ptr<::arrow::Schema>& arrow_schema, const std::string& json_data);
+
+  // Common test schemas
+  static std::shared_ptr<Schema> SimpleSchema();
+  static std::shared_ptr<Schema> NestedSchema();
+  static std::shared_ptr<Schema> FloatDoubleSchema();
+
+  // Test case methods - subclasses should call these from TEST_F macros
+  void MetricsForRepeatedValues();
+  void MetricsForTopLevelFields();
+  void MetricsForDecimals();
+  void MetricsForNestedStructFields();
+  void MetricsModeForNestedStructFields();
+  void MetricsForListAndMapElements();
+  void MetricsForNullColumns();
+  void MetricsForNaNColumns();
+  void ColumnBoundsWithNaNValueAtFront();
+  void ColumnBoundsWithNaNValueInMiddle();
+  void ColumnBoundsWithNaNValueAtEnd();
+  void MetricsForTopLevelWithMultipleRowGroup();
+  void MetricsForNestedStructFieldsWithMultipleRowGroup();
+  void NoneMetricsMode();
+  void CountsMetricsMode();
+  void FullMetricsMode();
+  void TruncateStringMetricsMode();
+  void TruncateBinaryMetricsMode();
+
+ private:
+  /// \brief Build `count` rows of flat test data laid out according to `arrow_schema`.
+  Result<std::shared_ptr<::arrow::Array>> BuildSimpleRecords(
+      std::shared_ptr<::arrow::Schema> arrow_schema, int32_t count = 1);
+
+  /// \brief Build `count` rows of nested struct test data.
+  Result<std::shared_ptr<::arrow::Array>> BuildNestedRecords(int32_t count = 1);
+
+ protected:
+  std::shared_ptr<FileIO> file_io_;
+  std::string temp_dir_;
+  std::string path_;
+};
+
+// Registers a single gtest case named `Case` on fixture `TestClass`; the test body
+// only calls the fixture member function of the same name.
+//
+// TEST_F comes from gtest, which this header does not include itself: the including
+// test file must include <gtest/gtest.h> before expanding these macros.
+#define DEFINE_METRICS_TEST_CASE(TestClass, Case) \
+ TEST_F(TestClass, Case) { Case(); }
+
+// Registers every shared metrics test case declared on MetricsTestBase for the
+// fixture `TestClass`, which therefore has to expose those member functions (for
+// example by deriving from MetricsTestBase).
+#define DEFINE_METRICS_TESTS(TestClass) \
+ DEFINE_METRICS_TEST_CASE(TestClass, MetricsForRepeatedValues) \
+ DEFINE_METRICS_TEST_CASE(TestClass, MetricsForTopLevelFields) \
+ DEFINE_METRICS_TEST_CASE(TestClass, MetricsForNestedStructFields) \
+ DEFINE_METRICS_TEST_CASE(TestClass, MetricsForNullColumns) \
+ DEFINE_METRICS_TEST_CASE(TestClass, MetricsForNaNColumns) \
+ DEFINE_METRICS_TEST_CASE(TestClass, ColumnBoundsWithNaNValueAtFront) \
+ DEFINE_METRICS_TEST_CASE(TestClass, ColumnBoundsWithNaNValueInMiddle) \
+ DEFINE_METRICS_TEST_CASE(TestClass, ColumnBoundsWithNaNValueAtEnd) \
+ DEFINE_METRICS_TEST_CASE(TestClass, MetricsForDecimals) \
+ DEFINE_METRICS_TEST_CASE(TestClass, MetricsForListAndMapElements) \
+ DEFINE_METRICS_TEST_CASE(TestClass, MetricsModeForNestedStructFields) \
+ DEFINE_METRICS_TEST_CASE(TestClass, NoneMetricsMode) \
+ DEFINE_METRICS_TEST_CASE(TestClass, CountsMetricsMode) \
+ DEFINE_METRICS_TEST_CASE(TestClass, FullMetricsMode) \
+ DEFINE_METRICS_TEST_CASE(TestClass, TruncateStringMetricsMode) \
+ DEFINE_METRICS_TEST_CASE(TestClass, TruncateBinaryMetricsMode) \
+ DEFINE_METRICS_TEST_CASE(TestClass, MetricsForTopLevelWithMultipleRowGroup) \
+ DEFINE_METRICS_TEST_CASE(TestClass,
MetricsForNestedStructFieldsWithMultipleRowGroup)
+
+} // namespace iceberg::test
diff --git a/src/iceberg/test/parquet_metrics_test.cc
b/src/iceberg/test/parquet_metrics_test.cc
new file mode 100644
index 00000000..93c024b0
--- /dev/null
+++ b/src/iceberg/test/parquet_metrics_test.cc
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <arrow/c/bridge.h>
+#include <arrow/filesystem/filesystem.h>
+#include <gtest/gtest.h>
+#include <parquet/arrow/reader.h>
+#include <parquet/arrow/writer.h>
+#include <parquet/metadata.h>
+
+#include "iceberg/arrow/arrow_io_internal.h"
+#include "iceberg/arrow/arrow_status_internal.h"
+#include "iceberg/file_writer.h"
+#include "iceberg/parquet/parquet_register.h"
+#include "iceberg/test/metrics_test_base.h"
+#include "iceberg/util/checked_cast.h"
+
+namespace iceberg::test {
+
+/// \brief Runs the shared metrics test cases against the Parquet writer.
+///
+/// Each GetMetrics() call writes the records to a fixed file name through the
+/// registered Parquet writer and returns the metrics the writer reports after Close().
+class ParquetMetricsTest : public MetricsTestBase, public ::testing::Test {
+ protected:
+  static void SetUpTestSuite() { parquet::RegisterAll(); }
+
+  void SetUp() override {
+    MetricsTestBase::SetUp();
+    temp_parquet_file_ = "parquet_metrics_test.parquet";
+    writer_properties_ = WriterProperties::FromMap(
+        {{WriterProperties::kParquetCompression.key(), "uncompressed"}});
+  }
+
+  /// \brief Collect metrics using the default MetricsConfig.
+  Result<Metrics> GetMetrics(std::shared_ptr<Schema> schema,
+                             std::shared_ptr<::arrow::Array> records) override {
+    return GetMetrics(schema, MetricsConfig::Default(), records);
+  }
+
+  /// \brief Write `records` with the given config and return the writer's metrics.
+  Result<Metrics> GetMetrics(std::shared_ptr<Schema> schema,
+                             std::shared_ptr<MetricsConfig> config,
+                             std::shared_ptr<::arrow::Array> records) override {
+    ICEBERG_ASSIGN_OR_RAISE(
+        auto writer, WriterFactoryRegistry::Open(FileFormatType::kParquet,
+                                                 {.path = temp_parquet_file_,
+                                                  .schema = schema,
+                                                  .io = file_io_,
+                                                  .metadata = {},
+                                                  .metrics_config = config,
+                                                  .properties = writer_properties_}));
+    // Hand the Arrow array to the writer through the C data interface.
+    ArrowArray arr;
+    ICEBERG_ARROW_RETURN_NOT_OK(::arrow::ExportArray(*records, &arr));
+    ICEBERG_RETURN_UNEXPECTED(writer->Write(&arr));
+    ICEBERG_RETURN_UNEXPECTED(writer->Close());
+    return writer->metrics();
+  }
+
+  std::string CreateOutputFile() override { return temp_parquet_file_; }
+
+  /// \brief Return the number of row groups in the most recently written file.
+  ///
+  /// A failure to open the file is returned as an error instead of aborting.
+  Result<int> GetSplitCount() override {
+    // Bind by reference: a plain `auto` would copy the FileIO object.
+    auto& io = internal::checked_cast<arrow::ArrowFileSystemFileIO&>(*file_io_);
+    ICEBERG_ARROW_ASSIGN_OR_RETURN(auto infile,
+                                   io.fs()->OpenInputFile(temp_parquet_file_));
+    auto metadata = ::parquet::ReadMetaData(infile);
+    return metadata->num_row_groups();
+  }
+
+  bool SupportsSmallRowGroups() const override { return false; }
+
+ private:
+  std::string temp_parquet_file_;
+  WriterProperties writer_properties_;
+};
+
+DEFINE_METRICS_TESTS(ParquetMetricsTest);
+
+} // namespace iceberg::test
diff --git a/src/iceberg/test/truncate_util_test.cc
b/src/iceberg/test/truncate_util_test.cc
index 849f67d7..f39e10ca 100644
--- a/src/iceberg/test/truncate_util_test.cc
+++ b/src/iceberg/test/truncate_util_test.cc
@@ -79,8 +79,9 @@ TEST(TruncateUtilTest, TruncateBinaryMax) {
EXPECT_EQ(result3, Literal::Binary(test3));
// Test3b: cannot truncate when first bytes are all 0xFF
- EXPECT_THAT(TruncateUtils::TruncateLiteralMax(Literal::Binary(test3), 2),
- IsError(ErrorKind::kInvalidArgument));
+ ICEBERG_UNWRAP_OR_FAIL(auto result3b,
+
TruncateUtils::TruncateLiteralMax(Literal::Binary(test3), 2));
+ EXPECT_EQ(result3b, std::nullopt);
// Test4: truncate {1, 1, 0} to 2 bytes -> {1, 2}
ICEBERG_UNWRAP_OR_FAIL(auto result4,
@@ -143,8 +144,9 @@ TEST(TruncateUtilTest, TruncateStringMax) {
// Test5: Max 4-byte UTF-8 characters "\uDBFF\uDFFF\uDBFF\uDFFF"
std::string test5 = "\xF4\x8F\xBF\xBF\xF4\x8F\xBF\xBF"; // U+10FFFF U+10FFFF
- EXPECT_THAT(TruncateUtils::TruncateLiteralMax(Literal::String(test5), 1),
- IsError(ErrorKind::kInvalidArgument));
+ ICEBERG_UNWRAP_OR_FAIL(auto result5_1,
+
TruncateUtils::TruncateLiteralMax(Literal::String(test5), 1));
+ EXPECT_EQ(result5_1, std::nullopt);
// Test6: 4-byte UTF-8 character "\uD800\uDFFF\uD800\uDFFF"
std::string test6 = "\xF0\x90\x8F\xBF\xF0\x90\x8F\xBF"; // U+103FF U+103FF
diff --git a/src/iceberg/util/truncate_util.cc
b/src/iceberg/util/truncate_util.cc
index aba22d17..8a1d0724 100644
--- a/src/iceberg/util/truncate_util.cc
+++ b/src/iceberg/util/truncate_util.cc
@@ -24,6 +24,7 @@
#include <utility>
#include "iceberg/expression/literal.h"
+#include "iceberg/type.h"
#include "iceberg/util/checked_cast.h"
namespace iceberg {
@@ -167,23 +168,26 @@ Literal TruncateLiteralImpl<TypeId::kBinary>(const
Literal& literal, int32_t wid
}
template <TypeId type_id>
-Result<Literal> TruncateLiteralMaxImpl(const Literal& literal, int32_t width)
= delete;
+Result<std::optional<Literal>> TruncateLiteralMaxImpl(const Literal& literal,
+ int32_t width) = delete;
template <>
-Result<Literal> TruncateLiteralMaxImpl<TypeId::kString>(const Literal& literal,
- int32_t width) {
+Result<std::optional<Literal>> TruncateLiteralMaxImpl<TypeId::kString>(
+ const Literal& literal, int32_t width) {
const auto& str = std::get<std::string>(literal.value());
- ICEBERG_ASSIGN_OR_RAISE(std::string truncated,
- TruncateUtils::TruncateUTF8Max(str, width));
- return Literal::String(std::move(truncated));
+ ICEBERG_ASSIGN_OR_RAISE(auto truncated, TruncateUtils::TruncateUTF8Max(str,
width));
+ if (!truncated.has_value()) {
+ return std::nullopt;
+ }
+ return std::optional<Literal>(Literal::String(std::move(truncated.value())));
}
template <>
-Result<Literal> TruncateLiteralMaxImpl<TypeId::kBinary>(const Literal& literal,
- int32_t width) {
+Result<std::optional<Literal>> TruncateLiteralMaxImpl<TypeId::kBinary>(
+ const Literal& literal, int32_t width) {
const auto& data = std::get<std::vector<uint8_t>>(literal.value());
if (static_cast<int32_t>(data.size()) <= width) {
- return literal;
+ return std::optional<Literal>(literal);
}
std::vector<uint8_t> truncated(data.begin(), data.begin() + width);
@@ -191,18 +195,19 @@ Result<Literal>
TruncateLiteralMaxImpl<TypeId::kBinary>(const Literal& literal,
if (*it < 0xFF) {
++(*it);
truncated.resize(truncated.size() - std::distance(truncated.rbegin(),
it));
- return Literal::Binary(std::move(truncated));
+ return std::optional<Literal>(Literal::Binary(std::move(truncated)));
}
}
- return InvalidArgument("Cannot truncate upper bound for binary: all bytes
are 0xFF");
+ return std::nullopt;
}
} // namespace
-Result<std::string> TruncateUtils::TruncateUTF8Max(const std::string& source,
size_t L) {
+Result<std::optional<std::string>> TruncateUtils::TruncateUTF8Max(
+ const std::string& source, size_t L) {
std::string truncated = TruncateUTF8(source, L);
if (truncated == source) {
- return truncated;
+ return std::optional<std::string>(std::move(truncated));
}
// Try incrementing code points from the end
@@ -231,13 +236,12 @@ Result<std::string> TruncateUtils::TruncateUTF8Max(const
std::string& source, si
if (next_code_point <= kUtf8MaxCodePoint) {
truncated.resize(cp_start);
AppendUtf8CodePoint(next_code_point, truncated);
- return truncated;
+ return std::optional<std::string>(std::move(truncated));
}
}
last_cp_start = cp_start;
}
- return InvalidArgument(
- "Cannot truncate upper bound for string: all code points are 0x10FFFF");
+ return std::nullopt;
}
Decimal TruncateUtils::TruncateDecimal(const Decimal& decimal, int32_t width) {
@@ -274,10 +278,11 @@ Result<Literal> TruncateUtils::TruncateLiteral(const
Literal& literal, int32_t w
case TYPE_ID: \
return TruncateLiteralMaxImpl<TYPE_ID>(literal, width);
-Result<Literal> TruncateUtils::TruncateLiteralMax(const Literal& literal,
int32_t width) {
+Result<std::optional<Literal>> TruncateUtils::TruncateLiteralMax(const
Literal& literal,
+ int32_t
width) {
if (literal.IsNull()) [[unlikely]] {
// Return null as is
- return literal;
+ return std::optional<Literal>(literal);
}
if (literal.IsAboveMax() || literal.IsBelowMin()) [[unlikely]] {
@@ -293,4 +298,26 @@ Result<Literal> TruncateUtils::TruncateLiteralMax(const
Literal& literal, int32_
}
}
+// Truncates a lower bound: string and binary values are shortened through
+// TruncateLiteral(); values of every other type are returned unchanged.
+Result<Literal> TruncateUtils::TruncateLowerBound(const PrimitiveType& type,
+                                                  const Literal& value,
+                                                  int32_t length) {
+  const TypeId id = type.type_id();
+  if (id == TypeId::kString || id == TypeId::kBinary) {
+    return TruncateLiteral(value, length);
+  }
+  return value;
+}
+
+// Truncates an upper bound: string and binary values go through
+// TruncateLiteralMax(), whose result (possibly nullopt) is returned as-is; values of
+// every other type are returned unchanged, wrapped in an optional.
+Result<std::optional<Literal>> TruncateUtils::TruncateUpperBound(
+    const PrimitiveType& type, const Literal& value, int32_t length) {
+  const TypeId id = type.type_id();
+  if (id == TypeId::kString || id == TypeId::kBinary) {
+    return TruncateLiteralMax(value, length);
+  }
+  return std::optional<Literal>(value);
+}
+
} // namespace iceberg
diff --git a/src/iceberg/util/truncate_util.h b/src/iceberg/util/truncate_util.h
index 1a1824a2..7fee86eb 100644
--- a/src/iceberg/util/truncate_util.h
+++ b/src/iceberg/util/truncate_util.h
@@ -20,6 +20,7 @@
#pragma once
#include <cstdint>
+#include <optional>
#include <string>
#include <utility>
@@ -71,9 +72,10 @@ class ICEBERG_EXPORT TruncateUtils {
/// \param source The input string to truncate.
/// \param L The maximum number of code points allowed in the output string.
/// \return A Result containing the original string (if no truncation is
- /// needed), or the smallest string greater than the truncated prefix, or an
- /// error if no such value exists or the input is invalid UTF-8.
- static Result<std::string> TruncateUTF8Max(const std::string& source, size_t
L);
+ /// needed), the smallest string greater than the truncated prefix, or
nullopt if no
+ /// safe upper bound can be represented. Invalid UTF-8 is returned as an
error.
+ static Result<std::optional<std::string>> TruncateUTF8Max(const std::string&
source,
+ size_t L);
/// \brief Truncate an integer v, either int32_t or int64_t, to v - (v % W).
///
@@ -109,10 +111,37 @@ class ICEBERG_EXPORT TruncateUtils {
///
/// \param value The input Literal maximum value to truncate.
/// \param width The width to truncate to.
- /// \return A Result containing either the original Literal (if no
truncation is needed)
- /// or the smallest Literal greater than the truncated prefix, or an error
if no such
- /// value exists or cannot be represented.
- static Result<Literal> TruncateLiteralMax(const Literal& value, int32_t
width);
+ /// \return A Result containing either the original Literal (if no
truncation is
+ /// needed), the smallest Literal greater than the truncated prefix, or
nullopt if no
+ /// safe upper bound can be represented.
+ static Result<std::optional<Literal>> TruncateLiteralMax(const Literal&
value,
+ int32_t width);
+
+ /// \brief Truncate the lower bound of a string or binary value.
+ ///
+ /// For string/binary types, truncates to the given length. For other types,
returns the
+ /// value unchanged.
+ ///
+ /// \param type The Iceberg primitive type.
+ /// \param value The lower bound literal value.
+ /// \param width The width to truncate to.
+ /// \return The truncated lower bound literal.
+ static Result<Literal> TruncateLowerBound(const PrimitiveType& type,
+ const Literal& value, int32_t
width);
+
+ /// \brief Truncate the upper bound of a string or binary value.
+ ///
+ /// For string/binary types, truncates to the smallest value greater than
the truncated
+ /// prefix. For other types, returns the value unchanged.
+ ///
+ /// \param type The Iceberg primitive type.
+ /// \param value The upper bound literal value.
+ /// \param width The width to truncate to.
+ /// \return The truncated upper bound literal, or nullopt if no safe upper
bound can be
+ /// represented.
+ static Result<std::optional<Literal>> TruncateUpperBound(const
PrimitiveType& type,
+ const Literal&
value,
+ int32_t width);
};
} // namespace iceberg