This is an automated email from the ASF dual-hosted git repository.
wgtmac pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-cpp.git
The following commit(s) were added to refs/heads/main by this push:
new bc48b949 fix(parquet): fix parquet writer metrics conversion (#681)
bc48b949 is described below
commit bc48b949dfca6c02dea75b7d50de1e2dfca6485a
Author: Gang Wu <[email protected]>
AuthorDate: Tue May 26 20:33:21 2026 +0800
fix(parquet): fix parquet writer metrics conversion (#681)
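This is a follow-up to https://github.com/apache/iceberg-cpp/pull/651 that
addresses all of the review comments left on that PR. It converts Parquet footer
statistics to Iceberg literals based on each column's physical type, reports
column sizes only for columns whose metrics mode is not `none`, and adds the
`write.parquet.max-row-group-rows` writer property.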
---
src/iceberg/file_writer.h | 3 +
src/iceberg/metrics.h | 2 +-
src/iceberg/metrics_config.cc | 27 +-
src/iceberg/metrics_config.h | 10 +-
src/iceberg/parquet/parquet_metrics.cc | 272 ++++++++++++++++-----
...arquet_metrics.h => parquet_metrics_internal.h} | 5 +-
src/iceberg/parquet/parquet_writer.cc | 28 ++-
src/iceberg/test/metrics_test_base.cc | 47 ++--
src/iceberg/test/metrics_test_base.h | 4 +
src/iceberg/test/parquet_metrics_test.cc | 97 +++++++-
src/iceberg/util/truncate_util.cc | 14 +-
11 files changed, 387 insertions(+), 122 deletions(-)
diff --git a/src/iceberg/file_writer.h b/src/iceberg/file_writer.h
index 3c890453..4b1045a4 100644
--- a/src/iceberg/file_writer.h
+++ b/src/iceberg/file_writer.h
@@ -60,6 +60,9 @@ class ICEBERG_EXPORT WriterProperties : public
ConfigBase<WriterProperties> {
"zstd"};
inline static Entry<std::string> kParquetCompressionLevel{
"write.parquet.compression-level", ""};
+  /// \brief Maximum number of rows in each Parquet row group. Must be greater than 0;
+  /// the Parquet writer rejects non-positive values.
+  inline static Entry<int64_t> kParquetMaxRowGroupRows{
+      "write.parquet.max-row-group-rows", 1024 * 1024};
/// TODO(gangwu): add table properties with write.avro|parquet|orc.*
diff --git a/src/iceberg/metrics.h b/src/iceberg/metrics.h
index 083cd041..8cfbbdf8 100644
--- a/src/iceberg/metrics.h
+++ b/src/iceberg/metrics.h
@@ -36,7 +36,7 @@ namespace iceberg {
/// lower/upper bounds for a specific field identified by its field_id.
struct ICEBERG_EXPORT FieldMetrics {
/// \brief The field ID this metrics belongs to.
- int32_t field_id;
+ int32_t field_id = -1;
/// \brief The total number of values (including nulls) for this field.
/// A negative value indicates the count is unknown.
diff --git a/src/iceberg/metrics_config.cc b/src/iceberg/metrics_config.cc
index ea20d47e..95d1a1fe 100644
--- a/src/iceberg/metrics_config.cc
+++ b/src/iceberg/metrics_config.cc
@@ -125,9 +125,8 @@ const std::shared_ptr<MetricsConfig>&
MetricsConfig::Default() {
Result<std::shared_ptr<MetricsConfig>> MetricsConfig::Make(const Table& table)
{
ICEBERG_ASSIGN_OR_RAISE(auto schema, table.schema());
- auto sort_order = table.sort_order();
- return MakeInternal(table.properties(), *schema,
- *sort_order.value_or(SortOrder::Unsorted()));
+ // Fall back to SortOrder::Unsorted() when the table's sort order is unavailable.
+ // `schema` and `order` keep ownership; MakeInternal receives raw pointers that are
+ // valid for the duration of this call.
+ auto order = table.sort_order().value_or(SortOrder::Unsorted());
+ return MakeInternal(table.properties(), schema.get(), order.get());
}
Result<std::shared_ptr<MetricsConfig>> MetricsConfig::Make(
@@ -135,11 +134,11 @@ Result<std::shared_ptr<MetricsConfig>>
MetricsConfig::Make(
// Create a minimal TableProperties wrapper for the properties
TableProperties props = TableProperties::FromMap(std::move(properties));
- return MakeInternal(props, Schema({}), *SortOrder::Unsorted());
+ return MakeInternal(props, /*schema=*/nullptr, /*order=*/nullptr);
}
Result<std::shared_ptr<MetricsConfig>> MetricsConfig::MakeInternal(
- const TableProperties& props, const Schema& schema, const SortOrder&
order) {
+ const TableProperties& props, const Schema* schema, const SortOrder*
order) {
ColumnModeMap column_modes;
MetricsMode default_mode = kDefaultMetricsMode;
@@ -148,16 +147,16 @@ Result<std::shared_ptr<MetricsConfig>>
MetricsConfig::MakeInternal(
props.Get(TableProperties::kDefaultWriteMetricsMode);
ICEBERG_ASSIGN_OR_RAISE(default_mode,
ParseMode(configured_metrics_mode,
kDefaultMetricsMode));
- } else {
+ } else if (schema != nullptr) {
int32_t max_inferred_columns = MaxInferredColumns(props);
GetProjectedIdsVisitor visitor(/*include_struct_ids=*/true);
- ICEBERG_RETURN_UNEXPECTED(visitor.Visit(schema));
+ ICEBERG_RETURN_UNEXPECTED(visitor.Visit(*schema));
auto projected_columns = static_cast<int32_t>(visitor.Finish().size());
if (max_inferred_columns < projected_columns) {
ICEBERG_ASSIGN_OR_RAISE(auto limit_field_ids,
- LimitFieldIds(schema, max_inferred_columns));
+ LimitFieldIds(*schema, max_inferred_columns));
for (auto id : limit_field_ids) {
- ICEBERG_ASSIGN_OR_RAISE(auto column_name,
schema.FindColumnNameById(id));
+ ICEBERG_ASSIGN_OR_RAISE(auto column_name,
schema->FindColumnNameById(id));
ICEBERG_CHECK(column_name.has_value(), "Field id {} not found in
schema", id);
column_modes[std::string(column_name.value())] = kDefaultMetricsMode;
}
@@ -167,10 +166,12 @@ Result<std::shared_ptr<MetricsConfig>>
MetricsConfig::MakeInternal(
}
// First set sorted column with sorted column default (can be overridden by
user)
- auto sorted_col_default_mode = SortedColumnDefaultMode(default_mode);
- auto sorted_columns = SortOrder::OrderPreservingSortedColumns(schema, order);
- for (const auto& sorted_column : sorted_columns) {
- column_modes[std::string(sorted_column)] = sorted_col_default_mode;
+ if (schema != nullptr && order != nullptr) {
+ auto sorted_col_default_mode = SortedColumnDefaultMode(default_mode);
+ auto sorted_columns = SortOrder::OrderPreservingSortedColumns(*schema,
*order);
+ for (const auto& sorted_column : sorted_columns) {
+ column_modes[std::string(sorted_column)] = sorted_col_default_mode;
+ }
}
// Handle user overrides of defaults
diff --git a/src/iceberg/metrics_config.h b/src/iceberg/metrics_config.h
index a5e51ee6..bba7307d 100644
--- a/src/iceberg/metrics_config.h
+++ b/src/iceberg/metrics_config.h
@@ -101,12 +101,14 @@ class ICEBERG_EXPORT MetricsConfig {
///
/// \param props will be read for metrics overrides
(write.metadata.metrics.column.*)
/// and default(write.metadata.metrics.default)
- /// \param schema table schema
- /// \param order sort order columns, will be promoted to truncate(16)
+ /// \param schema table schema, or nullptr when only properties are available
+ /// \param order table sort order, or nullptr when unavailable. If provided,
sorted
+ /// columns use at least the default truncate metrics mode (`truncate(16)`)
when
+ /// the default mode is `none` or `counts`; explicit column overrides still
win.
/// \return metrics configuration
static Result<std::shared_ptr<MetricsConfig>> MakeInternal(const
TableProperties& props,
- const Schema&
schema,
- const SortOrder&
order);
+ const Schema*
schema,
+ const SortOrder*
order);
ColumnModeMap column_modes_;
MetricsMode default_mode_;
diff --git a/src/iceberg/parquet/parquet_metrics.cc
b/src/iceberg/parquet/parquet_metrics.cc
index 09b727a1..32dc9fec 100644
--- a/src/iceberg/parquet/parquet_metrics.cc
+++ b/src/iceberg/parquet/parquet_metrics.cc
@@ -17,13 +17,15 @@
* under the License.
*/
-#include "iceberg/parquet/parquet_metrics.h"
-
+#include <cstdint>
#include <limits>
#include <optional>
#include <ranges>
+#include <span>
#include <string>
#include <unordered_map>
+#include <utility>
+#include <vector>
#include <parquet/column_reader.h>
#include <parquet/schema.h>
@@ -31,12 +33,14 @@
#include <parquet/types.h>
#include "iceberg/expression/literal.h"
+#include "iceberg/parquet/parquet_metrics_internal.h"
#include "iceberg/result.h"
#include "iceberg/schema.h"
#include "iceberg/type.h"
#include "iceberg/util/checked_cast.h"
-#include "iceberg/util/conversions.h"
+#include "iceberg/util/decimal.h"
#include "iceberg/util/truncate_util.h"
+#include "iceberg/util/uuid.h"
#include "iceberg/util/visit_type.h"
namespace iceberg::parquet {
@@ -67,6 +71,163 @@ std::optional<int32_t> FindColumnIndex(const
::parquet::SchemaDescriptor& parque
return it != columns.end() ? std::optional(*it) : std::nullopt;
}
+/// \brief Sum the compressed byte size of one leaf column over all row groups.
+/// \param metadata Footer metadata of the Parquet file.
+/// \param column_idx Leaf column index in the Parquet schema.
+int64_t CollectColumnSize(const ::parquet::FileMetaData& metadata, int32_t column_idx) {
+  int64_t size = 0;
+  for (int rg = 0; rg < metadata.num_row_groups(); ++rg) {
+    size += metadata.RowGroup(rg)->ColumnChunk(column_idx)->total_compressed_size();
+  }
+  return size;
+}
+
+/// \brief Downcast `stats` to `StatsType` and pass its min (`is_min`) or max value to
+/// `converter`, returning the converter's result.
+template <typename StatsType, typename Converter>
+Result<Literal> TypedStatsLiteral(const ::parquet::Statistics& stats, bool is_min,
+                                  Converter&& converter) {
+  const auto& typed_stats = internal::checked_cast<const StatsType&>(stats);
+  return converter(is_min ? typed_stats.min() : typed_stats.max());
+}
+
+/// \brief Copy the payload of a Parquet BYTE_ARRAY value.
+std::vector<uint8_t> BytesFromByteArray(const ::parquet::ByteArray& value) {
+  // Parentheses (not braces) select the iterator-range constructor explicitly instead
+  // of relying on the std::initializer_list overload being non-viable.
+  return std::vector<uint8_t>(value.ptr, value.ptr + value.len);
+}
+
+/// \brief Copy the first `length` bytes of a Parquet FIXED_LEN_BYTE_ARRAY value.
+std::vector<uint8_t> BytesFromFLBA(const ::parquet::FixedLenByteArray& value,
+                                   int32_t length) {
+  return std::vector<uint8_t>(value.ptr, value.ptr + length);
+}
+
+/// \brief Build a decimal literal using the precision and scale of `iceberg_type`,
+/// which must be a DecimalType.
+Literal DecimalLiteral(int128_t value, const PrimitiveType& iceberg_type) {
+  const auto& decimal_type = internal::checked_cast<const DecimalType&>(iceberg_type);
+  return Literal::Decimal(value, decimal_type.precision(), decimal_type.scale());
+}
+
+/// \brief Decode a big-endian unscaled value into a decimal literal of `iceberg_type`.
+Result<Literal> DecimalLiteralFromBytes(std::span<const uint8_t> bytes,
+                                        const PrimitiveType& iceberg_type) {
+  ICEBERG_ASSIGN_OR_RAISE(auto decimal,
+                          Decimal::FromBigEndian(bytes.data(), bytes.size()));
+  return DecimalLiteral(decimal.value(), iceberg_type);
+}
+
+/// \brief Convert BYTE_ARRAY / FIXED_LEN_BYTE_ARRAY statistics bytes to a string,
+/// binary, fixed, uuid or decimal literal; any other Iceberg type is rejected.
+Result<Literal> BinaryStatsLiteral(std::vector<uint8_t> bytes,
+                                   const PrimitiveType& iceberg_type) {
+  switch (iceberg_type.type_id()) {
+    case TypeId::kString:
+      return Literal::String(std::string(bytes.begin(), bytes.end()));
+    case TypeId::kBinary:
+      return Literal::Binary(std::move(bytes));
+    case TypeId::kFixed:
+      return Literal::Fixed(std::move(bytes));
+    case TypeId::kUuid: {
+      ICEBERG_ASSIGN_OR_RAISE(auto uuid, Uuid::FromBytes(bytes));
+      return Literal::UUID(std::move(uuid));
+    }
+    case TypeId::kDecimal:
+      return DecimalLiteralFromBytes(bytes, iceberg_type);
+    default:
+      return InvalidArgument(
+          "Cannot convert Parquet binary statistics to Iceberg type {}",
+          iceberg_type.ToString());
+  }
+}
+
+/// \brief Convert BOOLEAN statistics; only an Iceberg boolean column is accepted, so
+/// a mismatched schema cannot produce a bound whose type differs from the field's.
+Result<Literal> BooleanStatsLiteral(bool value, const PrimitiveType& iceberg_type) {
+  if (iceberg_type.type_id() != TypeId::kBoolean) {
+    return InvalidArgument(
+        "Cannot convert Parquet BOOLEAN statistics to Iceberg type {}",
+        iceberg_type.ToString());
+  }
+  return Literal::Boolean(value);
+}
+
+/// \brief Convert INT32 statistics to an int, long, date or decimal literal.
+Result<Literal> Int32StatsLiteral(int32_t value, const PrimitiveType& iceberg_type) {
+  switch (iceberg_type.type_id()) {
+    case TypeId::kInt:
+      return Literal::Int(value);
+    case TypeId::kLong:
+      return Literal::Long(value);
+    case TypeId::kDate:
+      return Literal::Date(value);
+    case TypeId::kDecimal:
+      return DecimalLiteral(value, iceberg_type);
+    default:
+      return InvalidArgument("Cannot convert Parquet INT32 statistics to Iceberg type {}",
+                             iceberg_type.ToString());
+  }
+}
+
+/// \brief Convert INT64 statistics to a long, time, timestamp (any precision/zone
+/// variant) or decimal literal.
+Result<Literal> Int64StatsLiteral(int64_t value, const PrimitiveType& iceberg_type) {
+  switch (iceberg_type.type_id()) {
+    case TypeId::kLong:
+      return Literal::Long(value);
+    case TypeId::kTime:
+      return Literal::Time(value);
+    case TypeId::kTimestamp:
+      return Literal::Timestamp(value);
+    case TypeId::kTimestampTz:
+      return Literal::TimestampTz(value);
+    case TypeId::kTimestampNs:
+      return Literal::TimestampNs(value);
+    case TypeId::kTimestampTzNs:
+      return Literal::TimestampTzNs(value);
+    case TypeId::kDecimal:
+      return DecimalLiteral(value, iceberg_type);
+    default:
+      return InvalidArgument("Cannot convert Parquet INT64 statistics to Iceberg type {}",
+                             iceberg_type.ToString());
+  }
+}
+
+/// \brief Convert FLOAT statistics to a float literal, or promote to double.
+Result<Literal> FloatStatsLiteral(float value, const PrimitiveType& iceberg_type) {
+  switch (iceberg_type.type_id()) {
+    case TypeId::kFloat:
+      return Literal::Float(value);
+    case TypeId::kDouble:
+      return Literal::Double(value);
+    default:
+      return InvalidArgument("Cannot convert Parquet FLOAT statistics to Iceberg type {}",
+                             iceberg_type.ToString());
+  }
+}
+
+/// \brief Convert DOUBLE statistics; only an Iceberg double column is accepted, so
+/// a mismatched schema cannot produce a bound whose type differs from the field's.
+Result<Literal> DoubleStatsLiteral(double value, const PrimitiveType& iceberg_type) {
+  if (iceberg_type.type_id() != TypeId::kDouble) {
+    return InvalidArgument(
+        "Cannot convert Parquet DOUBLE statistics to Iceberg type {}",
+        iceberg_type.ToString());
+  }
+  return Literal::Double(value);
+}
+
+/// \brief True for Iceberg float/double, the only types whose bounds can be NaN.
+bool IsFloatingType(const PrimitiveType& type) {
+  return type.type_id() == TypeId::kFloat || type.type_id() == TypeId::kDouble;
+}
+
+/// \brief True for the types whose bounds are truncated (string and binary); all
+/// other types keep their exact footer bounds.
+bool NeedsBoundTruncation(const PrimitiveType& type) {
+  return type.type_id() == TypeId::kString || type.type_id() == TypeId::kBinary;
+}
+
+/// \brief Convert the min (`is_min`) or max value of a column chunk's footer
+/// statistics into a literal of `iceberg_type`, dispatching on the column's Parquet
+/// physical type. Every physical type validates that it can represent
+/// `iceberg_type`; INT96 and undefined physical types return NotSupported.
+Result<Literal> StatsValueToLiteral(const ::parquet::ColumnDescriptor& column,
+                                    const PrimitiveType& iceberg_type,
+                                    const ::parquet::Statistics& stats, bool is_min) {
+  switch (column.physical_type()) {
+    case ::parquet::Type::BOOLEAN:
+      return TypedStatsLiteral<::parquet::BoolStatistics>(
+          stats, is_min,
+          [&](bool value) { return BooleanStatsLiteral(value, iceberg_type); });
+    case ::parquet::Type::INT32:
+      return TypedStatsLiteral<::parquet::Int32Statistics>(
+          stats, is_min,
+          [&](int32_t value) { return Int32StatsLiteral(value, iceberg_type); });
+    case ::parquet::Type::INT64:
+      return TypedStatsLiteral<::parquet::Int64Statistics>(
+          stats, is_min,
+          [&](int64_t value) { return Int64StatsLiteral(value, iceberg_type); });
+    case ::parquet::Type::FLOAT:
+      return TypedStatsLiteral<::parquet::FloatStatistics>(
+          stats, is_min,
+          [&](float value) { return FloatStatsLiteral(value, iceberg_type); });
+    case ::parquet::Type::DOUBLE:
+      return TypedStatsLiteral<::parquet::DoubleStatistics>(
+          stats, is_min,
+          [&](double value) { return DoubleStatsLiteral(value, iceberg_type); });
+    case ::parquet::Type::BYTE_ARRAY:
+      return TypedStatsLiteral<::parquet::ByteArrayStatistics>(
+          stats, is_min, [&](const ::parquet::ByteArray& value) {
+            return BinaryStatsLiteral(BytesFromByteArray(value), iceberg_type);
+          });
+    case ::parquet::Type::FIXED_LEN_BYTE_ARRAY:
+      return TypedStatsLiteral<::parquet::FLBAStatistics>(
+          stats, is_min, [&](const ::parquet::FixedLenByteArray& value) {
+            return BinaryStatsLiteral(BytesFromFLBA(value, column.type_length()),
+                                      iceberg_type);
+          });
+    case ::parquet::Type::INT96:
+    case ::parquet::Type::UNDEFINED:
+      return NotSupported("Cannot convert Parquet statistics for physical type {}",
+                          static_cast<int>(column.physical_type()));
+  }
+  std::unreachable();
+}
+
/// \brief Collect counts (value count and null count) from footer statistics.
/// \param field_id The Iceberg field ID.
/// \param metadata The Parquet file metadata.
@@ -105,6 +266,7 @@ Result<std::optional<FieldMetrics>> CollectBounds(
int32_t field_id, std::shared_ptr<PrimitiveType> iceberg_type,
const ::parquet::FileMetaData& metadata, int32_t column_idx,
int32_t truncate_length) {
+ auto column_desc = metadata.schema()->Column(column_idx);
int64_t null_count = 0;
int64_t value_count = 0;
std::optional<Literal> lower_bound;
@@ -122,28 +284,24 @@ Result<std::optional<FieldMetrics>> CollectBounds(
value_count += column_chunk->num_values();
if (stats->HasMinMax()) {
- auto min_bytes = stats->EncodeMin();
- auto min_span = std::span<const uint8_t>(
- reinterpret_cast<const uint8_t*>(min_bytes.data()),
min_bytes.size());
ICEBERG_ASSIGN_OR_RAISE(auto min_value,
- Conversions::FromBytes(iceberg_type, min_span));
+ StatsValueToLiteral(*column_desc, *iceberg_type,
*stats,
+ /*is_min=*/true));
if (!lower_bound.has_value() || min_value < lower_bound.value()) {
lower_bound = std::move(min_value);
}
- auto max_bytes = stats->EncodeMax();
- auto max_span = std::span<const uint8_t>(
- reinterpret_cast<const uint8_t*>(max_bytes.data()),
max_bytes.size());
ICEBERG_ASSIGN_OR_RAISE(auto max_value,
- Conversions::FromBytes(iceberg_type, max_span));
+ StatsValueToLiteral(*column_desc, *iceberg_type,
*stats,
+ /*is_min=*/false));
if (!upper_bound.has_value() || max_value > upper_bound.value()) {
upper_bound = std::move(max_value);
}
}
}
- if (!lower_bound.has_value() || !upper_bound.has_value() ||
lower_bound->IsNaN() ||
- upper_bound->IsNaN()) {
+ if (!lower_bound.has_value() || !upper_bound.has_value() ||
+ (IsFloatingType(*iceberg_type) && (lower_bound->IsNaN() ||
upper_bound->IsNaN()))) {
return FieldMetrics{
.field_id = field_id,
.value_count = value_count,
@@ -151,19 +309,21 @@ Result<std::optional<FieldMetrics>> CollectBounds(
};
}
- ICEBERG_ASSIGN_OR_RAISE(auto truncated_lower,
- TruncateUtils::TruncateLowerBound(
- *iceberg_type, lower_bound.value(),
truncate_length));
- ICEBERG_ASSIGN_OR_RAISE(auto truncated_upper,
- TruncateUtils::TruncateUpperBound(
- *iceberg_type, upper_bound.value(),
truncate_length));
+ if (NeedsBoundTruncation(*iceberg_type)) {
+ ICEBERG_ASSIGN_OR_RAISE(
+ lower_bound, TruncateUtils::TruncateLowerBound(*iceberg_type,
lower_bound.value(),
+ truncate_length));
+ ICEBERG_ASSIGN_OR_RAISE(
+ upper_bound, TruncateUtils::TruncateUpperBound(*iceberg_type,
upper_bound.value(),
+ truncate_length));
+ }
return FieldMetrics{
.field_id = field_id,
.value_count = value_count,
.null_value_count = null_count,
- .lower_bound = std::move(truncated_lower),
- .upper_bound = std::move(truncated_upper),
+ .lower_bound = std::move(lower_bound),
+ .upper_bound = std::move(upper_bound),
};
}
@@ -187,19 +347,27 @@ Result<std::optional<FieldMetrics>>
MetricsFromFieldMetrics(
.null_value_count = fm.null_value_count,
.nan_value_count = fm.nan_value_count};
- if (truncate_length > 0) {
- if (fm.lower_bound.has_value()) {
- ICEBERG_ASSIGN_OR_RAISE(
- auto lower, TruncateUtils::TruncateLowerBound(
- *primitive_type, fm.lower_bound.value(),
truncate_length));
- result.lower_bound = std::move(lower);
- }
- if (fm.upper_bound.has_value()) {
- ICEBERG_ASSIGN_OR_RAISE(
- auto upper, TruncateUtils::TruncateUpperBound(
- *primitive_type, fm.upper_bound.value(),
truncate_length));
- result.upper_bound = std::move(upper);
- }
+ if (truncate_length <= 0) {
+ return result;
+ }
+
+ if (!NeedsBoundTruncation(*primitive_type)) {
+ result.lower_bound = fm.lower_bound;
+ result.upper_bound = fm.upper_bound;
+ return result;
+ }
+
+ if (fm.lower_bound.has_value()) {
+ ICEBERG_ASSIGN_OR_RAISE(
+ result.lower_bound,
+ TruncateUtils::TruncateLowerBound(*primitive_type,
fm.lower_bound.value(),
+ truncate_length));
+ }
+ if (fm.upper_bound.has_value()) {
+ ICEBERG_ASSIGN_OR_RAISE(
+ result.upper_bound,
+ TruncateUtils::TruncateUpperBound(*primitive_type,
fm.upper_bound.value(),
+ truncate_length));
}
return result;
@@ -279,6 +447,10 @@ class CollectMetricsVisitor {
int32_t truncate_length = mode.TruncateLength();
const auto& primitive_type =
internal::checked_pointer_cast<PrimitiveType>(field.type());
+ auto column_idx = FindColumnIndex(parquet_schema_, field_id);
+ if (column_idx.has_value()) {
+ metrics_.column_sizes[field_id] = CollectColumnSize(metadata_,
column_idx.value());
+ }
ICEBERG_ASSIGN_OR_RAISE(auto field_metrics,
MetricsFromFieldMetrics(field_id, field_metrics_,
@@ -330,36 +502,10 @@ Result<Metrics> ParquetMetrics::GetMetrics(
const std::unordered_map<int32_t, FieldMetrics>& field_metrics) {
Metrics metrics;
- // Collect row count and column sizes
- int64_t row_count = 0;
- for (int rg = 0; rg < metadata.num_row_groups(); ++rg) {
- auto row_group = metadata.RowGroup(rg);
- row_count += row_group->num_rows();
- for (int col = 0; col < row_group->num_columns(); ++col) {
- auto column_chunk = row_group->ColumnChunk(col);
- auto field_id_opt = GetFieldId(*parquet_schema.Column(col));
- if (!field_id_opt.has_value()) {
- continue;
- }
- int32_t field_id = field_id_opt.value();
-
- ICEBERG_ASSIGN_OR_RAISE(auto field_name,
schema.FindColumnNameById(field_id));
- if (!field_name.has_value()) {
- continue;
- }
-
- MetricsMode mode = metrics_config.ColumnMode(field_name.value());
- if (mode.kind != MetricsMode::Kind::kNone) {
- metrics.column_sizes[field_id] =
- metrics.column_sizes.contains(field_id)
- ? metrics.column_sizes[field_id] +
column_chunk->total_compressed_size()
- : column_chunk->total_compressed_size();
- }
- }
- }
- metrics.row_count = row_count;
+ metrics.row_count = metadata.num_rows();
- // Collect metrics for all primitive fields
+ // Apply MetricsConfig while visiting schema fields, then collect footer
metrics only
+ // for fields whose mode is not `none`.
CollectMetricsVisitor visitor(parquet_schema, metrics_config, metadata,
field_metrics,
metrics);
ICEBERG_RETURN_UNEXPECTED(visitor.VisitStruct(schema, ""));
diff --git a/src/iceberg/parquet/parquet_metrics.h
b/src/iceberg/parquet/parquet_metrics_internal.h
similarity index 95%
rename from src/iceberg/parquet/parquet_metrics.h
rename to src/iceberg/parquet/parquet_metrics_internal.h
index eb916241..c78c114a 100644
--- a/src/iceberg/parquet/parquet_metrics.h
+++ b/src/iceberg/parquet/parquet_metrics_internal.h
@@ -19,14 +19,13 @@
#pragma once
-/// \file iceberg/parquet/parquet_metrics.h
+/// \file iceberg/parquet/parquet_metrics_internal.h
/// \brief Utilities for extracting metrics from Parquet files.
#include <unordered_map>
#include <parquet/metadata.h>
-#include "iceberg/iceberg_bundle_export.h"
#include "iceberg/metrics.h"
#include "iceberg/metrics_config.h"
#include "iceberg/result.h"
@@ -35,7 +34,7 @@
namespace iceberg::parquet {
/// \brief Utility class for computing metrics from Parquet files.
-class ICEBERG_BUNDLE_EXPORT ParquetMetrics {
+class ParquetMetrics {
public:
ParquetMetrics() = delete;
diff --git a/src/iceberg/parquet/parquet_writer.cc
b/src/iceberg/parquet/parquet_writer.cc
index e91a2a6c..da794cc3 100644
--- a/src/iceberg/parquet/parquet_writer.cc
+++ b/src/iceberg/parquet/parquet_writer.cc
@@ -21,6 +21,7 @@
#include <memory>
#include <string_view>
+#include <vector>
#include <arrow/c/bridge.h>
#include <arrow/record_batch.h>
@@ -33,7 +34,7 @@
#include "iceberg/arrow/arrow_io_internal.h"
#include "iceberg/arrow/arrow_status_internal.h"
-#include "iceberg/parquet/parquet_metrics.h"
+#include "iceberg/parquet/parquet_metrics_internal.h"
#include "iceberg/schema_internal.h"
#include "iceberg/util/macros.h"
@@ -94,6 +95,11 @@ class ParquetWriter::Impl {
auto properties_builder = ::parquet::WriterProperties::Builder();
properties_builder.compression(compression);
+ auto max_row_group_rows =
+ options.properties.Get(WriterProperties::kParquetMaxRowGroupRows);
+ ICEBERG_PRECHECK(max_row_group_rows > 0,
+ "Parquet max row group rows must be greater than 0");
+ properties_builder.max_row_group_length(max_row_group_rows);
if (compression_level.has_value()) {
properties_builder.compression_level(compression_level.value());
}
@@ -142,12 +148,11 @@ class ParquetWriter::Impl {
}
ICEBERG_ARROW_RETURN_NOT_OK(writer_->Close());
- auto& metadata = writer_->metadata();
- split_offsets_.reserve(metadata->num_row_groups());
- for (int i = 0; i < metadata->num_row_groups(); ++i) {
- split_offsets_.push_back(metadata->RowGroup(i)->file_offset());
- }
metadata_ = writer_->metadata();
+ split_offsets_.reserve(metadata_->num_row_groups());
+ for (int i = 0; i < metadata_->num_row_groups(); ++i) {
+ split_offsets_.push_back(metadata_->RowGroup(i)->file_offset());
+ }
writer_.reset();
ICEBERG_ARROW_ASSIGN_OR_RETURN(total_bytes_, output_stream_->Tell());
@@ -171,15 +176,12 @@ class ParquetWriter::Impl {
std::vector<int64_t> split_offsets() const { return split_offsets_; }
Result<Metrics> metrics() {
- if (writer_) {
- return Invalid("Cannot return metrics for unclosed writer");
- }
- if (!metadata_) {
- return Metrics();
- }
+ // The close path (writer_->Close(), then writer_.reset()) is what sets metadata_,
+ // so metrics can only be computed after the writer has been closed. A missing
+ // metadata_ is now reported as an error instead of returning an empty Metrics.
+ ICEBERG_PRECHECK(writer_ == nullptr, "Cannot return metrics for unclosed
writer");
+ ICEBERG_PRECHECK(metadata_ != nullptr,
+ "Cannot return metrics because Parquet metadata is not
available");
// TODO(WZhuo): collect write-side FieldMetrics to support NaN value
counts.
return ParquetMetrics::GetMetrics(*schema_, *parquet_schema_,
*metrics_config_,
- *metadata_, {});
+ *metadata_);
}
private:
diff --git a/src/iceberg/test/metrics_test_base.cc
b/src/iceberg/test/metrics_test_base.cc
index cc5f7cd6..7913a720 100644
--- a/src/iceberg/test/metrics_test_base.cc
+++ b/src/iceberg/test/metrics_test_base.cc
@@ -22,6 +22,7 @@
#include <arrow/builder.h>
#include <arrow/c/bridge.h>
#include <arrow/json/from_string.h>
+#include <gmock/gmock.h>
#include "iceberg/arrow/arrow_io_internal.h"
#include "iceberg/arrow/arrow_status_internal.h"
@@ -80,6 +81,17 @@ void MetricsTestBase::AssertCounts(int field_id,
}
}
+/// \brief Check that `metrics.column_sizes` reports a positive size for exactly the
+/// fields in `expected_field_ids`, in any order.
+void MetricsTestBase::AssertColumnSizeFields(std::vector<int32_t> expected_field_ids,
+                                             const Metrics& metrics) {
+  std::vector<int32_t> reported_ids;
+  reported_ids.reserve(metrics.column_sizes.size());
+  for (const auto& [id, bytes] : metrics.column_sizes) {
+    reported_ids.push_back(id);
+    EXPECT_GT(bytes, 0) << "Field " << id << " should have a positive size";
+  }
+  EXPECT_THAT(reported_ids, testing::UnorderedElementsAreArray(expected_field_ids));
+}
+
template <typename T>
void MetricsTestBase::AssertBounds(int field_id,
std::shared_ptr<PrimitiveType> type,
std::optional<T> expected_lower,
@@ -91,6 +103,8 @@ void MetricsTestBase::AssertBounds(int field_id,
std::shared_ptr<PrimitiveType>
const auto& literal = metrics.lower_bounds.at(field_id);
ASSERT_FALSE(literal.IsNull())
<< "Field " << field_id << " lower bound literal should not be null";
+ EXPECT_EQ(*literal.type(), *type)
+ << "Field " << field_id << " lower bound literal type mismatch";
EXPECT_EQ(std::get<T>(literal.value()), expected_lower.value())
<< "Field " << field_id << " lower bound mismatch";
} else {
@@ -103,6 +117,8 @@ void MetricsTestBase::AssertBounds(int field_id,
std::shared_ptr<PrimitiveType>
const auto& literal = metrics.upper_bounds.at(field_id);
ASSERT_FALSE(literal.IsNull())
<< "Field " << field_id << " upper bound literal should not be null";
+ EXPECT_EQ(*literal.type(), *type)
+ << "Field " << field_id << " upper bound literal type mismatch";
EXPECT_EQ(std::get<T>(literal.value()), expected_upper.value())
<< "Field " << field_id << " upper bound mismatch";
} else {
@@ -208,12 +224,12 @@ void MetricsTestBase::MetricsForRepeatedValues() {
ASSERT_TRUE(metrics.row_count.has_value()) << "row_count should be set";
EXPECT_EQ(*metrics.row_count, 2);
+ AssertColumnSizeFields({1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13}, metrics);
AssertCounts(1, 2, 0, metrics);
AssertCounts(2, 2, 0, metrics);
AssertCounts(3, 2, 1, metrics);
- // TODO(WZhuo) Assert NaN metrics
- AssertCounts(4, 2, 0, metrics); // floatCol has 2 NaN values
+ AssertCounts(4, 2, 0, metrics);
AssertCounts(5, 2, 0, metrics);
AssertCounts(6, 2, 1, metrics);
AssertCounts(7, 2, 0, metrics);
@@ -317,17 +333,13 @@ void MetricsTestBase::MetricsForDecimals() {
EXPECT_EQ(*metrics.row_count, 1);
AssertCounts(1, 1, 0, metrics);
- // For decimals, bounds exist but we just check they're present
- EXPECT_TRUE(metrics.lower_bounds.contains(1));
- EXPECT_TRUE(metrics.upper_bounds.contains(1));
+ AssertBounds<Decimal>(1, decimal(4, 2), Decimal(255), Decimal(255), metrics);
AssertCounts(2, 1, 0, metrics);
- EXPECT_TRUE(metrics.lower_bounds.contains(2));
- EXPECT_TRUE(metrics.upper_bounds.contains(2));
+ AssertBounds<Decimal>(2, decimal(14, 2), Decimal(475), Decimal(475),
metrics);
AssertCounts(3, 1, 0, metrics);
- EXPECT_TRUE(metrics.lower_bounds.contains(3));
- EXPECT_TRUE(metrics.upper_bounds.contains(3));
+ AssertBounds<Decimal>(3, decimal(22, 2), Decimal(580), Decimal(580),
metrics);
}
void MetricsTestBase::MetricsForNestedStructFields() {
@@ -338,6 +350,7 @@ void MetricsTestBase::MetricsForNestedStructFields() {
ASSERT_TRUE(metrics.row_count.has_value()) << "row_count should be set";
EXPECT_EQ(*metrics.row_count, 1);
+ AssertColumnSizeFields({1, 3, 5, 6, 7}, metrics);
AssertCounts(1, 1, 0, metrics);
AssertBounds<int32_t>(1, int32(), std::numeric_limits<int32_t>::min(),
@@ -353,7 +366,6 @@ void MetricsTestBase::MetricsForNestedStructFields() {
AssertBounds<std::vector<uint8_t>>(6, binary(), std::vector<uint8_t>{'A'},
std::vector<uint8_t>{'A'}, metrics);
- // TODO(WZhuo) Assert NaN metrics
AssertCounts(7, 1L, 0L, metrics);
AssertBounds<double>(7, float64(), std::nullopt, std::nullopt, metrics);
}
@@ -373,6 +385,7 @@ void MetricsTestBase::MetricsModeForNestedStructFields() {
ASSERT_TRUE(metrics.row_count.has_value()) << "row_count should be set";
EXPECT_EQ(*metrics.row_count, 1);
+ AssertColumnSizeFields({3}, metrics);
// Only field 3 (nestedStructCol.longCol) should have bounds
EXPECT_EQ(metrics.lower_bounds.size(), 1);
@@ -460,6 +473,7 @@ void MetricsTestBase::MetricsForListAndMapElements() {
ASSERT_TRUE(metrics.row_count.has_value()) << "row_count should be set";
EXPECT_EQ(*metrics.row_count, 1);
+ AssertColumnSizeFields({}, metrics);
// For list and map elements, metrics should not be collected
// Field IDs: 1 (leafIntCol), 2 (leafStringCol), 4 (list element), 6 (map
key), 7 (map
@@ -468,6 +482,7 @@ void MetricsTestBase::MetricsForListAndMapElements() {
AssertCounts(2, std::nullopt, std::nullopt, metrics);
AssertCounts(4, std::nullopt, std::nullopt, metrics);
AssertCounts(6, std::nullopt, std::nullopt, metrics);
+ AssertCounts(7, std::nullopt, std::nullopt, metrics);
AssertBounds<int32_t>(1, int32(), std::nullopt, std::nullopt, metrics);
AssertBounds<std::string>(2, string(), std::nullopt, std::nullopt, metrics);
@@ -526,7 +541,6 @@ void MetricsTestBase::MetricsForNaNColumns() {
ASSERT_TRUE(metrics.row_count.has_value()) << "row_count should be set";
EXPECT_EQ(*metrics.row_count, 2);
- // TODO(WZhuo) Assert NaN metrics
AssertCounts(1, 2, 0, metrics);
AssertCounts(2, 2, 0, metrics);
@@ -565,7 +579,6 @@ void MetricsTestBase::ColumnBoundsWithNaNValueAtFront() {
ASSERT_TRUE(metrics.row_count.has_value()) << "row_count should be set";
EXPECT_EQ(*metrics.row_count, 3);
- // TODO(WZhuo) Assert NaN metrics
AssertCounts(1, 3, 0, metrics);
AssertCounts(2, 3, 0, metrics);
@@ -664,6 +677,8 @@ void
MetricsTestBase::MetricsForTopLevelWithMultipleRowGroup() {
if (SupportsSmallRowGroups()) {
ICEBERG_UNWRAP_OR_FAIL(auto split_count, GetSplitCount());
EXPECT_EQ(split_count, 3);
+ } else {
+ FAIL() << "This test must force multiple row groups";
}
ASSERT_TRUE(metrics.row_count.has_value()) << "row_count should be set";
@@ -694,6 +709,8 @@ void
MetricsTestBase::MetricsForNestedStructFieldsWithMultipleRowGroup() {
if (SupportsSmallRowGroups()) {
ICEBERG_UNWRAP_OR_FAIL(auto split_count, GetSplitCount());
EXPECT_EQ(split_count, 3);
+ } else {
+ FAIL() << "This test must force multiple row groups";
}
ASSERT_TRUE(metrics.row_count.has_value()) << "row_count should be set";
EXPECT_EQ(*metrics.row_count, 201);
@@ -732,7 +749,7 @@ void MetricsTestBase::NoneMetricsMode() {
EXPECT_EQ(*metrics.row_count, 1);
// In None mode, column_sizes should be empty
- EXPECT_TRUE(metrics.column_sizes.empty());
+ AssertColumnSizeFields({}, metrics);
// All counts should be null
AssertCounts(1, std::nullopt, std::nullopt, metrics);
@@ -761,7 +778,7 @@ void MetricsTestBase::CountsMetricsMode() {
EXPECT_EQ(*metrics.row_count, 1);
// In Counts mode, column_sizes should not be empty
- EXPECT_FALSE(metrics.column_sizes.empty());
+ AssertColumnSizeFields({1, 3, 5, 6, 7}, metrics);
// Counts should be present but bounds should be null
AssertCounts(1, 1, 0, metrics);
@@ -790,7 +807,7 @@ void MetricsTestBase::FullMetricsMode() {
EXPECT_EQ(*metrics.row_count, 1);
// In Full mode, column_sizes should not be empty
- EXPECT_FALSE(metrics.column_sizes.empty());
+ AssertColumnSizeFields({1, 3, 5, 6, 7}, metrics);
// Both counts and bounds should be present
AssertCounts(1, 1, 0, metrics);
diff --git a/src/iceberg/test/metrics_test_base.h
b/src/iceberg/test/metrics_test_base.h
index 1ad9c882..07b3b62f 100644
--- a/src/iceberg/test/metrics_test_base.h
+++ b/src/iceberg/test/metrics_test_base.h
@@ -22,6 +22,7 @@
#include <memory>
#include <optional>
#include <string>
+#include <vector>
#include <arrow/array.h>
@@ -67,6 +68,9 @@ class MetricsTestBase {
std::optional<int64_t> expected_null_count,
std::optional<int64_t> expected_nan_count, const Metrics&
metrics);
+ void AssertColumnSizeFields(std::vector<int32_t> expected_field_ids,
+ const Metrics& metrics);
+
template <typename T>
void AssertBounds(int field_id, std::shared_ptr<PrimitiveType> type,
std::optional<T> expected_lower, std::optional<T>
expected_upper,
diff --git a/src/iceberg/test/parquet_metrics_test.cc
b/src/iceberg/test/parquet_metrics_test.cc
index 93c024b0..8286efe0 100644
--- a/src/iceberg/test/parquet_metrics_test.cc
+++ b/src/iceberg/test/parquet_metrics_test.cc
@@ -17,6 +17,7 @@
* under the License.
*/
+#include <arrow/builder.h>
#include <arrow/c/bridge.h>
#include <arrow/filesystem/filesystem.h>
#include <gtest/gtest.h>
@@ -27,7 +28,9 @@
#include "iceberg/arrow/arrow_io_internal.h"
#include "iceberg/arrow/arrow_status_internal.h"
#include "iceberg/file_writer.h"
+#include "iceberg/parquet/parquet_metrics_internal.h"
#include "iceberg/parquet/parquet_register.h"
+#include "iceberg/test/matchers.h"
#include "iceberg/test/metrics_test_base.h"
#include "iceberg/util/checked_cast.h"
@@ -40,8 +43,10 @@ class ParquetMetricsTest : public MetricsTestBase, public
::testing::Test {
void SetUp() override {
MetricsTestBase::SetUp();
temp_parquet_file_ = "parquet_metrics_test.parquet";
- writer_properties_ = WriterProperties::FromMap(
- {{WriterProperties::kParquetCompression.key(), "uncompressed"}});
+ writer_properties_ = WriterProperties::FromMap({
+ {WriterProperties::kParquetCompression.key(), "uncompressed"},
+ {WriterProperties::kParquetMaxRowGroupRows.key(), "100"},
+ });
}
Result<Metrics> GetMetrics(std::shared_ptr<Schema> schema,
@@ -76,7 +81,31 @@ class ParquetMetricsTest : public MetricsTestBase, public
::testing::Test {
return metadata->num_row_groups();
}
- bool SupportsSmallRowGroups() const override { return false; }
+ Result<Metrics> GetMetricsWithFieldMetrics(
+ std::shared_ptr<Schema> schema, std::shared_ptr<MetricsConfig> config,
+ std::shared_ptr<::arrow::Array> records,
+ const std::unordered_map<int32_t, FieldMetrics>& field_metrics) {
+ ICEBERG_ASSIGN_OR_RAISE(
+ auto writer, WriterFactoryRegistry::Open(FileFormatType::kParquet,
+ {.path = temp_parquet_file_,
+ .schema = schema,
+ .io = file_io_,
+ .metadata = {},
+ .metrics_config = config,
+ .properties =
writer_properties_}));
+ ArrowArray arr;
+ ICEBERG_ARROW_RETURN_NOT_OK(::arrow::ExportArray(*records, &arr));
+ ICEBERG_RETURN_UNEXPECTED(writer->Write(&arr));
+ ICEBERG_RETURN_UNEXPECTED(writer->Close());
+
+ auto io = internal::checked_cast<arrow::ArrowFileSystemFileIO&>(*file_io_);
+ auto infile = io.fs()->OpenInputFile(temp_parquet_file_).ValueOrDie();
+ auto metadata = ::parquet::ReadMetaData(infile);
+ return parquet::ParquetMetrics::GetMetrics(*schema, *metadata->schema(),
*config,
+ *metadata, field_metrics);
+ }
+
+ bool SupportsSmallRowGroups() const override { return true; }
private:
std::string temp_parquet_file_;
@@ -85,4 +114,66 @@ class ParquetMetricsTest : public MetricsTestBase, public
::testing::Test {
DEFINE_METRICS_TESTS(ParquetMetricsTest);
+TEST_F(ParquetMetricsTest, FieldMetricsOverrideFooterStats) {
+ auto schema = std::make_shared<Schema>(std::vector<SchemaField>{
+ SchemaField::MakeOptional(1, "floatCol", float32()),
+ SchemaField::MakeOptional(2, "strCol", string()),
+ });
+ auto arrow_schema = ::arrow::schema({
+ ::arrow::field("floatCol", ::arrow::float32(), true),
+ ::arrow::field("strCol", ::arrow::utf8(), true),
+ });
+ auto records = CreateRecordArrays(arrow_schema, R"([
+ {"floatCol": 1.0, "strCol": "footer-value"}
+ ])");
+ std::unordered_map<std::string, std::string> properties = {
+ {"write.metadata.metrics.default", "truncate(3)"}};
+ ICEBERG_UNWRAP_OR_FAIL(auto config, MetricsConfig::Make(properties));
+
+ std::unordered_map<int32_t, FieldMetrics> field_metrics{
+ {1, FieldMetrics{.field_id = 1,
+ .value_count = 10,
+ .null_value_count = 2,
+ .lower_bound = Literal::Float(5.0F),
+ .upper_bound = Literal::Float(9.0F)}},
+ {2, FieldMetrics{.field_id = 2,
+ .value_count = 10,
+ .null_value_count = 1,
+ .lower_bound = Literal::String("abcdef"),
+ .upper_bound = Literal::String("abcxyz")}},
+ };
+
+ ICEBERG_UNWRAP_OR_FAIL(
+ auto metrics, GetMetricsWithFieldMetrics(schema, config, records,
field_metrics));
+
+ AssertCounts(1, 10, 2, metrics);
+ AssertBounds<float>(1, float32(), 5.0F, 9.0F, metrics);
+ AssertCounts(2, 10, 1, metrics);
+ AssertBounds<std::string>(2, string(), std::string("abc"),
std::string("abd"), metrics);
+}
+
+TEST_F(ParquetMetricsTest, UnrepresentableTruncatedUpperBoundIsOmitted) {
+ auto schema = std::make_shared<Schema>(std::vector<SchemaField>{
+ SchemaField::MakeRequired(1, "binCol", binary()),
+ });
+ auto arrow_schema = ::arrow::schema({
+ ::arrow::field("binCol", ::arrow::binary(), false),
+ });
+
+ ::arrow::BinaryBuilder builder;
+ std::vector<uint8_t> data = {0xFF, 0xFF, 0x01};
+ ASSERT_TRUE(builder.Append(data.data(), data.size()).ok());
+ auto array = builder.Finish().ValueOrDie();
+ auto records = ::arrow::StructArray::Make({array},
arrow_schema->fields()).ValueOrDie();
+ std::unordered_map<std::string, std::string> properties = {
+ {"write.metadata.metrics.default", "truncate(2)"}};
+ ICEBERG_UNWRAP_OR_FAIL(auto config, MetricsConfig::Make(properties));
+
+ ICEBERG_UNWRAP_OR_FAIL(auto metrics, GetMetrics(schema, config, records));
+
+ AssertCounts(1, 1, 0, metrics);
+ AssertBounds<std::vector<uint8_t>>(1, binary(), std::vector<uint8_t>({0xFF,
0xFF}),
+ std::nullopt, metrics);
+}
+
} // namespace iceberg::test
diff --git a/src/iceberg/util/truncate_util.cc
b/src/iceberg/util/truncate_util.cc
index 8a1d0724..1778000f 100644
--- a/src/iceberg/util/truncate_util.cc
+++ b/src/iceberg/util/truncate_util.cc
@@ -179,7 +179,7 @@ Result<std::optional<Literal>>
TruncateLiteralMaxImpl<TypeId::kString>(
if (!truncated.has_value()) {
return std::nullopt;
}
- return std::optional<Literal>(Literal::String(std::move(truncated.value())));
+ return Literal::String(std::move(*truncated));
}
template <>
@@ -187,7 +187,7 @@ Result<std::optional<Literal>>
TruncateLiteralMaxImpl<TypeId::kBinary>(
const Literal& literal, int32_t width) {
const auto& data = std::get<std::vector<uint8_t>>(literal.value());
if (static_cast<int32_t>(data.size()) <= width) {
- return std::optional<Literal>(literal);
+ return literal;
}
std::vector<uint8_t> truncated(data.begin(), data.begin() + width);
@@ -195,7 +195,7 @@ Result<std::optional<Literal>>
TruncateLiteralMaxImpl<TypeId::kBinary>(
if (*it < 0xFF) {
++(*it);
truncated.resize(truncated.size() - std::distance(truncated.rbegin(),
it));
- return std::optional<Literal>(Literal::Binary(std::move(truncated)));
+ return Literal::Binary(std::move(truncated));
}
}
return std::nullopt;
@@ -207,7 +207,7 @@ Result<std::optional<std::string>>
TruncateUtils::TruncateUTF8Max(
const std::string& source, size_t L) {
std::string truncated = TruncateUTF8(source, L);
if (truncated == source) {
- return std::optional<std::string>(std::move(truncated));
+ return truncated;
}
// Try incrementing code points from the end
@@ -236,7 +236,7 @@ Result<std::optional<std::string>>
TruncateUtils::TruncateUTF8Max(
if (next_code_point <= kUtf8MaxCodePoint) {
truncated.resize(cp_start);
AppendUtf8CodePoint(next_code_point, truncated);
- return std::optional<std::string>(std::move(truncated));
+ return truncated;
}
}
last_cp_start = cp_start;
@@ -282,7 +282,7 @@ Result<std::optional<Literal>>
TruncateUtils::TruncateLiteralMax(const Literal&
int32_t
width) {
if (literal.IsNull()) [[unlikely]] {
// Return null as is
- return std::optional<Literal>(literal);
+ return literal;
}
if (literal.IsAboveMax() || literal.IsBelowMin()) [[unlikely]] {
@@ -316,7 +316,7 @@ Result<std::optional<Literal>>
TruncateUtils::TruncateUpperBound(
case TypeId::kBinary:
return TruncateLiteralMax(value, length);
default:
- return std::optional<Literal>(value);
+ return value;
}
}