This is an automated email from the ASF dual-hosted git repository.
gangwu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-cpp.git
The following commit(s) were added to refs/heads/main by this push:
new f43d24b2 feat: enhance ManifestReader with projection and filtering support (#431)
f43d24b2 is described below
commit f43d24b2bbcc981ed074a9689da5b6571fa131e2
Author: Gang Wu <[email protected]>
AuthorDate: Thu Dec 25 09:40:48 2025 +0800
feat: enhance ManifestReader with projection and filtering support (#431)
---
src/iceberg/CMakeLists.txt | 1 -
src/iceberg/manifest/manifest_entry.h | 154 ++--
src/iceberg/manifest/manifest_list.cc | 14 +-
src/iceberg/manifest/manifest_list.h | 85 ++-
src/iceberg/manifest/manifest_reader.cc | 876 +++++++++++++++++++++-
src/iceberg/manifest/manifest_reader.h | 53 +-
src/iceberg/manifest/manifest_reader_internal.cc | 606 ---------------
src/iceberg/manifest/manifest_reader_internal.h | 96 ++-
src/iceberg/meson.build | 1 -
src/iceberg/table_scan.cc | 6 +-
src/iceberg/test/CMakeLists.txt | 2 +
src/iceberg/test/manifest_reader_stats_test.cc | 239 ++++++
src/iceberg/test/manifest_reader_test.cc | 381 ++++++++++
src/iceberg/test/manifest_writer_versions_test.cc | 5 +-
14 files changed, 1755 insertions(+), 764 deletions(-)
diff --git a/src/iceberg/CMakeLists.txt b/src/iceberg/CMakeLists.txt
index dcc9372f..e48afbfd 100644
--- a/src/iceberg/CMakeLists.txt
+++ b/src/iceberg/CMakeLists.txt
@@ -42,7 +42,6 @@ set(ICEBERG_SOURCES
manifest/manifest_entry.cc
manifest/manifest_list.cc
manifest/manifest_reader.cc
- manifest/manifest_reader_internal.cc
manifest/manifest_writer.cc
manifest/v1_metadata.cc
manifest/v2_metadata.cc
diff --git a/src/iceberg/manifest/manifest_entry.h b/src/iceberg/manifest/manifest_entry.h
index 8e3183a4..f82b7a2b 100644
--- a/src/iceberg/manifest/manifest_entry.h
+++ b/src/iceberg/manifest/manifest_entry.h
@@ -178,94 +178,114 @@ struct ICEBERG_EXPORT DataFile {
/// present
std::optional<int64_t> content_size_in_bytes;
+ static constexpr int32_t kContentFieldId = 134;
inline static const SchemaField kContent = SchemaField::MakeOptional(
- 134, "content", iceberg::int32(),
+ kContentFieldId, "content", int32(),
"Contents of the file: 0=data, 1=position deletes, 2=equality deletes");
+
+ static constexpr int32_t kFilePathFieldId = 100;
inline static const SchemaField kFilePath = SchemaField::MakeRequired(
- 100, "file_path", iceberg::string(), "Location URI with FS scheme");
- inline static const SchemaField kFileFormat = SchemaField::MakeRequired(
- 101, "file_format", iceberg::string(), "File format name: avro, orc, or
parquet");
- inline static const int32_t kPartitionFieldId = 102;
+ kFilePathFieldId, "file_path", string(), "Location URI with FS scheme");
+
+ static constexpr int32_t kFileFormatFieldId = 101;
+ inline static const SchemaField kFileFormat =
+ SchemaField::MakeRequired(kFileFormatFieldId, "file_format", string(),
+ "File format name: avro, orc, or parquet");
+
+ static constexpr int32_t kPartitionFieldId = 102;
inline static const std::string kPartitionField = "partition";
inline static const std::string kPartitionDoc =
"Partition data tuple, schema based on the partition spec";
+
+ static constexpr int32_t kRecordCountFieldId = 103;
inline static const SchemaField kRecordCount = SchemaField::MakeRequired(
- 103, "record_count", iceberg::int64(), "Number of records in the file");
+ kRecordCountFieldId, "record_count", int64(), "Number of records in the
file");
+
+ static constexpr int32_t kFileSizeFieldId = 104;
inline static const SchemaField kFileSize = SchemaField::MakeRequired(
- 104, "file_size_in_bytes", iceberg::int64(), "Total file size in bytes");
+ kFileSizeFieldId, "file_size_in_bytes", int64(), "Total file size in
bytes");
+
+ static constexpr int32_t kColumnSizesFieldId = 108;
inline static const SchemaField kColumnSizes = SchemaField::MakeOptional(
- 108, "column_sizes",
- std::make_shared<MapType>(
- SchemaField::MakeRequired(117, std::string(MapType::kKeyName),
- iceberg::int32()),
- SchemaField::MakeRequired(118, std::string(MapType::kValueName),
- iceberg::int64())),
+ kColumnSizesFieldId, "column_sizes",
+ map(SchemaField::MakeRequired(117, std::string(MapType::kKeyName),
int32()),
+ SchemaField::MakeRequired(118, std::string(MapType::kValueName),
int64())),
"Map of column id to total size on disk");
+
+ static constexpr int32_t kValueCountsFieldId = 109;
inline static const SchemaField kValueCounts = SchemaField::MakeOptional(
- 109, "value_counts",
- std::make_shared<MapType>(
- SchemaField::MakeRequired(119, std::string(MapType::kKeyName),
- iceberg::int32()),
- SchemaField::MakeRequired(120, std::string(MapType::kValueName),
- iceberg::int64())),
+ kValueCountsFieldId, "value_counts",
+ map(SchemaField::MakeRequired(119, std::string(MapType::kKeyName),
int32()),
+ SchemaField::MakeRequired(120, std::string(MapType::kValueName),
int64())),
"Map of column id to total count, including null and NaN");
+
+ static constexpr int32_t kNullValueCountsFieldId = 110;
inline static const SchemaField kNullValueCounts = SchemaField::MakeOptional(
- 110, "null_value_counts",
- std::make_shared<MapType>(
- SchemaField::MakeRequired(121, std::string(MapType::kKeyName),
- iceberg::int32()),
- SchemaField::MakeRequired(122, std::string(MapType::kValueName),
- iceberg::int64())),
+ kNullValueCountsFieldId, "null_value_counts",
+ map(SchemaField::MakeRequired(121, std::string(MapType::kKeyName),
int32()),
+ SchemaField::MakeRequired(122, std::string(MapType::kValueName),
int64())),
"Map of column id to null value count");
+
+ static constexpr int32_t kNanValueCountsFieldId = 137;
inline static const SchemaField kNanValueCounts = SchemaField::MakeOptional(
- 137, "nan_value_counts",
- std::make_shared<MapType>(
- SchemaField::MakeRequired(138, std::string(MapType::kKeyName),
- iceberg::int32()),
- SchemaField::MakeRequired(139, std::string(MapType::kValueName),
- iceberg::int64())),
+ kNanValueCountsFieldId, "nan_value_counts",
+ map(SchemaField::MakeRequired(138, std::string(MapType::kKeyName),
int32()),
+ SchemaField::MakeRequired(139, std::string(MapType::kValueName),
int64())),
"Map of column id to number of NaN values in the column");
+
+ static constexpr int32_t kLowerBoundsFieldId = 125;
inline static const SchemaField kLowerBounds = SchemaField::MakeOptional(
- 125, "lower_bounds",
- std::make_shared<MapType>(
- SchemaField::MakeRequired(126, std::string(MapType::kKeyName),
- iceberg::int32()),
- SchemaField::MakeRequired(127, std::string(MapType::kValueName),
- iceberg::binary())),
+ kLowerBoundsFieldId, "lower_bounds",
+ map(SchemaField::MakeRequired(126, std::string(MapType::kKeyName),
int32()),
+ SchemaField::MakeRequired(127, std::string(MapType::kValueName),
binary())),
"Map of column id to lower bound");
+
+ static constexpr int32_t kUpperBoundsFieldId = 128;
inline static const SchemaField kUpperBounds = SchemaField::MakeOptional(
- 128, "upper_bounds",
- std::make_shared<MapType>(
- SchemaField::MakeRequired(129, std::string(MapType::kKeyName),
- iceberg::int32()),
- SchemaField::MakeRequired(130, std::string(MapType::kValueName),
- iceberg::binary())),
+ kUpperBoundsFieldId, "upper_bounds",
+ map(SchemaField::MakeRequired(129, std::string(MapType::kKeyName),
int32()),
+ SchemaField::MakeRequired(130, std::string(MapType::kValueName),
binary())),
"Map of column id to upper bound");
+
+ static constexpr int32_t kKeyMetadataFieldId = 131;
inline static const SchemaField kKeyMetadata = SchemaField::MakeOptional(
- 131, "key_metadata", iceberg::binary(), "Encryption key metadata blob");
+ kKeyMetadataFieldId, "key_metadata", binary(), "Encryption key metadata
blob");
+
+ static constexpr int32_t kSplitOffsetsFieldId = 132;
inline static const SchemaField kSplitOffsets = SchemaField::MakeOptional(
- 132, "split_offsets",
- std::make_shared<ListType>(SchemaField::MakeRequired(
- 133, std::string(ListType::kElementName), iceberg::int64())),
+ kSplitOffsetsFieldId, "split_offsets",
+ list(SchemaField::MakeRequired(133, std::string(ListType::kElementName),
int64())),
"Splittable offsets");
+
+ static constexpr int32_t kEqualityIdsFieldId = 135;
inline static const SchemaField kEqualityIds = SchemaField::MakeOptional(
- 135, "equality_ids",
- std::make_shared<ListType>(SchemaField::MakeRequired(
- 136, std::string(ListType::kElementName), iceberg::int32())),
+ kEqualityIdsFieldId, "equality_ids",
+ list(SchemaField::MakeRequired(136, std::string(ListType::kElementName),
int32())),
"Equality comparison field IDs");
- inline static const SchemaField kSortOrderId =
- SchemaField::MakeOptional(140, "sort_order_id", iceberg::int32(), "Sort
order ID");
- inline static const SchemaField kFirstRowId = SchemaField::MakeOptional(
- 142, "first_row_id", iceberg::int64(), "Starting row ID to assign to new
rows");
+
+ static constexpr int32_t kSortOrderIdFieldId = 140;
+ inline static const SchemaField kSortOrderId = SchemaField::MakeOptional(
+ kSortOrderIdFieldId, "sort_order_id", int32(), "Sort order ID");
+
+ static constexpr int32_t kFirstRowIdFieldId = 142;
+ inline static const SchemaField kFirstRowId =
+ SchemaField::MakeOptional(kFirstRowIdFieldId, "first_row_id", int64(),
+ "Starting row ID to assign to new rows");
+
+ static constexpr int32_t kReferencedDataFileFieldId = 143;
inline static const SchemaField kReferencedDataFile =
SchemaField::MakeOptional(
- 143, "referenced_data_file", iceberg::string(),
+ kReferencedDataFileFieldId, "referenced_data_file", string(),
"Fully qualified location (URI with FS scheme) of a data file that all
deletes "
"reference");
+
+ static constexpr int32_t kContentOffsetFieldId = 144;
inline static const SchemaField kContentOffset =
- SchemaField::MakeOptional(144, "content_offset", iceberg::int64(),
+ SchemaField::MakeOptional(kContentOffsetFieldId, "content_offset",
int64(),
"The offset in the file where the content
starts");
+
+ static constexpr int32_t kContentSizeFieldId = 145;
inline static const SchemaField kContentSize =
- SchemaField::MakeOptional(145, "content_size_in_bytes", iceberg::int64(),
+ SchemaField::MakeOptional(kContentSizeFieldId, "content_size_in_bytes",
int64(),
"The length of referenced content stored in
the file");
bool operator==(const DataFile& other) const = default;
@@ -298,16 +318,24 @@ struct ICEBERG_EXPORT ManifestEntry {
/// File path, partition tuple, metrics, ...
std::shared_ptr<DataFile> data_file;
+ static constexpr int32_t kStatusFieldId = 0;
inline static const SchemaField kStatus =
- SchemaField::MakeRequired(0, "status", iceberg::int32());
+ SchemaField::MakeRequired(kStatusFieldId, "status", int32());
+
+ static constexpr int32_t kSnapshotIdFieldId = 1;
inline static const SchemaField kSnapshotId =
- SchemaField::MakeOptional(1, "snapshot_id", iceberg::int64());
- inline static const int32_t kDataFileFieldId = 2;
+ SchemaField::MakeOptional(kSnapshotIdFieldId, "snapshot_id", int64());
+
+ static constexpr int32_t kDataFileFieldId = 2;
inline static const std::string kDataFileField = "data_file";
+
+ static constexpr int32_t kSequenceNumberFieldId = 3;
inline static const SchemaField kSequenceNumber =
- SchemaField::MakeOptional(3, "sequence_number", iceberg::int64());
- inline static const SchemaField kFileSequenceNumber =
- SchemaField::MakeOptional(4, "file_sequence_number", iceberg::int64());
+ SchemaField::MakeOptional(kSequenceNumberFieldId, "sequence_number",
int64());
+
+ static constexpr int32_t kFileSequenceNumberFieldId = 4;
+ inline static const SchemaField kFileSequenceNumber =
SchemaField::MakeOptional(
+ kFileSequenceNumberFieldId, "file_sequence_number", int64());
/// \brief Check if this manifest entry is deleted.
constexpr bool IsAlive() const {
diff --git a/src/iceberg/manifest/manifest_list.cc b/src/iceberg/manifest/manifest_list.cc
index 4351081c..9719b831 100644
--- a/src/iceberg/manifest/manifest_list.cc
+++ b/src/iceberg/manifest/manifest_list.cc
@@ -19,22 +19,24 @@
#include "iceberg/manifest/manifest_list.h"
-#include "iceberg/schema.h"
+#include <memory>
+
+#include "iceberg/type.h"
namespace iceberg {
-const StructType& PartitionFieldSummary::Type() {
- static const StructType kInstance{{
+const std::shared_ptr<StructType>& PartitionFieldSummary::Type() {
+ static const auto kInstance =
std::make_shared<StructType>(std::vector<SchemaField>{
PartitionFieldSummary::kContainsNull,
PartitionFieldSummary::kContainsNaN,
PartitionFieldSummary::kLowerBound,
PartitionFieldSummary::kUpperBound,
- }};
+ });
return kInstance;
}
-const std::shared_ptr<Schema>& ManifestFile::Type() {
- static const auto kInstance =
std::make_shared<Schema>(std::vector<SchemaField>{
+const std::shared_ptr<StructType>& ManifestFile::Type() {
+ static const auto kInstance =
std::make_shared<StructType>(std::vector<SchemaField>{
kManifestPath,
kManifestLength,
kPartitionSpecId,
diff --git a/src/iceberg/manifest/manifest_list.h b/src/iceberg/manifest/manifest_list.h
index 85c0b89e..47a7ad48 100644
--- a/src/iceberg/manifest/manifest_list.h
+++ b/src/iceberg/manifest/manifest_list.h
@@ -69,7 +69,7 @@ struct ICEBERG_EXPORT PartitionFieldSummary {
bool operator==(const PartitionFieldSummary& other) const = default;
- static const StructType& Type();
+ static const std::shared_ptr<StructType>& Type();
};
/// \brief The type of files tracked by the manifest, either data or delete
files; 0 for
@@ -153,51 +153,86 @@ struct ICEBERG_EXPORT ManifestFile {
/// \brief Checks if this manifest file contains entries with DELETED status
bool has_deleted_files() const { return deleted_files_count.value_or(1) > 0;
}
+ static constexpr int32_t kManifestPathFieldId = 500;
inline static const SchemaField kManifestPath = SchemaField::MakeRequired(
- 500, "manifest_path", iceberg::string(), "Location URI with FS scheme");
+ kManifestPathFieldId, "manifest_path", string(), "Location URI with FS
scheme");
+
+ static constexpr int32_t kManifestLengthFieldId = 501;
inline static const SchemaField kManifestLength = SchemaField::MakeRequired(
- 501, "manifest_length", iceberg::int64(), "Total file size in bytes");
+ kManifestLengthFieldId, "manifest_length", int64(), "Total file size in
bytes");
+
+ static constexpr int32_t kPartitionSpecIdFieldId = 502;
inline static const SchemaField kPartitionSpecId = SchemaField::MakeRequired(
- 502, "partition_spec_id", iceberg::int32(), "Spec ID used to write");
+ kPartitionSpecIdFieldId, "partition_spec_id", int32(), "Spec ID used to
write");
+
+ static constexpr int32_t kContentFieldId = 517;
inline static const SchemaField kContent = SchemaField::MakeOptional(
- 517, "content", iceberg::int32(), "Contents of the manifest: 0=data,
1=deletes");
+ kContentFieldId, "content", int32(), "Contents of the manifest: 0=data,
1=deletes");
+
+ static constexpr int32_t kSequenceNumberFieldId = 515;
inline static const SchemaField kSequenceNumber =
- SchemaField::MakeOptional(515, "sequence_number", iceberg::int64(),
+ SchemaField::MakeOptional(kSequenceNumberFieldId, "sequence_number",
int64(),
"Sequence number when the manifest was added");
+
+ static constexpr int32_t kMinSequenceNumberFieldId = 516;
inline static const SchemaField kMinSequenceNumber =
- SchemaField::MakeOptional(516, "min_sequence_number", iceberg::int64(),
+ SchemaField::MakeOptional(kMinSequenceNumberFieldId,
"min_sequence_number", int64(),
"Lowest sequence number in the manifest");
- inline static const SchemaField kAddedSnapshotId = SchemaField::MakeRequired(
- 503, "added_snapshot_id", iceberg::int64(), "Snapshot ID that added the
manifest");
+
+ static constexpr int32_t kAddedSnapshotIdFieldId = 503;
+ inline static const SchemaField kAddedSnapshotId =
+ SchemaField::MakeRequired(kAddedSnapshotIdFieldId, "added_snapshot_id",
int64(),
+ "Snapshot ID that added the manifest");
+
+ static constexpr int32_t kAddedFilesCountFieldId = 504;
inline static const SchemaField kAddedFilesCount = SchemaField::MakeOptional(
- 504, "added_files_count", iceberg::int32(), "Added entry count");
- inline static const SchemaField kExistingFilesCount =
SchemaField::MakeOptional(
- 505, "existing_files_count", iceberg::int32(), "Existing entry count");
+ kAddedFilesCountFieldId, "added_files_count", int32(), "Added entry
count");
+
+ static constexpr int32_t kExistingFilesCountFieldId = 505;
+ inline static const SchemaField kExistingFilesCount =
+ SchemaField::MakeOptional(kExistingFilesCountFieldId,
"existing_files_count",
+ int32(), "Existing entry count");
+
+ static constexpr int32_t kDeletedFilesCountFieldId = 506;
inline static const SchemaField kDeletedFilesCount =
SchemaField::MakeOptional(
- 506, "deleted_files_count", iceberg::int32(), "Deleted entry count");
+ kDeletedFilesCountFieldId, "deleted_files_count", int32(), "Deleted
entry count");
+
+ static constexpr int32_t kAddedRowsCountFieldId = 512;
inline static const SchemaField kAddedRowsCount = SchemaField::MakeOptional(
- 512, "added_rows_count", iceberg::int64(), "Added rows count");
+ kAddedRowsCountFieldId, "added_rows_count", int64(), "Added rows count");
+
+ static constexpr int32_t kExistingRowsCountFieldId = 513;
inline static const SchemaField kExistingRowsCount =
SchemaField::MakeOptional(
- 513, "existing_rows_count", iceberg::int64(), "Existing rows count");
+ kExistingRowsCountFieldId, "existing_rows_count", int64(), "Existing
rows count");
+
+ static constexpr int32_t kDeletedRowsCountFieldId = 514;
inline static const SchemaField kDeletedRowsCount =
SchemaField::MakeOptional(
- 514, "deleted_rows_count", iceberg::int64(), "Deleted rows count");
+ kDeletedRowsCountFieldId, "deleted_rows_count", int64(), "Deleted rows
count");
+
+ static constexpr int32_t kPartitionSummaryFieldId = 507;
inline static const SchemaField kPartitions = SchemaField::MakeOptional(
- 507, "partitions",
- std::make_shared<ListType>(SchemaField::MakeRequired(
- 508, std::string(ListType::kElementName),
- struct_(
- {PartitionFieldSummary::kContainsNull,
PartitionFieldSummary::kContainsNaN,
- PartitionFieldSummary::kLowerBound,
PartitionFieldSummary::kUpperBound}))),
+ kPartitionSummaryFieldId, "partitions",
+ list(SchemaField::MakeRequired(508, std::string(ListType::kElementName),
+ struct_({
+ PartitionFieldSummary::kContainsNull,
+ PartitionFieldSummary::kContainsNaN,
+ PartitionFieldSummary::kLowerBound,
+ PartitionFieldSummary::kUpperBound,
+ }))),
"Summary for each partition");
+
+ static constexpr int32_t kKeyMetadataFieldId = 519;
inline static const SchemaField kKeyMetadata = SchemaField::MakeOptional(
- 519, "key_metadata", iceberg::binary(), "Encryption key metadata blob");
+ kKeyMetadataFieldId, "key_metadata", binary(), "Encryption key metadata
blob");
+
+ static constexpr int32_t kFirstRowIdFieldId = 520;
inline static const SchemaField kFirstRowId = SchemaField::MakeOptional(
- 520, "first_row_id", iceberg::int64(),
+ kFirstRowIdFieldId, "first_row_id", int64(),
"Starting row ID to assign to new rows in ADDED data files");
bool operator==(const ManifestFile& other) const = default;
- static const std::shared_ptr<Schema>& Type();
+ static const std::shared_ptr<StructType>& Type();
};
/// Snapshots are embedded in table metadata, but the list of manifests for a
snapshot are
diff --git a/src/iceberg/manifest/manifest_reader.cc b/src/iceberg/manifest/manifest_reader.cc
index 1750151e..2ba377fa 100644
--- a/src/iceberg/manifest/manifest_reader.cc
+++ b/src/iceberg/manifest/manifest_reader.cc
@@ -19,61 +19,873 @@
#include "iceberg/manifest/manifest_reader.h"
+#include <algorithm>
+#include <memory>
+#include <unordered_set>
+
+#include <nanoarrow/nanoarrow.h>
+
+#include "iceberg/arrow/nanoarrow_status_internal.h"
+#include "iceberg/arrow_c_data_guard_internal.h"
+#include "iceberg/expression/expression.h"
+#include "iceberg/expression/projections.h"
+#include "iceberg/file_format.h"
#include "iceberg/manifest/manifest_entry.h"
#include "iceberg/manifest/manifest_list.h"
#include "iceberg/manifest/manifest_reader_internal.h"
+#include "iceberg/metadata_columns.h"
+#include "iceberg/partition_spec.h"
#include "iceberg/schema.h"
-#include "iceberg/schema_internal.h"
+#include "iceberg/schema_field.h"
#include "iceberg/type.h"
+#include "iceberg/util/checked_cast.h"
#include "iceberg/util/macros.h"
namespace iceberg {
+namespace {
+
+// TODO(gangwu): refactor these macros with template functions.
+#define PARSE_PRIMITIVE_FIELD(item, array_view, type)
\
+ for (int64_t row_idx = 0; row_idx < array_view->length; row_idx++) {
\
+ if (!ArrowArrayViewIsNull(array_view, row_idx)) {
\
+ auto value = ArrowArrayViewGetIntUnsafe(array_view, row_idx);
\
+ item = static_cast<type>(value);
\
+ } else if (required) {
\
+ return InvalidManifestList("Field {} is required but null at row {}",
field_name, \
+ row_idx);
\
+ }
\
+ }
+
+#define PARSE_STRING_FIELD(item, array_view)
\
+ for (int64_t row_idx = 0; row_idx < array_view->length; row_idx++) {
\
+ if (!ArrowArrayViewIsNull(array_view, row_idx)) {
\
+ auto value = ArrowArrayViewGetStringUnsafe(array_view, row_idx);
\
+ item = std::string(value.data, value.size_bytes);
\
+ } else if (required) {
\
+ return InvalidManifestList("Field {} is required but null at row {}",
field_name, \
+ row_idx);
\
+ }
\
+ }
+
+// Copies every non-null binary value of `array_view` into `item` (as a byte
+// vector via ArrowArrayViewGetInt8Vector). A null value in a required field
+// returns InvalidManifestList. NOTE: like the sibling PARSE_* macros, this
+// expects `required` and `field_name` to be in scope at the expansion site.
+// The null check must use the macro parameter `array_view`, not a caller-side
+// variable name.
+#define PARSE_BINARY_FIELD(item, array_view)                                      \
+  for (int64_t row_idx = 0; row_idx < array_view->length; row_idx++) {            \
+    if (!ArrowArrayViewIsNull(array_view, row_idx)) {                             \
+      item = ArrowArrayViewGetInt8Vector(array_view, row_idx);                    \
+    } else if (required) {                                                        \
+      return InvalidManifestList("Field {} is required but null at row {}", field_name, \
+                                 row_idx);                                        \
+    }                                                                             \
+  }
+
+#define PARSE_INTEGER_VECTOR_FIELD(item, count, array_view, type)
\
+ for (int64_t manifest_idx = 0; manifest_idx < count; manifest_idx++) {
\
+ auto offset = ArrowArrayViewListChildOffset(array_view, manifest_idx);
\
+ auto next_offset = ArrowArrayViewListChildOffset(array_view, manifest_idx
+ 1); \
+ for (int64_t offset_idx = offset; offset_idx < next_offset; offset_idx++)
{ \
+ item.emplace_back(static_cast<type>(
\
+ ArrowArrayViewGetIntUnsafe(array_view->children[0], offset_idx)));
\
+ }
\
+ }
+
+#define PARSE_MAP_FIELD(item, count, array_view, key_type, value_type,
assignment) \
+ do {
\
+ if (array_view->storage_type != ArrowType::NANOARROW_TYPE_MAP) {
\
+ return InvalidManifest("Field:{} should be a map.", field_name);
\
+ }
\
+ auto view_of_map = array_view->children[0];
\
+ ASSERT_VIEW_TYPE_AND_CHILDREN(view_of_map,
ArrowType::NANOARROW_TYPE_STRUCT, 2); \
+ auto view_of_map_key = view_of_map->children[0];
\
+ ASSERT_VIEW_TYPE(view_of_map_key, key_type);
\
+ auto view_of_map_value = view_of_map->children[1];
\
+ ASSERT_VIEW_TYPE(view_of_map_value, value_type);
\
+ for (int64_t row_idx = 0; row_idx < count; row_idx++) {
\
+ auto offset = array_view->buffer_views[1].data.as_int32[row_idx];
\
+ auto next_offset = array_view->buffer_views[1].data.as_int32[row_idx +
1]; \
+ for (int32_t offset_idx = offset; offset_idx < next_offset;
offset_idx++) { \
+ auto key = ArrowArrayViewGetIntUnsafe(view_of_map_key, offset_idx);
\
+ item[key] = assignment;
\
+ }
\
+ }
\
+ } while (0)
+
+#define PARSE_INT_LONG_MAP_FIELD(item, count, array_view) \
+ PARSE_MAP_FIELD(item, count, array_view, ArrowType::NANOARROW_TYPE_INT32, \
+ ArrowType::NANOARROW_TYPE_INT64, \
+ ArrowArrayViewGetIntUnsafe(view_of_map_value, offset_idx));
+
+#define PARSE_INT_BINARY_MAP_FIELD(item, count, array_view) \
+ PARSE_MAP_FIELD(item, count, array_view, ArrowType::NANOARROW_TYPE_INT32, \
+ ArrowType::NANOARROW_TYPE_BINARY, \
+ ArrowArrayViewGetInt8Vector(view_of_map_value, offset_idx));
+
+#define ASSERT_VIEW_TYPE(view, type)
\
+ if (view->storage_type != type) {
\
+ return InvalidManifest("Sub Field:{} should be a {}.", field_name, #type);
\
+ }
+
+#define ASSERT_VIEW_TYPE_AND_CHILDREN(view, type, n_child)
\
+ if (view->storage_type != type) {
\
+ return InvalidManifest("Sub Field:{} should be a {}.", field_name, #type);
\
+ }
\
+ if (view->n_children != n_child) {
\
+ return InvalidManifest("Sub Field for:{} should have key&value:{}
columns.", \
+ field_name, n_child);
\
+ }
+
+// Copies the variable-length binary value at element `offset_idx` of `view`
+// into an owned byte vector. The index is int64_t to match nanoarrow's element
+// index type (callers pass int64_t row/partition indices), so large indices
+// are not truncated; int32_t callers still convert implicitly.
+std::vector<uint8_t> ArrowArrayViewGetInt8Vector(const ArrowArrayView* view,
+                                                 int64_t offset_idx) {
+  auto buffer = ArrowArrayViewGetBytesUnsafe(view, offset_idx);
+  return {buffer.data.as_uint8, buffer.data.as_uint8 + buffer.size_bytes};
+}
+
+Status ParsePartitionFieldSummaryList(ArrowArrayView* view_of_column,
+ std::vector<ManifestFile>&
manifest_files) {
+ auto manifest_count = view_of_column->length;
+ // view_of_column is list<struct<PartitionFieldSummary>>
+ if (view_of_column->storage_type != ArrowType::NANOARROW_TYPE_LIST) {
+ return InvalidManifestList("partitions field should be a list.");
+ }
+ auto view_of_list_item = view_of_column->children[0];
+ // view_of_list_item is struct<PartitionFieldSummary>
+ if (view_of_list_item->storage_type != ArrowType::NANOARROW_TYPE_STRUCT) {
+ return InvalidManifestList("partitions list item should be a struct.");
+ }
+ if (view_of_list_item->n_children != 4) {
+ return InvalidManifestList("PartitionFieldSummary should have 4 fields.");
+ }
+ if (view_of_list_item->children[0]->storage_type !=
ArrowType::NANOARROW_TYPE_BOOL) {
+ return InvalidManifestList("contains_null should have be bool type
column.");
+ }
+ auto contains_null = view_of_list_item->children[0];
+ if (view_of_list_item->children[1]->storage_type !=
ArrowType::NANOARROW_TYPE_BOOL) {
+ return InvalidManifestList("contains_nan should have be bool type
column.");
+ }
+ auto contains_nan = view_of_list_item->children[1];
+ if (view_of_list_item->children[2]->storage_type !=
ArrowType::NANOARROW_TYPE_BINARY) {
+ return InvalidManifestList("lower_bound should have be binary type
column.");
+ }
+ auto lower_bound_list = view_of_list_item->children[2];
+ if (view_of_list_item->children[3]->storage_type !=
ArrowType::NANOARROW_TYPE_BINARY) {
+ return InvalidManifestList("upper_bound should have be binary type
column.");
+ }
+ auto upper_bound_list = view_of_list_item->children[3];
+ for (int64_t manifest_idx = 0; manifest_idx < manifest_count;
manifest_idx++) {
+ auto offset = ArrowArrayViewListChildOffset(view_of_column, manifest_idx);
+ auto next_offset = ArrowArrayViewListChildOffset(view_of_column,
manifest_idx + 1);
+ // partitions from offset to next_offset belongs to manifest_idx
+ auto& manifest_file = manifest_files[manifest_idx];
+ for (int64_t partition_idx = offset; partition_idx < next_offset;
partition_idx++) {
+ PartitionFieldSummary partition_field_summary;
+ if (!ArrowArrayViewIsNull(contains_null, partition_idx)) {
+ partition_field_summary.contains_null =
+ ArrowArrayViewGetIntUnsafe(contains_null, partition_idx);
+ } else {
+ return InvalidManifestList("contains_null is null at row {}",
partition_idx);
+ }
+ if (!ArrowArrayViewIsNull(contains_nan, partition_idx)) {
+ partition_field_summary.contains_nan =
+ ArrowArrayViewGetIntUnsafe(contains_nan, partition_idx);
+ }
+ if (!ArrowArrayViewIsNull(lower_bound_list, partition_idx)) {
+ partition_field_summary.lower_bound =
+ ArrowArrayViewGetInt8Vector(lower_bound_list, partition_idx);
+ }
+ if (!ArrowArrayViewIsNull(upper_bound_list, partition_idx)) {
+ partition_field_summary.upper_bound =
+ ArrowArrayViewGetInt8Vector(upper_bound_list, partition_idx);
+ }
+
+ manifest_file.partitions.emplace_back(partition_field_summary);
+ }
+ }
+ return {};
+}
+
+Result<std::vector<ManifestFile>> ParseManifestList(ArrowSchema* schema,
+ ArrowArray* array_in,
+ const Schema&
iceberg_schema) {
+ if (schema->n_children != array_in->n_children) {
+ return InvalidManifestList("Columns size not match between schema:{} and
array:{}",
+ schema->n_children, array_in->n_children);
+ }
+ if (iceberg_schema.fields().size() != array_in->n_children) {
+ return InvalidManifestList("Columns size not match between schema:{} and
array:{}",
+ iceberg_schema.fields().size(),
array_in->n_children);
+ }
+
+ ArrowError error;
+ ArrowArrayView array_view;
+ auto status = ArrowArrayViewInitFromSchema(&array_view, schema, &error);
+ ICEBERG_NANOARROW_RETURN_UNEXPECTED_WITH_ERROR(status, error);
+ internal::ArrowArrayViewGuard view_guard(&array_view);
+ status = ArrowArrayViewSetArray(&array_view, array_in, &error);
+ ICEBERG_NANOARROW_RETURN_UNEXPECTED_WITH_ERROR(status, error);
+ status = ArrowArrayViewValidate(&array_view,
NANOARROW_VALIDATION_LEVEL_FULL, &error);
+ ICEBERG_NANOARROW_RETURN_UNEXPECTED_WITH_ERROR(status, error);
+
+ std::vector<ManifestFile> manifest_files;
+ manifest_files.resize(array_in->length);
+
+ for (int64_t idx = 0; idx < array_in->n_children; idx++) {
+ ICEBERG_ASSIGN_OR_RAISE(auto field, iceberg_schema.GetFieldByIndex(idx));
+ ICEBERG_CHECK(field.has_value(), "Invalid index {} for data file schema",
idx);
+ auto field_name = field->get().name();
+ auto field_id = field->get().field_id();
+ auto required = !field->get().optional();
+ auto view_of_column = array_view.children[idx];
+ switch (field_id) {
+ case ManifestFile::kManifestPathFieldId:
+ PARSE_STRING_FIELD(manifest_files[row_idx].manifest_path,
view_of_column);
+ break;
+ case ManifestFile::kManifestLengthFieldId:
+ PARSE_PRIMITIVE_FIELD(manifest_files[row_idx].manifest_length,
view_of_column,
+ int64_t);
+ break;
+ case ManifestFile::kPartitionSpecIdFieldId:
+ PARSE_PRIMITIVE_FIELD(manifest_files[row_idx].partition_spec_id,
view_of_column,
+ int32_t);
+ break;
+ case ManifestFile::kContentFieldId:
+ PARSE_PRIMITIVE_FIELD(manifest_files[row_idx].content, view_of_column,
+ ManifestContent);
+ break;
+ case ManifestFile::kSequenceNumberFieldId:
+ PARSE_PRIMITIVE_FIELD(manifest_files[row_idx].sequence_number,
view_of_column,
+ int64_t);
+ break;
+ case ManifestFile::kMinSequenceNumberFieldId:
+ PARSE_PRIMITIVE_FIELD(manifest_files[row_idx].min_sequence_number,
view_of_column,
+ int64_t);
+ break;
+ case ManifestFile::kAddedSnapshotIdFieldId:
+ PARSE_PRIMITIVE_FIELD(manifest_files[row_idx].added_snapshot_id,
view_of_column,
+ int64_t);
+ break;
+ case ManifestFile::kAddedFilesCountFieldId:
+ PARSE_PRIMITIVE_FIELD(manifest_files[row_idx].added_files_count,
view_of_column,
+ int32_t);
+ break;
+ case ManifestFile::kExistingFilesCountFieldId:
+ PARSE_PRIMITIVE_FIELD(manifest_files[row_idx].existing_files_count,
+ view_of_column, int32_t);
+ break;
+ case ManifestFile::kDeletedFilesCountFieldId:
+ PARSE_PRIMITIVE_FIELD(manifest_files[row_idx].deleted_files_count,
view_of_column,
+ int32_t);
+ break;
+ case ManifestFile::kAddedRowsCountFieldId:
+ PARSE_PRIMITIVE_FIELD(manifest_files[row_idx].added_rows_count,
view_of_column,
+ int64_t);
+ break;
+ case ManifestFile::kExistingRowsCountFieldId:
+ PARSE_PRIMITIVE_FIELD(manifest_files[row_idx].existing_rows_count,
view_of_column,
+ int64_t);
+ break;
+ case ManifestFile::kDeletedRowsCountFieldId:
+ PARSE_PRIMITIVE_FIELD(manifest_files[row_idx].deleted_rows_count,
view_of_column,
+ int64_t);
+ break;
+ case ManifestFile::kPartitionSummaryFieldId:
+ ICEBERG_RETURN_UNEXPECTED(
+ ParsePartitionFieldSummaryList(view_of_column, manifest_files));
+ break;
+ case ManifestFile::kKeyMetadataFieldId:
+ PARSE_BINARY_FIELD(manifest_files[row_idx].key_metadata,
view_of_column);
+ break;
+ case ManifestFile::kFirstRowIdFieldId:
+ PARSE_PRIMITIVE_FIELD(manifest_files[row_idx].first_row_id,
view_of_column,
+ int64_t);
+ break;
+ default:
+ return InvalidManifestList("Unsupported field: {} in manifest file.",
field_name);
+ }
+ }
+ return manifest_files;
+}
+
+/// \brief Decode one partition value and add it to a manifest entry's partition.
+///
+/// Reads slot `row_idx` of a single partition column and hands the decoded
+/// literal to `manifest_entries[row_idx].data_file->partition.AddValue`. The
+/// slot is read without a null check; the caller skips null slots.
+///
+/// \return InvalidManifest when the column's Arrow storage type is not one of
+///         bool, int32, int64, float, double, string or binary.
+Status ParsePartitionValues(ArrowArrayView* view_of_partition, int64_t row_idx,
+                            std::vector<ManifestEntry>& manifest_entries) {
+  auto& partition = manifest_entries[row_idx].data_file->partition;
+  const auto storage_type = view_of_partition->storage_type;
+
+  if (storage_type == ArrowType::NANOARROW_TYPE_BOOL) {
+    const auto raw = ArrowArrayViewGetUIntUnsafe(view_of_partition, row_idx);
+    partition.AddValue(Literal::Boolean(raw != 0));
+  } else if (storage_type == ArrowType::NANOARROW_TYPE_INT32) {
+    const auto raw = ArrowArrayViewGetIntUnsafe(view_of_partition, row_idx);
+    partition.AddValue(Literal::Int(raw));
+  } else if (storage_type == ArrowType::NANOARROW_TYPE_INT64) {
+    const auto raw = ArrowArrayViewGetIntUnsafe(view_of_partition, row_idx);
+    partition.AddValue(Literal::Long(raw));
+  } else if (storage_type == ArrowType::NANOARROW_TYPE_FLOAT) {
+    const auto raw = ArrowArrayViewGetDoubleUnsafe(view_of_partition, row_idx);
+    partition.AddValue(Literal::Float(raw));
+  } else if (storage_type == ArrowType::NANOARROW_TYPE_DOUBLE) {
+    const auto raw = ArrowArrayViewGetDoubleUnsafe(view_of_partition, row_idx);
+    partition.AddValue(Literal::Double(raw));
+  } else if (storage_type == ArrowType::NANOARROW_TYPE_STRING) {
+    const auto text = ArrowArrayViewGetStringUnsafe(view_of_partition, row_idx);
+    partition.AddValue(Literal::String(std::string(text.data, text.size_bytes)));
+  } else if (storage_type == ArrowType::NANOARROW_TYPE_BINARY) {
+    const auto bytes = ArrowArrayViewGetBytesUnsafe(view_of_partition, row_idx);
+    const auto* first = bytes.data.as_char;
+    partition.AddValue(
+        Literal::Binary(std::vector<uint8_t>(first, first + bytes.size_bytes)));
+  } else {
+    return InvalidManifest("Unsupported field type: {} in data file partition.",
+                           static_cast<int32_t>(storage_type));
+  }
+  return {};
+}
+
+/// \brief Assign inherited `first_row_id`s to the data files of one batch.
+///
+/// If the manifest has a first_row_id, every non-deleted entry whose data file
+/// has none receives the running counter, which then advances by that file's
+/// record_count. The counter is updated in place so it carries over to the
+/// next batch. If the manifest has no first_row_id, every data file's
+/// first_row_id is cleared.
+void InheritFirstRowIds(std::optional<int64_t>& first_row_id,
+                        std::vector<ManifestEntry>& manifest_entries) {
+  if (!first_row_id.has_value()) {
+    // data file's first_row_id is null when the manifest's first_row_id is null
+    for (auto& entry : manifest_entries) {
+      entry.data_file->first_row_id = std::nullopt;
+    }
+    return;
+  }
+  for (auto& entry : manifest_entries) {
+    if (entry.status != ManifestStatus::kDeleted &&
+        !entry.data_file->first_row_id.has_value()) {
+      entry.data_file->first_row_id = first_row_id.value();
+      first_row_id = first_row_id.value() + entry.data_file->record_count;
+    }
+  }
+}
+
+/// \brief Parse the `data_file` struct column of one manifest entry batch.
+///
+/// Each child of `view_of_column` is matched to `data_file_schema` by position
+/// and decoded by field id into `manifest_entries[row].data_file`. The PARSE_*
+/// macros read the local `required` and `field_name` (presumably to reject
+/// nulls in required fields).
+///
+/// \param first_row_id Running row-id counter of the manifest; advanced in
+///        place when data files inherit a first_row_id.
+/// \return InvalidManifest if the column is not a struct, its child count does
+///         not match the schema, or a field / partition type is unsupported.
+Status ParseDataFile(const std::shared_ptr<StructType>& data_file_schema,
+                     ArrowArrayView* view_of_column, std::optional<int64_t>& first_row_id,
+                     std::vector<ManifestEntry>& manifest_entries) {
+  if (view_of_column->storage_type != ArrowType::NANOARROW_TYPE_STRUCT) {
+    return InvalidManifest("DataFile field should be a struct.");
+  }
+  if (view_of_column->n_children != data_file_schema->fields().size()) {
+    return InvalidManifest("DataFile schema size:{} not match with ArrayArray columns:{}",
+                           data_file_schema->fields().size(), view_of_column->n_children);
+  }
+
+  // Row-id inheritance reads record_count, which OpenReader may place after
+  // first_row_id in the projection. It therefore runs only after every column
+  // of the batch has been parsed, not inside the first_row_id case.
+  bool has_first_row_id_column = false;
+  for (int64_t col_idx = 0; col_idx < view_of_column->n_children; ++col_idx) {
+    ICEBERG_ASSIGN_OR_RAISE(auto field, data_file_schema->GetFieldByIndex(col_idx));
+    ICEBERG_CHECK(field.has_value(), "Invalid index {} for data file schema", col_idx);
+    auto field_name = field->get().name();
+    auto field_id = field->get().field_id();
+    auto required = !field->get().optional();
+    auto array_view = view_of_column->children[col_idx];
+    auto manifest_entry_count = array_view->length;
+
+    switch (field_id) {
+      case DataFile::kContentFieldId:
+        PARSE_PRIMITIVE_FIELD(manifest_entries[row_idx].data_file->content, array_view,
+                              DataFile::Content);
+        break;
+      case DataFile::kFilePathFieldId:
+        PARSE_STRING_FIELD(manifest_entries[row_idx].data_file->file_path, array_view);
+        break;
+      case DataFile::kFileFormatFieldId:
+        for (int64_t row_idx = 0; row_idx < array_view->length; row_idx++) {
+          if (!ArrowArrayViewIsNull(array_view, row_idx)) {
+            auto value = ArrowArrayViewGetStringUnsafe(array_view, row_idx);
+            std::string_view format_str(value.data, value.size_bytes);
+            ICEBERG_ASSIGN_OR_RAISE(manifest_entries[row_idx].data_file->file_format,
+                                    FileFormatTypeFromString(format_str));
+          }
+        }
+        break;
+      case DataFile::kPartitionFieldId: {
+        if (array_view->storage_type != ArrowType::NANOARROW_TYPE_STRUCT) {
+          return InvalidManifest("Field:{} should be a struct.", field_name);
+        }
+        // NOTE(review): a null partition value stops parsing this partition
+        // column for all remaining rows and adds no placeholder, so (assuming
+        // AddValue appends) later values land at the wrong tuple position.
+        // Appending a typed null literal here would keep positions aligned.
+        // TODO: confirm the Literal null factory and fix.
+        for (int64_t partition_idx = 0; partition_idx < array_view->n_children;
+             partition_idx++) {
+          auto view_of_partition = array_view->children[partition_idx];
+          for (int64_t row_idx = 0; row_idx < view_of_partition->length; row_idx++) {
+            if (ArrowArrayViewIsNull(view_of_partition, row_idx)) {
+              break;
+            }
+            ICEBERG_RETURN_UNEXPECTED(
+                ParsePartitionValues(view_of_partition, row_idx, manifest_entries));
+          }
+        }
+      } break;
+      case DataFile::kRecordCountFieldId:
+        PARSE_PRIMITIVE_FIELD(manifest_entries[row_idx].data_file->record_count,
+                              array_view, int64_t);
+        break;
+      case DataFile::kFileSizeFieldId:
+        PARSE_PRIMITIVE_FIELD(manifest_entries[row_idx].data_file->file_size_in_bytes,
+                              array_view, int64_t);
+        break;
+      case DataFile::kColumnSizesFieldId:
+        // key&value should have the same offset
+        // HACK(xiao.dong) workaround for arrow bug:
+        // ArrowArrayViewListChildOffset can not get the correct offset for map
+        PARSE_INT_LONG_MAP_FIELD(manifest_entries[row_idx].data_file->column_sizes,
+                                 manifest_entry_count, array_view);
+        break;
+      case DataFile::kValueCountsFieldId:
+        PARSE_INT_LONG_MAP_FIELD(manifest_entries[row_idx].data_file->value_counts,
+                                 manifest_entry_count, array_view);
+        break;
+      case DataFile::kNullValueCountsFieldId:
+        PARSE_INT_LONG_MAP_FIELD(manifest_entries[row_idx].data_file->null_value_counts,
+                                 manifest_entry_count, array_view);
+        break;
+      case DataFile::kNanValueCountsFieldId:
+        PARSE_INT_LONG_MAP_FIELD(manifest_entries[row_idx].data_file->nan_value_counts,
+                                 manifest_entry_count, array_view);
+        break;
+      case DataFile::kLowerBoundsFieldId:
+        PARSE_INT_BINARY_MAP_FIELD(manifest_entries[row_idx].data_file->lower_bounds,
+                                   manifest_entry_count, array_view);
+        break;
+      case DataFile::kUpperBoundsFieldId:
+        PARSE_INT_BINARY_MAP_FIELD(manifest_entries[row_idx].data_file->upper_bounds,
+                                   manifest_entry_count, array_view);
+        break;
+      case DataFile::kKeyMetadataFieldId:
+        PARSE_BINARY_FIELD(manifest_entries[row_idx].data_file->key_metadata, array_view);
+        break;
+      case DataFile::kSplitOffsetsFieldId:
+        PARSE_INTEGER_VECTOR_FIELD(
+            manifest_entries[manifest_idx].data_file->split_offsets, manifest_entry_count,
+            array_view, int64_t);
+        break;
+      case DataFile::kEqualityIdsFieldId:
+        PARSE_INTEGER_VECTOR_FIELD(manifest_entries[manifest_idx].data_file->equality_ids,
+                                   manifest_entry_count, array_view, int32_t);
+        break;
+      case DataFile::kSortOrderIdFieldId:
+        PARSE_PRIMITIVE_FIELD(manifest_entries[row_idx].data_file->sort_order_id,
+                              array_view, int32_t);
+        break;
+      case DataFile::kFirstRowIdFieldId:
+        PARSE_PRIMITIVE_FIELD(manifest_entries[row_idx].data_file->first_row_id,
+                              array_view, int64_t);
+        has_first_row_id_column = true;
+        break;
+      case DataFile::kReferencedDataFileFieldId:
+        PARSE_STRING_FIELD(manifest_entries[row_idx].data_file->referenced_data_file,
+                           array_view);
+        break;
+      case DataFile::kContentOffsetFieldId:
+        PARSE_PRIMITIVE_FIELD(manifest_entries[row_idx].data_file->content_offset,
+                              array_view, int64_t);
+        break;
+      case DataFile::kContentSizeFieldId:
+        PARSE_PRIMITIVE_FIELD(manifest_entries[row_idx].data_file->content_size_in_bytes,
+                              array_view, int64_t);
+        break;
+      default:
+        return InvalidManifest("Unsupported field: {} in data file.", field_name);
+    }
+  }
+
+  if (has_first_row_id_column) {
+    InheritFirstRowIds(first_row_id, manifest_entries);
+  }
+  return {};
+}
+
+/// \brief Convert one Arrow record batch of a manifest file into manifest entries.
+///
+/// \param schema Arrow schema describing `array_in`.
+/// \param array_in Struct array whose children are the manifest entry columns.
+/// \param iceberg_schema Manifest entry schema the batch was read with; its
+///        top-level fields are matched to the children of `array_in` by position.
+/// \param first_row_id Running row-id counter, forwarded to ParseDataFile.
+/// \return One ManifestEntry per row, each owning a freshly allocated DataFile.
+Result<std::vector<ManifestEntry>> ParseManifestEntry(
+    ArrowSchema* schema, ArrowArray* array_in, const Schema& iceberg_schema,
+    std::optional<int64_t>& first_row_id) {
+  const auto column_count = array_in->n_children;
+  if (schema->n_children != column_count) {
+    return InvalidManifest("Columns size not match between schema:{} and array:{}",
+                           schema->n_children, column_count);
+  }
+  if (iceberg_schema.fields().size() != column_count) {
+    return InvalidManifest("Columns size not match between schema:{} and array:{}",
+                           iceberg_schema.fields().size(), column_count);
+  }
+
+  // Build and fully validate a view over the batch before reading any slot.
+  ArrowError error;
+  ArrowArrayView array_view;
+  auto status = ArrowArrayViewInitFromSchema(&array_view, schema, &error);
+  ICEBERG_NANOARROW_RETURN_UNEXPECTED_WITH_ERROR(status, error);
+  internal::ArrowArrayViewGuard view_guard(&array_view);
+  status = ArrowArrayViewSetArray(&array_view, array_in, &error);
+  ICEBERG_NANOARROW_RETURN_UNEXPECTED_WITH_ERROR(status, error);
+  status = ArrowArrayViewValidate(&array_view, NANOARROW_VALIDATION_LEVEL_FULL, &error);
+  ICEBERG_NANOARROW_RETURN_UNEXPECTED_WITH_ERROR(status, error);
+
+  std::vector<ManifestEntry> manifest_entries(array_in->length);
+  for (auto& entry : manifest_entries) {
+    entry.data_file = std::make_shared<DataFile>();
+  }
+
+  for (int64_t col_idx = 0; col_idx < column_count; col_idx++) {
+    ICEBERG_ASSIGN_OR_RAISE(auto field, iceberg_schema.GetFieldByIndex(col_idx));
+    ICEBERG_CHECK(field.has_value(), "Invalid index {} for manifest entry schema",
+                  col_idx);
+    // `field_name` and `required` are consumed by the PARSE_* macros.
+    auto field_name = field->get().name();
+    auto field_id = field->get().field_id();
+    auto required = !field->get().optional();
+    auto column_view = array_view.children[col_idx];
+
+    switch (field_id) {
+      case ManifestEntry::kStatusFieldId:
+        PARSE_PRIMITIVE_FIELD(manifest_entries[row_idx].status, column_view,
+                              ManifestStatus);
+        break;
+      case ManifestEntry::kSnapshotIdFieldId:
+        PARSE_PRIMITIVE_FIELD(manifest_entries[row_idx].snapshot_id, column_view,
+                              int64_t);
+        break;
+      case ManifestEntry::kSequenceNumberFieldId:
+        PARSE_PRIMITIVE_FIELD(manifest_entries[row_idx].sequence_number, column_view,
+                              int64_t);
+        break;
+      case ManifestEntry::kFileSequenceNumberFieldId:
+        PARSE_PRIMITIVE_FIELD(manifest_entries[row_idx].file_sequence_number,
+                              column_view, int64_t);
+        break;
+      case ManifestEntry::kDataFileFieldId: {
+        const auto data_file_schema =
+            internal::checked_pointer_cast<StructType>(field->get().type());
+        ICEBERG_RETURN_UNEXPECTED(ParseDataFile(data_file_schema, column_view,
+                                                first_row_id, manifest_entries));
+        break;
+      }
+      default:
+        return InvalidManifest("Unsupported field: {} in manifest entry.", field_name);
+    }
+  }
+  return manifest_entries;
+}
+
+/// Data file stats columns that the inclusive metrics evaluator relies on;
+/// they are added to an explicit column selection when a row filter is set.
+const std::vector<std::string> kStatsColumns{
+    "value_counts", "null_value_counts", "nan_value_counts",
+    "lower_bounds", "upper_bounds",      "record_count",
+};
+
+/// \brief Whether the column projection must be widened with `kStatsColumns`.
+///
+/// True only when a non-trivial row filter is set and the caller selected an
+/// explicit column list that neither contains `ManifestReader::kAllColumns`
+/// nor already includes every stats column. An empty list already projects
+/// every column, so it never needs widening.
+bool RequireStatsProjection(const std::shared_ptr<Expression>& row_filter,
+                            const std::vector<std::string>& columns) {
+  const bool has_row_filter =
+      row_filter != nullptr && row_filter->op() != Expression::Operation::kTrue;
+  if (!has_row_filter || columns.empty()) {
+    return false;
+  }
+
+  const std::unordered_set<std::string_view> selected(columns.cbegin(), columns.cend());
+  const bool selects_all = selected.contains(ManifestReader::kAllColumns);
+  const bool has_all_stats =
+      std::ranges::all_of(kStatsColumns, [&selected](const std::string& col) {
+        return selected.contains(col);
+      });
+  return !selects_all && !has_all_stats;
+}
+
+/// \brief Narrow `schema` to the named `columns`.
+///
+/// An empty column list means "no projection" and returns `schema` unchanged;
+/// otherwise the selection is delegated to `Schema::Select`.
+Result<std::shared_ptr<Schema>> ProjectSchema(std::shared_ptr<Schema> schema,
+                                              const std::vector<std::string>& columns,
+                                              bool case_sensitive) {
+  if (columns.empty()) {
+    return schema;
+  }
+  return schema->Select(columns, case_sensitive);
+}
+
+} // namespace
+
+/// \brief Return `columns` plus the stats columns used for metrics evaluation.
+///
+/// A selection that contains `kAllColumns` already covers the stats columns
+/// and is returned as-is. Otherwise the stats columns are appended in
+/// `kStatsColumns` order, without removing names the caller already listed.
+std::vector<std::string> ManifestReader::WithStatsColumns(
+    const std::vector<std::string>& columns) {
+  if (std::ranges::contains(columns, ManifestReader::kAllColumns)) {
+    return columns;
+  }
+
+  std::vector<std::string> updated_columns;
+  updated_columns.reserve(columns.size() + kStatsColumns.size());
+  updated_columns.assign(columns.begin(), columns.end());
+  updated_columns.insert(updated_columns.end(), kStatsColumns.begin(),
+                         kStatsColumns.end());
+  return updated_columns;
+}
+
+// ManifestReaderImpl constructor
+ManifestReaderImpl::ManifestReaderImpl(
+ std::string manifest_path, std::optional<int64_t> manifest_length,
+ std::shared_ptr<FileIO> file_io, std::shared_ptr<Schema> schema,
+ std::shared_ptr<PartitionSpec> spec,
+ std::unique_ptr<InheritableMetadata> inheritable_metadata,
+ std::optional<int64_t> first_row_id)
+ : manifest_path_(std::move(manifest_path)),
+ manifest_length_(manifest_length),
+ file_io_(std::move(file_io)),
+ schema_(std::move(schema)),
+ spec_(std::move(spec)),
+ inheritable_metadata_(std::move(inheritable_metadata)),
+ first_row_id_(first_row_id) {}
+
+/// \brief Choose the data file columns to read; an empty list reads them all.
+ManifestReader& ManifestReaderImpl::Select(const std::vector<std::string>& columns) {
+  columns_.assign(columns.begin(), columns.end());
+  return *this;
+}
+
+/// \brief Replace the partition filter; a null expression means "no filter".
+///
+/// The cached partition evaluator is built from this filter, so it is dropped
+/// here and rebuilt lazily by GetEvaluator() on the next read. Without this, a
+/// filter set after a previous read would be silently ignored.
+ManifestReader& ManifestReaderImpl::FilterPartitions(std::shared_ptr<Expression> expr) {
+  part_filter_ = expr ? std::move(expr) : True::Instance();
+  evaluator_.reset();
+  return *this;
+}
+
+/// \brief Keep only files whose (spec id, partition) is in `partition_set`.
+///
+/// Passing nullptr removes the restriction.
+ManifestReader& ManifestReaderImpl::FilterPartitions(
+    std::shared_ptr<PartitionSet> partition_set) {
+  partition_set_.swap(partition_set);
+  return *this;
+}
+
+/// \brief Replace the row filter; a null expression means "no filter".
+///
+/// Both cached evaluators depend on the row filter: the partition evaluator
+/// projects it onto the partition spec, and the metrics evaluator binds it
+/// directly. Both are dropped so the next read rebuilds them from the new
+/// filter instead of reusing stale ones.
+ManifestReader& ManifestReaderImpl::FilterRows(std::shared_ptr<Expression> expr) {
+  row_filter_ = expr ? std::move(expr) : True::Instance();
+  evaluator_.reset();
+  metrics_evaluator_.reset();
+  return *this;
+}
+
+/// \brief Set case sensitivity for column selection and filter binding.
+///
+/// Both cached evaluators are built with the current case sensitivity, so
+/// they are dropped whenever the setting actually changes.
+ManifestReader& ManifestReaderImpl::CaseSensitive(bool case_sensitive) {
+  if (case_sensitive_ != case_sensitive) {
+    case_sensitive_ = case_sensitive;
+    evaluator_.reset();
+    metrics_evaluator_.reset();
+  }
+  return *this;
+}
+
+/// \brief True unless the partition filter is the trivial `true` expression.
+bool ManifestReaderImpl::HasPartitionFilter() const {
+  ICEBERG_DCHECK(part_filter_, "Partition filter is not set");
+  const bool always_true = part_filter_->op() == Expression::Operation::kTrue;
+  return !always_true;
+}
+
+/// \brief True unless the row filter is the trivial `true` expression.
+bool ManifestReaderImpl::HasRowFilter() const {
+  ICEBERG_DCHECK(row_filter_, "Row filter is not set");
+  const bool always_true = row_filter_->op() == Expression::Operation::kTrue;
+  return !always_true;
+}
+
+/// \brief Lazily build and cache the partition evaluator.
+///
+/// The row filter is projected onto the partition spec (inclusive projection)
+/// and AND-ed with the explicit partition filter. The result is bound against
+/// a schema derived from the spec's partition type.
+Result<Evaluator*> ManifestReaderImpl::GetEvaluator() {
+  if (evaluator_) {
+    return evaluator_.get();
+  }
+
+  auto inclusive_projection = Projections::Inclusive(*spec_, *schema_, case_sensitive_);
+  ICEBERG_ASSIGN_OR_RAISE(auto projected_row_filter,
+                          inclusive_projection->Project(row_filter_));
+  ICEBERG_ASSIGN_OR_RAISE(auto combined_filter,
+                          And::Make(std::move(projected_row_filter), part_filter_));
+
+  ICEBERG_ASSIGN_OR_RAISE(auto partition_type, spec_->PartitionType(*schema_));
+  const auto partition_schema = partition_type->ToSchema();
+  ICEBERG_ASSIGN_OR_RAISE(evaluator_,
+                          Evaluator::Make(*partition_schema, std::move(combined_filter),
+                                          case_sensitive_));
+  return evaluator_.get();
+}
+
+/// \brief Lazily build and cache the inclusive metrics evaluator for the row filter.
+Result<InclusiveMetricsEvaluator*> ManifestReaderImpl::GetMetricsEvaluator() {
+  if (metrics_evaluator_) {
+    return metrics_evaluator_.get();
+  }
+  ICEBERG_ASSIGN_OR_RAISE(
+      metrics_evaluator_,
+      InclusiveMetricsEvaluator::Make(row_filter_, *schema_, case_sensitive_));
+  return metrics_evaluator_.get();
+}
+
+/// \brief True when no partition set is configured, or when `file`'s
+/// (partition spec id, partition) pair is contained in it.
+bool ManifestReaderImpl::InPartitionSet(const DataFile& file) const {
+  return partition_set_ == nullptr ||
+         partition_set_->contains(file.partition_spec_id, file.partition);
+}
+
+/// \brief (Re)open the Avro reader using the given data file projection.
+///
+/// `record_count` and `first_row_id` are appended when the projection lacks
+/// them, because first_row_id inheritance needs both. The resulting data file
+/// struct is wrapped into a manifest entry schema, which becomes both
+/// `file_schema_` and the reader projection. The opened reader replaces
+/// `file_reader_`.
+Status ManifestReaderImpl::OpenReader(std::shared_ptr<Schema> projection) {
+  ICEBERG_PRECHECK(projection != nullptr, "Projection schema cannot be null");
+
+  ICEBERG_ASSIGN_OR_RAISE(auto record_count_field,
+                          projection->FindFieldById(DataFile::kRecordCount.field_id()));
+  ICEBERG_ASSIGN_OR_RAISE(auto first_row_id_field,
+                          projection->FindFieldById(DataFile::kFirstRowId.field_id()));
+
+  auto projected_fields = projection->fields();
+  std::vector<SchemaField> data_file_fields(projected_fields.begin(),
+                                            projected_fields.end());
+  if (!record_count_field.has_value()) {
+    data_file_fields.push_back(DataFile::kRecordCount);
+  }
+  if (!first_row_id_field.has_value()) {
+    data_file_fields.push_back(DataFile::kFirstRowId);
+  }
+  /// FIXME: this is not supported yet.
+  // data_file_fields.push_back(MetadataColumns::kRowPosition);
+
+  auto data_file_type = std::make_shared<StructType>(std::move(data_file_fields));
+  file_schema_ =
+      ManifestEntry::TypeFromDataFileType(std::move(data_file_type))->ToSchema();
+
+  ReaderOptions options;
+  options.path = manifest_path_;
+  options.io = file_io_;
+  options.projection = file_schema_;
+  if (manifest_length_.has_value()) {
+    options.length = manifest_length_;
+  }
+  ICEBERG_ASSIGN_OR_RAISE(file_reader_,
+                          ReaderFactoryRegistry::Open(FileFormatType::kAvro, options));
+  return {};
+}
+
+/// \brief Read every entry, without filtering on ManifestEntry::IsAlive().
+Result<std::vector<ManifestEntry>> ManifestReaderImpl::Entries() {
+  constexpr bool kOnlyLive = false;
+  return ReadEntries(kOnlyLive);
+}
+
+/// \brief Read only the entries for which ManifestEntry::IsAlive() holds.
+Result<std::vector<ManifestEntry>> ManifestReaderImpl::LiveEntries() {
+  constexpr bool kOnlyLive = true;
+  return ReadEntries(kOnlyLive);
+}
+
+/// \brief Open the manifest, then parse, enrich and filter its entries.
+///
+/// The data file projection comes from `columns_`, widened with the stats
+/// columns when a row filter needs them. Every batch is parsed and inherited
+/// metadata is applied. An entry is kept only if it passes, in order: the live
+/// check (when `only_live`), the partition evaluator, the metrics evaluator,
+/// and the partition set.
+///
+/// \param only_live Drop entries for which `ManifestEntry::IsAlive()` is false.
+Result<std::vector<ManifestEntry>> ManifestReaderImpl::ReadEntries(bool only_live) {
+  ICEBERG_ASSIGN_OR_RAISE(auto partition_type, spec_->PartitionType(*schema_));
+  auto data_file_schema = DataFile::Type(std::move(partition_type))->ToSchema();
+
+  const bool needs_filtering =
+      HasRowFilter() || HasPartitionFilter() || partition_set_ != nullptr;
+  // Ensure stats columns are present for metrics evaluation.
+  const bool requires_stats_projection =
+      needs_filtering && RequireStatsProjection(row_filter_, columns_);
+  ICEBERG_ASSIGN_OR_RAISE(
+      auto projected_data_file_schema,
+      ProjectSchema(std::move(data_file_schema),
+                    requires_stats_projection ? ManifestReader::WithStatsColumns(columns_)
+                                              : columns_,
+                    case_sensitive_));
+
+  ICEBERG_RETURN_UNEXPECTED(OpenReader(std::move(projected_data_file_schema)));
+  ICEBERG_DCHECK(file_reader_ != nullptr, "File reader should be initialized");
+
+  ICEBERG_ASSIGN_OR_RAISE(auto arrow_schema, file_reader_->Schema());
+  internal::ArrowSchemaGuard schema_guard(&arrow_schema);
+
+  // Get evaluators if needed
+  Evaluator* evaluator = nullptr;
+  InclusiveMetricsEvaluator* metrics_evaluator = nullptr;
+  if (HasPartitionFilter() || HasRowFilter()) {
+    ICEBERG_ASSIGN_OR_RAISE(evaluator, GetEvaluator());
+  }
+  if (HasRowFilter()) {
+    ICEBERG_ASSIGN_OR_RAISE(metrics_evaluator, GetMetricsEvaluator());
+  }
+
+  // Row-id inheritance advances this counter while batches are parsed. Work on
+  // a per-read copy so every Entries()/LiveEntries() call starts again from the
+  // manifest's first_row_id. Advancing `first_row_id_` itself would make a
+  // second read continue where the previous one stopped.
+  std::optional<int64_t> next_first_row_id = first_row_id_;
+
+  std::vector<ManifestEntry> manifest_entries;
+  while (true) {
+    ICEBERG_ASSIGN_OR_RAISE(auto result, file_reader_->Next());
+    if (!result.has_value()) {
+      break;  // EOF
+    }
+
+    internal::ArrowArrayGuard array_guard(&result.value());
+    ICEBERG_ASSIGN_OR_RAISE(auto entries,
+                            ParseManifestEntry(&arrow_schema, &result.value(),
+                                               *file_schema_, next_first_row_id));
+
+    for (auto& entry : entries) {
+      ICEBERG_RETURN_UNEXPECTED(inheritable_metadata_->Apply(entry));
+
+      if (only_live && !entry.IsAlive()) {
+        continue;
+      }
+
+      if (needs_filtering) {
+        ICEBERG_DCHECK(entry.data_file != nullptr, "Data file cannot be null");
+        if (evaluator) {
+          ICEBERG_ASSIGN_OR_RAISE(bool partition_match,
+                                  evaluator->Evaluate(entry.data_file->partition));
+          if (!partition_match) {
+            continue;
+          }
+        }
+        if (metrics_evaluator) {
+          ICEBERG_ASSIGN_OR_RAISE(bool metrics_match,
+                                  metrics_evaluator->Evaluate(*entry.data_file));
+          if (!metrics_match) {
+            continue;
+          }
+        }
+        if (!InPartitionSet(*entry.data_file)) {
+          continue;
+        }
+      }
+
+      manifest_entries.push_back(std::move(entry));
+    }
+  }
+
+  return manifest_entries;
+}
+
+/// \brief Read every manifest file record from the manifest list, batch by batch.
+Result<std::vector<ManifestFile>> ManifestListReaderImpl::Files() const {
+  ICEBERG_ASSIGN_OR_RAISE(auto arrow_schema, reader_->Schema());
+  internal::ArrowSchemaGuard schema_guard(&arrow_schema);
+
+  std::vector<ManifestFile> manifest_files;
+  for (;;) {
+    ICEBERG_ASSIGN_OR_RAISE(auto batch, reader_->Next());
+    if (!batch.has_value()) {
+      return manifest_files;  // end of file
+    }
+    internal::ArrowArrayGuard array_guard(&batch.value());
+    ICEBERG_ASSIGN_OR_RAISE(auto parsed,
+                            ParseManifestList(&arrow_schema, &batch.value(), *schema_));
+    std::ranges::move(parsed, std::back_inserter(manifest_files));
+  }
+}
+
+/// \brief Key/value metadata of the manifest list file, as reported by the
+/// underlying file reader.
+Result<std::unordered_map<std::string, std::string>> ManifestListReaderImpl::Metadata()
+    const {
+  const auto& file_reader = reader_;
+  return file_reader->Metadata();
+}
+
+/// \brief Map a zero-based index onto ManifestFileField.
+///
+/// \return InvalidArgument for indices outside [0, kNextUnusedId).
+Result<ManifestFileField> ManifestFileFieldFromIndex(int32_t index) {
+  constexpr auto kFieldCount = static_cast<int32_t>(ManifestFileField::kNextUnusedId);
+  if (index < 0 || index >= kFieldCount) {
+    return InvalidArgument("Invalid manifest file field index: {}", index);
+  }
+  return static_cast<ManifestFileField>(index);
+}
+
Result<std::unique_ptr<ManifestReader>> ManifestReader::Make(
const ManifestFile& manifest, std::shared_ptr<FileIO> file_io,
- std::shared_ptr<StructType> partition_type) {
- auto manifest_entry_schema =
- ManifestEntry::TypeFromPartitionType(std::move(partition_type));
- std::shared_ptr<Schema> schema =
- FromStructType(std::move(*manifest_entry_schema), std::nullopt);
-
- ICEBERG_ASSIGN_OR_RAISE(auto reader,
- ReaderFactoryRegistry::Open(FileFormatType::kAvro,
- {.path =
manifest.manifest_path,
- .length =
manifest.manifest_length,
- .io =
std::move(file_io),
- .projection = schema}));
+ std::shared_ptr<Schema> schema, std::shared_ptr<PartitionSpec> spec) {
+ // The reader needs all three collaborators, so null ones are rejected up front
+ // instead of failing later during a read.
+ if (file_io == nullptr || schema == nullptr || spec == nullptr) {
+ return InvalidArgument(
+ "FileIO, Schema, and PartitionSpec cannot be null to create
ManifestReader");
+ }
+
// Create inheritable metadata for this manifest
ICEBERG_ASSIGN_OR_RAISE(auto inheritable_metadata,
InheritableMetadataFactory::FromManifest(manifest));
-
- return std::make_unique<ManifestReaderImpl>(std::move(reader),
std::move(schema),
- std::move(inheritable_metadata),
- manifest.first_row_id);
+ // No file is opened here: ManifestReaderImpl only records the path, length
+ // and first_row_id, and opens the manifest on each read.
+ return std::make_unique<ManifestReaderImpl>(
+ manifest.manifest_path, manifest.manifest_length, std::move(file_io),
+ std::move(schema), std::move(spec), std::move(inheritable_metadata),
+ manifest.first_row_id);
}
Result<std::unique_ptr<ManifestReader>> ManifestReader::Make(
std::string_view manifest_location, std::shared_ptr<FileIO> file_io,
- std::shared_ptr<StructType> partition_type) {
- auto manifest_entry_schema =
- ManifestEntry::TypeFromPartitionType(std::move(partition_type));
- auto fields_span = manifest_entry_schema->fields();
- std::vector<SchemaField> fields(fields_span.begin(), fields_span.end());
- auto schema = std::make_shared<Schema>(fields);
- ICEBERG_ASSIGN_OR_RAISE(
- auto reader, ReaderFactoryRegistry::Open(FileFormatType::kAvro,
- {.path =
std::string(manifest_location),
- .io = std::move(file_io),
- .projection = schema}));
+ std::shared_ptr<Schema> schema, std::shared_ptr<PartitionSpec> spec) {
+ // The reader needs all three collaborators, so null ones are rejected up front.
+ if (file_io == nullptr || schema == nullptr || spec == nullptr) {
+ return InvalidArgument(
+ "FileIO, Schema, and PartitionSpec cannot be null to create
ManifestReader");
+ }
+
+ // Only a path is known here (no ManifestFile), so there is no metadata to
+ // inherit, no manifest length, and no first_row_id for row-id inheritance.
ICEBERG_ASSIGN_OR_RAISE(auto inheritable_metadata,
InheritableMetadataFactory::Empty());
- return std::make_unique<ManifestReaderImpl>(std::move(reader),
std::move(schema),
- std::move(inheritable_metadata),
- std::nullopt);
+ return std::make_unique<ManifestReaderImpl>(
+ std::string(manifest_location), std::nullopt, std::move(file_io),
std::move(schema),
+ std::move(spec), std::move(inheritable_metadata), std::nullopt);
}
Result<std::unique_ptr<ManifestListReader>> ManifestListReader::Make(
std::string_view manifest_list_location, std::shared_ptr<FileIO> file_io) {
- std::shared_ptr<Schema> schema = ManifestFile::Type();
+ std::shared_ptr<Schema> schema = ManifestFile::Type()->ToSchema();
ICEBERG_ASSIGN_OR_RAISE(
auto reader,
ReaderFactoryRegistry::Open(FileFormatType::kAvro,
diff --git a/src/iceberg/manifest/manifest_reader.h
b/src/iceberg/manifest/manifest_reader.h
index e7d6044d..a35c1fb9 100644
--- a/src/iceberg/manifest/manifest_reader.h
+++ b/src/iceberg/manifest/manifest_reader.h
@@ -23,11 +23,12 @@
/// Data reader interface for manifest files.
#include <memory>
+#include <string>
+#include <unordered_map>
#include <vector>
#include "iceberg/iceberg_export.h"
#include "iceberg/result.h"
-#include "iceberg/type.h"
#include "iceberg/type_fwd.h"
namespace iceberg {
@@ -35,31 +36,67 @@ namespace iceberg {
/// \brief Read manifest entries from a manifest file.
class ICEBERG_EXPORT ManifestReader {
public:
+ /// \brief Special value to select all columns from manifest files.
+ static constexpr std::string_view kAllColumns = "*";
+
virtual ~ManifestReader() = default;
/// \brief Read all manifest entries in the manifest file.
- virtual Result<std::vector<ManifestEntry>> Entries() const = 0;
+ ///
+ /// The current implementation re-opens and re-reads the manifest on every call.
+ ///
+ /// TODO(gangwu): provide a lazy-evaluated iterator interface for better
performance.
+ virtual Result<std::vector<ManifestEntry>> Entries() = 0;
- /// \brief Get the metadata of the manifest file.
- virtual Result<std::unordered_map<std::string, std::string>> Metadata()
const = 0;
+ /// \brief Read only live (non-deleted) manifest entries.
+ virtual Result<std::vector<ManifestEntry>> LiveEntries() = 0;
+
+ /// \brief Select specific columns of data file to read from the manifest
entries.
+ ///
+ /// \note Column names should match the names in `DataFile` schema.
Unmatched names
+ /// will be ignored.
+ /// An empty list, or one containing `kAllColumns`, reads every column.
+ virtual ManifestReader& Select(const std::vector<std::string>& columns) = 0;
+
+ /// \brief Filter manifest entries by partition filter.
+ ///
+ /// \note Unlike the Java implementation, this method does not combine new
expressions
+ /// with existing ones. Each call replaces the previous partition filter.
+ /// A null `expr` clears the partition filter.
+ virtual ManifestReader& FilterPartitions(std::shared_ptr<Expression> expr) =
0;
+
+ /// \brief Filter manifest entries to a specific set of partitions.
+ /// A null `partition_set` removes the restriction.
+ virtual ManifestReader& FilterPartitions(
+ std::shared_ptr<class PartitionSet> partition_set) = 0;
+
+ /// \brief Filter manifest entries by row-level filter.
+ ///
+ /// \note Unlike the Java implementation, this method does not combine new
expressions
+ /// with existing ones. Each call replaces the previous row filter.
+ /// A null `expr` clears the row filter.
+ virtual ManifestReader& FilterRows(std::shared_ptr<Expression> expr) = 0;
+
+ /// \brief Set case sensitivity for column name matching.
+ virtual ManifestReader& CaseSensitive(bool case_sensitive) = 0;
/// \brief Creates a reader for a manifest file.
/// \param manifest A ManifestFile object containing metadata about the
manifest.
/// \param file_io File IO implementation to use.
- /// \param partition_type Schema for the partition.
+ /// \param schema Schema used to bind the partition type.
+ /// \param spec Partition spec used for this manifest file.
/// \return A Result containing the reader or an error.
+ /// \note `file_io`, `schema` and `spec` must be non-null; otherwise an
+ /// InvalidArgument error is returned.
static Result<std::unique_ptr<ManifestReader>> Make(
const ManifestFile& manifest, std::shared_ptr<FileIO> file_io,
- std::shared_ptr<StructType> partition_type);
+ std::shared_ptr<Schema> schema, std::shared_ptr<PartitionSpec> spec);
/// \brief Creates a reader for a manifest file.
/// \param manifest_location Path to the manifest file.
/// \param file_io File IO implementation to use.
- /// \param partition_type Schema for the partition.
+ /// \param schema Schema used to bind the partition type.
+ /// \param spec Partition spec used for this manifest file.
/// \return A Result containing the reader or an error.
+ /// \note `file_io`, `schema` and `spec` must be non-null; otherwise an
+ /// InvalidArgument error is returned. No metadata is inherited.
static Result<std::unique_ptr<ManifestReader>> Make(
std::string_view manifest_location, std::shared_ptr<FileIO> file_io,
- std::shared_ptr<StructType> partition_type);
+ std::shared_ptr<Schema> schema, std::shared_ptr<PartitionSpec> spec);
+
+ /// \brief Add stats columns to the column list if needed.
+ /// Returns `columns` unchanged when it contains `kAllColumns`; otherwise
+ /// returns a copy with the stats columns appended.
+ static std::vector<std::string> WithStatsColumns(
+ const std::vector<std::string>& columns);
};
/// \brief Read manifest files from a manifest list file.
diff --git a/src/iceberg/manifest/manifest_reader_internal.cc
b/src/iceberg/manifest/manifest_reader_internal.cc
deleted file mode 100644
index f7e8d428..00000000
--- a/src/iceberg/manifest/manifest_reader_internal.cc
+++ /dev/null
@@ -1,606 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-#include "manifest_reader_internal.h"
-
-#include <nanoarrow/nanoarrow.h>
-
-#include "iceberg/arrow/nanoarrow_status_internal.h"
-#include "iceberg/arrow_c_data_guard_internal.h"
-#include "iceberg/file_format.h"
-#include "iceberg/manifest/manifest_entry.h"
-#include "iceberg/manifest/manifest_list.h"
-#include "iceberg/schema.h"
-#include "iceberg/type.h"
-#include "iceberg/util/checked_cast.h"
-#include "iceberg/util/macros.h"
-
-namespace iceberg {
-
-#define PARSE_PRIMITIVE_FIELD(item, array_view, type)
\
- for (int64_t row_idx = 0; row_idx < array_view->length; row_idx++) {
\
- if (!ArrowArrayViewIsNull(array_view, row_idx)) {
\
- auto value = ArrowArrayViewGetIntUnsafe(array_view, row_idx);
\
- item = static_cast<type>(value);
\
- } else if (required) {
\
- return InvalidManifestList("Field {} is required but null at row {}",
field_name, \
- row_idx);
\
- }
\
- }
-
-#define PARSE_STRING_FIELD(item, array_view)
\
- for (int64_t row_idx = 0; row_idx < array_view->length; row_idx++) {
\
- if (!ArrowArrayViewIsNull(array_view, row_idx)) {
\
- auto value = ArrowArrayViewGetStringUnsafe(array_view, row_idx);
\
- item = std::string(value.data, value.size_bytes);
\
- } else if (required) {
\
- return InvalidManifestList("Field {} is required but null at row {}",
field_name, \
- row_idx);
\
- }
\
- }
-
-#define PARSE_BINARY_FIELD(item, array_view)
\
- for (int64_t row_idx = 0; row_idx < array_view->length; row_idx++) {
\
- if (!ArrowArrayViewIsNull(view_of_column, row_idx)) {
\
- item = ArrowArrayViewGetInt8Vector(array_view, row_idx);
\
- } else if (required) {
\
- return InvalidManifestList("Field {} is required but null at row {}",
field_name, \
- row_idx);
\
- }
\
- }
-
-#define PARSE_INTEGER_VECTOR_FIELD(item, count, array_view, type)
\
- for (int64_t manifest_idx = 0; manifest_idx < count; manifest_idx++) {
\
- auto offset = ArrowArrayViewListChildOffset(array_view, manifest_idx);
\
- auto next_offset = ArrowArrayViewListChildOffset(array_view, manifest_idx
+ 1); \
- for (int64_t offset_idx = offset; offset_idx < next_offset; offset_idx++)
{ \
- item.emplace_back(static_cast<type>(
\
- ArrowArrayViewGetIntUnsafe(array_view->children[0], offset_idx)));
\
- }
\
- }
-
-#define PARSE_MAP_FIELD(item, count, array_view, key_type, value_type,
assignment) \
- do {
\
- if (array_view->storage_type != ArrowType::NANOARROW_TYPE_MAP) {
\
- return InvalidManifest("Field:{} should be a map.", field_name);
\
- }
\
- auto view_of_map = array_view->children[0];
\
- ASSERT_VIEW_TYPE_AND_CHILDREN(view_of_map,
ArrowType::NANOARROW_TYPE_STRUCT, 2); \
- auto view_of_map_key = view_of_map->children[0];
\
- ASSERT_VIEW_TYPE(view_of_map_key, key_type);
\
- auto view_of_map_value = view_of_map->children[1];
\
- ASSERT_VIEW_TYPE(view_of_map_value, value_type);
\
- for (int64_t row_idx = 0; row_idx < count; row_idx++) {
\
- auto offset = array_view->buffer_views[1].data.as_int32[row_idx];
\
- auto next_offset = array_view->buffer_views[1].data.as_int32[row_idx +
1]; \
- for (int32_t offset_idx = offset; offset_idx < next_offset;
offset_idx++) { \
- auto key = ArrowArrayViewGetIntUnsafe(view_of_map_key, offset_idx);
\
- item[key] = assignment;
\
- }
\
- }
\
- } while (0)
-
-#define PARSE_INT_LONG_MAP_FIELD(item, count, array_view) \
- PARSE_MAP_FIELD(item, count, array_view, ArrowType::NANOARROW_TYPE_INT32, \
- ArrowType::NANOARROW_TYPE_INT64, \
- ArrowArrayViewGetIntUnsafe(view_of_map_value, offset_idx));
-
-#define PARSE_INT_BINARY_MAP_FIELD(item, count, array_view) \
- PARSE_MAP_FIELD(item, count, array_view, ArrowType::NANOARROW_TYPE_INT32, \
- ArrowType::NANOARROW_TYPE_BINARY, \
- ArrowArrayViewGetInt8Vector(view_of_map_value, offset_idx));
-
-#define ASSERT_VIEW_TYPE(view, type)
\
- if (view->storage_type != type) {
\
- return InvalidManifest("Sub Field:{} should be a {}.", field_name, #type);
\
- }
-
-#define ASSERT_VIEW_TYPE_AND_CHILDREN(view, type, n_child)
\
- if (view->storage_type != type) {
\
- return InvalidManifest("Sub Field:{} should be a {}.", field_name, #type);
\
- }
\
- if (view->n_children != n_child) {
\
- return InvalidManifest("Sub Field for:{} should have key&value:{}
columns.", \
- field_name, n_child);
\
- }
-
-std::vector<uint8_t> ArrowArrayViewGetInt8Vector(const ArrowArrayView* view,
- int32_t offset_idx) {
- auto buffer = ArrowArrayViewGetBytesUnsafe(view, offset_idx);
- return {buffer.data.as_char, buffer.data.as_char + buffer.size_bytes};
-}
-
-Status ParsePartitionFieldSummaryList(ArrowArrayView* view_of_column,
- std::vector<ManifestFile>&
manifest_files) {
- auto manifest_count = view_of_column->length;
- // view_of_column is list<struct<PartitionFieldSummary>>
- if (view_of_column->storage_type != ArrowType::NANOARROW_TYPE_LIST) {
- return InvalidManifestList("partitions field should be a list.");
- }
- auto view_of_list_iterm = view_of_column->children[0];
- // view_of_list_iterm is struct<PartitionFieldSummary>
- if (view_of_list_iterm->storage_type != ArrowType::NANOARROW_TYPE_STRUCT) {
- return InvalidManifestList("partitions list field should be a list.");
- }
- if (view_of_list_iterm->n_children != 4) {
- return InvalidManifestList("PartitionFieldSummary should have 4 fields.");
- }
- if (view_of_list_iterm->children[0]->storage_type !=
ArrowType::NANOARROW_TYPE_BOOL) {
- return InvalidManifestList("contains_null should have be bool type
column.");
- }
- auto contains_null = view_of_list_iterm->children[0];
- if (view_of_list_iterm->children[1]->storage_type !=
ArrowType::NANOARROW_TYPE_BOOL) {
- return InvalidManifestList("contains_nan should have be bool type
column.");
- }
- auto contains_nan = view_of_list_iterm->children[1];
- if (view_of_list_iterm->children[2]->storage_type !=
ArrowType::NANOARROW_TYPE_BINARY) {
- return InvalidManifestList("lower_bound should have be binary type
column.");
- }
- auto lower_bound_list = view_of_list_iterm->children[2];
- if (view_of_list_iterm->children[3]->storage_type !=
ArrowType::NANOARROW_TYPE_BINARY) {
- return InvalidManifestList("upper_bound should have be binary type
column.");
- }
- auto upper_bound_list = view_of_list_iterm->children[3];
- for (int64_t manifest_idx = 0; manifest_idx < manifest_count;
manifest_idx++) {
- auto offset = ArrowArrayViewListChildOffset(view_of_column, manifest_idx);
- auto next_offset = ArrowArrayViewListChildOffset(view_of_column,
manifest_idx + 1);
- // partitions from offset to next_offset belongs to manifest_idx
- auto& manifest_file = manifest_files[manifest_idx];
- for (int64_t partition_idx = offset; partition_idx < next_offset;
partition_idx++) {
- PartitionFieldSummary partition_field_summary;
- if (!ArrowArrayViewIsNull(contains_null, partition_idx)) {
- partition_field_summary.contains_null =
- ArrowArrayViewGetIntUnsafe(contains_null, partition_idx);
- } else {
- return InvalidManifestList("contains_null is null at row {}",
partition_idx);
- }
- if (!ArrowArrayViewIsNull(contains_nan, partition_idx)) {
- partition_field_summary.contains_nan =
- ArrowArrayViewGetIntUnsafe(contains_nan, partition_idx);
- }
- if (!ArrowArrayViewIsNull(lower_bound_list, partition_idx)) {
- partition_field_summary.lower_bound =
- ArrowArrayViewGetInt8Vector(lower_bound_list, partition_idx);
- }
- if (!ArrowArrayViewIsNull(upper_bound_list, partition_idx)) {
- partition_field_summary.upper_bound =
- ArrowArrayViewGetInt8Vector(upper_bound_list, partition_idx);
- }
-
- manifest_file.partitions.emplace_back(partition_field_summary);
- }
- }
- return {};
-}
-
-Result<std::vector<ManifestFile>> ParseManifestList(ArrowSchema* schema,
- ArrowArray* array_in,
- const Schema&
iceberg_schema) {
- if (schema->n_children != array_in->n_children) {
- return InvalidManifestList("Columns size not match between schema:{} and
array:{}",
- schema->n_children, array_in->n_children);
- }
- if (iceberg_schema.fields().size() != array_in->n_children) {
- return InvalidManifestList("Columns size not match between schema:{} and
array:{}",
- iceberg_schema.fields().size(),
array_in->n_children);
- }
-
- ArrowError error;
- ArrowArrayView array_view;
- auto status = ArrowArrayViewInitFromSchema(&array_view, schema, &error);
- ICEBERG_NANOARROW_RETURN_UNEXPECTED_WITH_ERROR(status, error);
- internal::ArrowArrayViewGuard view_guard(&array_view);
- status = ArrowArrayViewSetArray(&array_view, array_in, &error);
- ICEBERG_NANOARROW_RETURN_UNEXPECTED_WITH_ERROR(status, error);
- status = ArrowArrayViewValidate(&array_view,
NANOARROW_VALIDATION_LEVEL_FULL, &error);
- ICEBERG_NANOARROW_RETURN_UNEXPECTED_WITH_ERROR(status, error);
-
- std::vector<ManifestFile> manifest_files;
- manifest_files.resize(array_in->length);
-
- for (int64_t idx = 0; idx < array_in->n_children; idx++) {
- const auto& field = iceberg_schema.GetFieldByIndex(idx);
- if (!field.has_value()) {
- return InvalidSchema("Field index {} is not found in schema", idx);
- }
- auto field_name = field.value()->get().name();
- bool required = !field.value()->get().optional();
- auto view_of_column = array_view.children[idx];
- ICEBERG_ASSIGN_OR_RAISE(auto manifest_file_field,
ManifestFileFieldFromIndex(idx));
- switch (manifest_file_field) {
- case ManifestFileField::kManifestPath:
- PARSE_STRING_FIELD(manifest_files[row_idx].manifest_path,
view_of_column);
- break;
- case ManifestFileField::kManifestLength:
- PARSE_PRIMITIVE_FIELD(manifest_files[row_idx].manifest_length,
view_of_column,
- int64_t);
- break;
- case ManifestFileField::kPartitionSpecId:
- PARSE_PRIMITIVE_FIELD(manifest_files[row_idx].partition_spec_id,
view_of_column,
- int32_t);
- break;
- case ManifestFileField::kContent:
- PARSE_PRIMITIVE_FIELD(manifest_files[row_idx].content, view_of_column,
- ManifestContent);
- break;
- case ManifestFileField::kSequenceNumber:
- PARSE_PRIMITIVE_FIELD(manifest_files[row_idx].sequence_number,
view_of_column,
- int64_t);
- break;
- case ManifestFileField::kMinSequenceNumber:
- PARSE_PRIMITIVE_FIELD(manifest_files[row_idx].min_sequence_number,
view_of_column,
- int64_t);
- break;
- case ManifestFileField::kAddedSnapshotId:
- PARSE_PRIMITIVE_FIELD(manifest_files[row_idx].added_snapshot_id,
view_of_column,
- int64_t);
- break;
- case ManifestFileField::kAddedFilesCount:
- PARSE_PRIMITIVE_FIELD(manifest_files[row_idx].added_files_count,
view_of_column,
- int32_t);
- break;
- case ManifestFileField::kExistingFilesCount:
- PARSE_PRIMITIVE_FIELD(manifest_files[row_idx].existing_files_count,
- view_of_column, int32_t);
- break;
- case ManifestFileField::kDeletedFilesCount:
- PARSE_PRIMITIVE_FIELD(manifest_files[row_idx].deleted_files_count,
view_of_column,
- int32_t);
- break;
- case ManifestFileField::kAddedRowsCount:
- PARSE_PRIMITIVE_FIELD(manifest_files[row_idx].added_rows_count,
view_of_column,
- int64_t);
- break;
- case ManifestFileField::kExistingRowsCount:
- PARSE_PRIMITIVE_FIELD(manifest_files[row_idx].existing_rows_count,
view_of_column,
- int64_t);
- break;
- case ManifestFileField::kDeletedRowsCount:
- PARSE_PRIMITIVE_FIELD(manifest_files[row_idx].deleted_rows_count,
view_of_column,
- int64_t);
- break;
- case ManifestFileField::kPartitionFieldSummary:
- ICEBERG_RETURN_UNEXPECTED(
- ParsePartitionFieldSummaryList(view_of_column, manifest_files));
- break;
- case ManifestFileField::kKeyMetadata:
- PARSE_BINARY_FIELD(manifest_files[row_idx].key_metadata,
view_of_column);
- break;
- case ManifestFileField::kFirstRowId:
- PARSE_PRIMITIVE_FIELD(manifest_files[row_idx].first_row_id,
view_of_column,
- int64_t);
- break;
- default:
- return InvalidManifestList("Unsupported field: {} in manifest file.",
field_name);
- }
- }
- return manifest_files;
-}
-
-Status ParseLiteral(ArrowArrayView* view_of_partition, int64_t row_idx,
- std::vector<ManifestEntry>& manifest_entries) {
- if (view_of_partition->storage_type == ArrowType::NANOARROW_TYPE_BOOL) {
- auto value = ArrowArrayViewGetUIntUnsafe(view_of_partition, row_idx);
-
manifest_entries[row_idx].data_file->partition.AddValue(Literal::Boolean(value
!= 0));
- } else if (view_of_partition->storage_type ==
ArrowType::NANOARROW_TYPE_INT32) {
- auto value = ArrowArrayViewGetIntUnsafe(view_of_partition, row_idx);
-
manifest_entries[row_idx].data_file->partition.AddValue(Literal::Int(value));
- } else if (view_of_partition->storage_type ==
ArrowType::NANOARROW_TYPE_INT64) {
- auto value = ArrowArrayViewGetIntUnsafe(view_of_partition, row_idx);
-
manifest_entries[row_idx].data_file->partition.AddValue(Literal::Long(value));
- } else if (view_of_partition->storage_type ==
ArrowType::NANOARROW_TYPE_FLOAT) {
- auto value = ArrowArrayViewGetDoubleUnsafe(view_of_partition, row_idx);
-
manifest_entries[row_idx].data_file->partition.AddValue(Literal::Float(value));
- } else if (view_of_partition->storage_type ==
ArrowType::NANOARROW_TYPE_DOUBLE) {
- auto value = ArrowArrayViewGetDoubleUnsafe(view_of_partition, row_idx);
-
manifest_entries[row_idx].data_file->partition.AddValue(Literal::Double(value));
- } else if (view_of_partition->storage_type ==
ArrowType::NANOARROW_TYPE_STRING) {
- auto value = ArrowArrayViewGetStringUnsafe(view_of_partition, row_idx);
- manifest_entries[row_idx].data_file->partition.AddValue(
- Literal::String(std::string(value.data, value.size_bytes)));
- } else if (view_of_partition->storage_type ==
ArrowType::NANOARROW_TYPE_BINARY) {
- auto buffer = ArrowArrayViewGetBytesUnsafe(view_of_partition, row_idx);
- manifest_entries[row_idx].data_file->partition.AddValue(
- Literal::Binary(std::vector<uint8_t>(buffer.data.as_char,
- buffer.data.as_char +
buffer.size_bytes)));
- } else {
- return InvalidManifest("Unsupported field type: {} in data file
partition.",
-
static_cast<int32_t>(view_of_partition->storage_type));
- }
- return {};
-}
-
-Status ParseDataFile(const std::shared_ptr<StructType>& data_file_schema,
- ArrowArrayView* view_of_column, std::optional<int64_t>&
first_row_id,
- std::vector<ManifestEntry>& manifest_entries) {
- if (view_of_column->storage_type != ArrowType::NANOARROW_TYPE_STRUCT) {
- return InvalidManifest("DataFile field should be a struct.");
- }
- if (view_of_column->n_children != data_file_schema->fields().size()) {
- return InvalidManifest("DataFile schema size:{} not match with ArrayArray
columns:{}",
- data_file_schema->fields().size(),
view_of_column->n_children);
- }
- for (int64_t col_idx = 0; col_idx < view_of_column->n_children; ++col_idx) {
- auto field_name =
data_file_schema->GetFieldByIndex(col_idx).value()->get().name();
- auto required =
!data_file_schema->GetFieldByIndex(col_idx).value()->get().optional();
- auto view_of_file_field = view_of_column->children[col_idx];
- auto manifest_entry_count = view_of_file_field->length;
-
- switch (col_idx) {
- case 0:
- PARSE_PRIMITIVE_FIELD(manifest_entries[row_idx].data_file->content,
- view_of_file_field, DataFile::Content);
- break;
- case 1:
- PARSE_STRING_FIELD(manifest_entries[row_idx].data_file->file_path,
- view_of_file_field);
- break;
- case 2:
- for (int64_t row_idx = 0; row_idx < view_of_file_field->length;
row_idx++) {
- if (!ArrowArrayViewIsNull(view_of_file_field, row_idx)) {
- auto value = ArrowArrayViewGetStringUnsafe(view_of_file_field,
row_idx);
- std::string_view path_str(value.data, value.size_bytes);
-
ICEBERG_ASSIGN_OR_RAISE(manifest_entries[row_idx].data_file->file_format,
- FileFormatTypeFromString(path_str));
- }
- }
- break;
- case 3: {
- if (view_of_file_field->storage_type !=
ArrowType::NANOARROW_TYPE_STRUCT) {
- return InvalidManifest("Field:{} should be a struct.", field_name);
- }
- if (view_of_file_field->n_children > 0) {
- for (int64_t partition_idx = 0; partition_idx <
view_of_file_field->n_children;
- partition_idx++) {
- auto view_of_partition =
view_of_file_field->children[partition_idx];
- for (int64_t row_idx = 0; row_idx < view_of_partition->length;
row_idx++) {
- if (ArrowArrayViewIsNull(view_of_partition, row_idx)) {
- break;
- }
- ICEBERG_RETURN_UNEXPECTED(
- ParseLiteral(view_of_partition, row_idx, manifest_entries));
- }
- }
- }
- } break;
- case 4:
-
PARSE_PRIMITIVE_FIELD(manifest_entries[row_idx].data_file->record_count,
- view_of_file_field, int64_t);
- break;
- case 5:
-
PARSE_PRIMITIVE_FIELD(manifest_entries[row_idx].data_file->file_size_in_bytes,
- view_of_file_field, int64_t);
- break;
- case 6:
- // key&value should have the same offset
- // HACK(xiao.dong) workaround for arrow bug:
- // ArrowArrayViewListChildOffset can not get the correct offset for map
-
PARSE_INT_LONG_MAP_FIELD(manifest_entries[row_idx].data_file->column_sizes,
- manifest_entry_count, view_of_file_field);
- break;
- case 7:
-
PARSE_INT_LONG_MAP_FIELD(manifest_entries[row_idx].data_file->value_counts,
- manifest_entry_count, view_of_file_field);
- break;
- case 8:
-
PARSE_INT_LONG_MAP_FIELD(manifest_entries[row_idx].data_file->null_value_counts,
- manifest_entry_count, view_of_file_field);
- break;
- case 9:
-
PARSE_INT_LONG_MAP_FIELD(manifest_entries[row_idx].data_file->nan_value_counts,
- manifest_entry_count, view_of_file_field);
- break;
- case 10:
-
PARSE_INT_BINARY_MAP_FIELD(manifest_entries[row_idx].data_file->lower_bounds,
- manifest_entry_count, view_of_file_field);
- break;
- case 11:
-
PARSE_INT_BINARY_MAP_FIELD(manifest_entries[row_idx].data_file->upper_bounds,
- manifest_entry_count, view_of_file_field);
- break;
- case 12:
- PARSE_BINARY_FIELD(manifest_entries[row_idx].data_file->key_metadata,
- view_of_file_field);
- break;
- case 13:
- PARSE_INTEGER_VECTOR_FIELD(
- manifest_entries[manifest_idx].data_file->split_offsets,
manifest_entry_count,
- view_of_file_field, int64_t);
- break;
- case 14:
-
PARSE_INTEGER_VECTOR_FIELD(manifest_entries[manifest_idx].data_file->equality_ids,
- manifest_entry_count, view_of_file_field,
int32_t);
- break;
- case 15:
-
PARSE_PRIMITIVE_FIELD(manifest_entries[row_idx].data_file->sort_order_id,
- view_of_file_field, int32_t);
- break;
- case 16: {
-
PARSE_PRIMITIVE_FIELD(manifest_entries[row_idx].data_file->first_row_id,
- view_of_file_field, int64_t);
- if (first_row_id.has_value()) {
- std::ranges::for_each(manifest_entries,
[&first_row_id](ManifestEntry& entry) {
- if (entry.status != ManifestStatus::kDeleted &&
- !entry.data_file->first_row_id.has_value()) {
- entry.data_file->first_row_id = first_row_id.value();
- first_row_id = first_row_id.value() +
entry.data_file->record_count;
- }
- });
- } else {
- // data file's first_row_id is null when the manifest's first_row_id
is null
- std::ranges::for_each(manifest_entries, [](ManifestEntry& entry) {
- entry.data_file->first_row_id = std::nullopt;
- });
- }
- break;
- }
- case 17:
-
PARSE_STRING_FIELD(manifest_entries[row_idx].data_file->referenced_data_file,
- view_of_file_field);
- break;
- case 18:
-
PARSE_PRIMITIVE_FIELD(manifest_entries[row_idx].data_file->content_offset,
- view_of_file_field, int64_t);
- break;
- case 19:
-
PARSE_PRIMITIVE_FIELD(manifest_entries[row_idx].data_file->content_size_in_bytes,
- view_of_file_field, int64_t);
- break;
- default:
- return InvalidManifest("Unsupported field: {} in data file.",
field_name);
- }
- }
- return {};
-}
-
-Result<std::vector<ManifestEntry>> ParseManifestEntry(
- ArrowSchema* schema, ArrowArray* array_in, const Schema& iceberg_schema,
- std::optional<int64_t>& first_row_id) {
- if (schema->n_children != array_in->n_children) {
- return InvalidManifest("Columns size not match between schema:{} and
array:{}",
- schema->n_children, array_in->n_children);
- }
- if (iceberg_schema.fields().size() != array_in->n_children) {
- return InvalidManifest("Columns size not match between schema:{} and
array:{}",
- iceberg_schema.fields().size(),
array_in->n_children);
- }
-
- ArrowError error;
- ArrowArrayView array_view;
- auto status = ArrowArrayViewInitFromSchema(&array_view, schema, &error);
- ICEBERG_NANOARROW_RETURN_UNEXPECTED_WITH_ERROR(status, error);
- internal::ArrowArrayViewGuard view_guard(&array_view);
- status = ArrowArrayViewSetArray(&array_view, array_in, &error);
- ICEBERG_NANOARROW_RETURN_UNEXPECTED_WITH_ERROR(status, error);
- status = ArrowArrayViewValidate(&array_view,
NANOARROW_VALIDATION_LEVEL_FULL, &error);
- ICEBERG_NANOARROW_RETURN_UNEXPECTED_WITH_ERROR(status, error);
-
- std::vector<ManifestEntry> manifest_entries;
- manifest_entries.resize(array_in->length);
- for (int64_t i = 0; i < array_in->length; i++) {
- manifest_entries[i].data_file = std::make_shared<DataFile>();
- }
-
- for (int64_t idx = 0; idx < array_in->n_children; idx++) {
- const auto& field = iceberg_schema.GetFieldByIndex(idx);
- if (!field.has_value()) {
- return InvalidManifest("Field not found in schema: {}", idx);
- }
- auto field_name = field.value()->get().name();
- bool required = !field.value()->get().optional();
- auto view_of_column = array_view.children[idx];
-
- switch (idx) {
- case 0:
- PARSE_PRIMITIVE_FIELD(manifest_entries[row_idx].status, view_of_column,
- ManifestStatus);
- break;
- case 1:
- PARSE_PRIMITIVE_FIELD(manifest_entries[row_idx].snapshot_id,
view_of_column,
- int64_t);
- break;
- case 2:
- PARSE_PRIMITIVE_FIELD(manifest_entries[row_idx].sequence_number,
view_of_column,
- int64_t);
- break;
- case 3:
- PARSE_PRIMITIVE_FIELD(manifest_entries[row_idx].file_sequence_number,
- view_of_column, int64_t);
- break;
- case 4: {
- auto data_file_schema =
-
internal::checked_pointer_cast<StructType>(field.value()->get().type());
- ICEBERG_RETURN_UNEXPECTED(ParseDataFile(data_file_schema,
view_of_column,
- first_row_id,
manifest_entries));
- break;
- }
- default:
- return InvalidManifest("Unsupported field: {} in manifest entry.",
field_name);
- }
- }
- return manifest_entries;
-}
-
-Result<std::vector<ManifestEntry>> ManifestReaderImpl::Entries() const {
- std::vector<ManifestEntry> manifest_entries;
- ICEBERG_ASSIGN_OR_RAISE(auto arrow_schema, reader_->Schema());
- internal::ArrowSchemaGuard schema_guard(&arrow_schema);
- while (true) {
- ICEBERG_ASSIGN_OR_RAISE(auto result, reader_->Next());
- if (result.has_value()) {
- internal::ArrowArrayGuard array_guard(&result.value());
- ICEBERG_ASSIGN_OR_RAISE(
- auto parse_result,
- ParseManifestEntry(&arrow_schema, &result.value(), *schema_,
first_row_id_));
- manifest_entries.insert(manifest_entries.end(),
- std::make_move_iterator(parse_result.begin()),
- std::make_move_iterator(parse_result.end()));
- } else {
- // eof
- break;
- }
- }
-
- // Apply inheritance to all entries
- for (auto& entry : manifest_entries) {
- ICEBERG_RETURN_UNEXPECTED(inheritable_metadata_->Apply(entry));
- }
-
- return manifest_entries;
-}
-
-Result<std::unordered_map<std::string, std::string>>
ManifestReaderImpl::Metadata()
- const {
- return reader_->Metadata();
-}
-
-Result<std::vector<ManifestFile>> ManifestListReaderImpl::Files() const {
- std::vector<ManifestFile> manifest_files;
- ICEBERG_ASSIGN_OR_RAISE(auto arrow_schema, reader_->Schema());
- internal::ArrowSchemaGuard schema_guard(&arrow_schema);
- while (true) {
- ICEBERG_ASSIGN_OR_RAISE(auto result, reader_->Next());
- if (result.has_value()) {
- internal::ArrowArrayGuard array_guard(&result.value());
- ICEBERG_ASSIGN_OR_RAISE(
- auto parse_result, ParseManifestList(&arrow_schema, &result.value(),
*schema_));
- manifest_files.insert(manifest_files.end(),
- std::make_move_iterator(parse_result.begin()),
- std::make_move_iterator(parse_result.end()));
- } else {
- // eof
- break;
- }
- }
- return manifest_files;
-}
-
-Result<std::unordered_map<std::string, std::string>>
ManifestListReaderImpl::Metadata()
- const {
- return reader_->Metadata();
-}
-
-Result<ManifestFileField> ManifestFileFieldFromIndex(int32_t index) {
- if (index >= 0 && index <
static_cast<int32_t>(ManifestFileField::kNextUnusedId)) {
- return static_cast<ManifestFileField>(index);
- }
- return InvalidArgument("Invalid manifest file field index: {}", index);
-}
-
-} // namespace iceberg
diff --git a/src/iceberg/manifest/manifest_reader_internal.h
b/src/iceberg/manifest/manifest_reader_internal.h
index 513f4050..ba804ffb 100644
--- a/src/iceberg/manifest/manifest_reader_internal.h
+++ b/src/iceberg/manifest/manifest_reader_internal.h
@@ -22,33 +22,101 @@
/// \file iceberg/manifest/manifest_reader_internal.h
/// Reader implementation for manifest list files and manifest files.
+#include <memory>
+#include <optional>
+#include <string>
+#include <vector>
+
+#include "iceberg/expression/evaluator.h"
+#include "iceberg/expression/expression.h"
+#include "iceberg/expression/inclusive_metrics_evaluator.h"
#include "iceberg/file_reader.h"
#include "iceberg/inheritable_metadata.h"
#include "iceberg/manifest/manifest_reader.h"
+#include "iceberg/util/partition_value_util.h"
namespace iceberg {
/// \brief Read manifest entries from a manifest file.
+///
+/// This implementation supports lazy reader creation and filtering based on
+/// partition expressions, row expressions, and partition sets. Following the
+/// Java implementation pattern.
class ManifestReaderImpl : public ManifestReader {
public:
- explicit ManifestReaderImpl(std::unique_ptr<Reader> reader,
- std::shared_ptr<Schema> schema,
- std::unique_ptr<InheritableMetadata>
inheritable_metadata,
- std::optional<int64_t> first_row_id)
- : schema_(std::move(schema)),
- reader_(std::move(reader)),
- inheritable_metadata_(std::move(inheritable_metadata)),
- first_row_id_(first_row_id) {}
+ /// \brief Construct a ManifestReaderImpl for lazy initialization.
+ ///
+ /// \param manifest_path Path to the manifest file.
+ /// \param manifest_length Length of the manifest file (optional).
+ /// \param file_io File IO implementation.
+ /// \param schema Table schema.
+ /// \param spec Partition spec.
+ /// \param inheritable_metadata Metadata inherited from manifest.
+ /// \param first_row_id First row ID for V3 manifests.
+ /// \note ManifestReader::Make() functions should guarantee non-null
parameters.
+ ManifestReaderImpl(std::string manifest_path, std::optional<int64_t>
manifest_length,
+ std::shared_ptr<FileIO> file_io, std::shared_ptr<Schema>
schema,
+ std::shared_ptr<PartitionSpec> spec,
+ std::unique_ptr<InheritableMetadata> inheritable_metadata,
+ std::optional<int64_t> first_row_id);
- Result<std::vector<ManifestEntry>> Entries() const override;
+ Result<std::vector<ManifestEntry>> Entries() override;
- Result<std::unordered_map<std::string, std::string>> Metadata() const
override;
+ Result<std::vector<ManifestEntry>> LiveEntries() override;
+
+ ManifestReader& Select(const std::vector<std::string>& columns) override;
+
+ ManifestReader& FilterPartitions(std::shared_ptr<Expression> expr) override;
+
+ ManifestReader& FilterPartitions(std::shared_ptr<PartitionSet>
partition_set) override;
+
+ ManifestReader& FilterRows(std::shared_ptr<Expression> expr) override;
+
+ ManifestReader& CaseSensitive(bool case_sensitive) override;
private:
- std::shared_ptr<Schema> schema_;
- std::unique_ptr<Reader> reader_;
- std::unique_ptr<InheritableMetadata> inheritable_metadata_;
- mutable std::optional<int64_t> first_row_id_;
+ /// \brief Read entries with optional live-only filtering.
+ Result<std::vector<ManifestEntry>> ReadEntries(bool only_live);
+
+ /// \brief Lazily open the underlying Avro reader with appropriate schema
projection.
+ Status OpenReader(std::shared_ptr<Schema> projection);
+
+ /// \brief Check if there's a non-trivial partition filter.
+ bool HasPartitionFilter() const;
+
+ /// \brief Check if there's a non-trivial row filter.
+ bool HasRowFilter() const;
+
+ /// \brief Get or create the partition evaluator.
+ Result<Evaluator*> GetEvaluator();
+
+ /// \brief Get or create the metrics evaluator.
+ Result<InclusiveMetricsEvaluator*> GetMetricsEvaluator();
+
+ /// \brief Check if a partition is in the partition set.
+ bool InPartitionSet(const DataFile& file) const;
+
+ // Fields set at construction
+ const std::string manifest_path_;
+ const std::optional<int64_t> manifest_length_;
+ const std::shared_ptr<FileIO> file_io_;
+ const std::shared_ptr<Schema> schema_;
+ const std::shared_ptr<PartitionSpec> spec_;
+ const std::unique_ptr<InheritableMetadata> inheritable_metadata_;
+ std::optional<int64_t> first_row_id_;
+
+ // Configuration fields
+ std::vector<std::string> columns_;
+ std::shared_ptr<Expression> part_filter_{True::Instance()};
+ std::shared_ptr<Expression> row_filter_{True::Instance()};
+ std::shared_ptr<PartitionSet> partition_set_;
+ bool case_sensitive_{true};
+
+ // Lazy fields
+ std::unique_ptr<Reader> file_reader_;
+ std::shared_ptr<Schema> file_schema_;
+ std::unique_ptr<Evaluator> evaluator_;
+ std::unique_ptr<InclusiveMetricsEvaluator> metrics_evaluator_;
};
/// \brief Read manifest files from a manifest list file.
diff --git a/src/iceberg/meson.build b/src/iceberg/meson.build
index 1596a2ea..7c1011fc 100644
--- a/src/iceberg/meson.build
+++ b/src/iceberg/meson.build
@@ -64,7 +64,6 @@ iceberg_sources = files(
'manifest/manifest_entry.cc',
'manifest/manifest_list.cc',
'manifest/manifest_reader.cc',
- 'manifest/manifest_reader_internal.cc',
'manifest/manifest_writer.cc',
'manifest/v1_metadata.cc',
'manifest/v2_metadata.cc',
diff --git a/src/iceberg/table_scan.cc b/src/iceberg/table_scan.cc
index 1c4a73ff..6c182845 100644
--- a/src/iceberg/table_scan.cc
+++ b/src/iceberg/table_scan.cc
@@ -271,15 +271,13 @@ Result<std::vector<std::shared_ptr<FileScanTask>>>
DataTableScan::PlanFiles() co
std::vector<std::shared_ptr<FileScanTask>> tasks;
ICEBERG_ASSIGN_OR_RAISE(auto partition_spec,
context_.table_metadata->PartitionSpec());
- // Get the table schema and partition type
+ // Get the table schema
ICEBERG_ASSIGN_OR_RAISE(auto current_schema,
context_.table_metadata->Schema());
- ICEBERG_ASSIGN_OR_RAISE(std::shared_ptr<StructType> partition_type,
- partition_spec->PartitionType(*current_schema));
for (const auto& manifest_file : manifest_files) {
ICEBERG_ASSIGN_OR_RAISE(
auto manifest_reader,
- ManifestReader::Make(manifest_file, file_io_, partition_type));
+ ManifestReader::Make(manifest_file, file_io_, current_schema,
partition_spec));
ICEBERG_ASSIGN_OR_RAISE(auto manifests, manifest_reader->Entries());
// TODO(gty404): filter manifests using partition spec and filter
expression
diff --git a/src/iceberg/test/CMakeLists.txt b/src/iceberg/test/CMakeLists.txt
index 0d36f64c..7e71310d 100644
--- a/src/iceberg/test/CMakeLists.txt
+++ b/src/iceberg/test/CMakeLists.txt
@@ -122,6 +122,8 @@ if(ICEBERG_BUILD_BUNDLE)
avro_schema_test.cc
avro_stream_test.cc
manifest_list_versions_test.cc
+ manifest_reader_stats_test.cc
+ manifest_reader_test.cc
manifest_writer_versions_test.cc)
add_iceberg_test(arrow_test
diff --git a/src/iceberg/test/manifest_reader_stats_test.cc
b/src/iceberg/test/manifest_reader_stats_test.cc
new file mode 100644
index 00000000..327da225
--- /dev/null
+++ b/src/iceberg/test/manifest_reader_stats_test.cc
@@ -0,0 +1,239 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <chrono>
+#include <format>
+#include <memory>
+#include <string>
+#include <vector>
+
+#include <gtest/gtest.h>
+
+#include "iceberg/arrow/arrow_file_io.h"
+#include "iceberg/avro/avro_register.h"
+#include "iceberg/expression/expressions.h"
+#include "iceberg/manifest/manifest_entry.h"
+#include "iceberg/manifest/manifest_list.h"
+#include "iceberg/manifest/manifest_reader.h"
+#include "iceberg/manifest/manifest_writer.h"
+#include "iceberg/partition_spec.h"
+#include "iceberg/schema.h"
+#include "iceberg/test/matchers.h"
+#include "iceberg/transform.h"
+#include "iceberg/type.h"
+
+namespace iceberg {
+
+class TestManifestReaderStats : public testing::TestWithParam<int> {
+ protected:
+ void SetUp() override {
+ avro::RegisterAll();
+ file_io_ = arrow::MakeMockFileIO();
+
+ schema_ = std::make_shared<Schema>(std::vector<SchemaField>{
+ SchemaField::MakeRequired(/*field_id=*/3, "id", int32()),
+ SchemaField::MakeRequired(/*field_id=*/4, "data", string())});
+
+ ICEBERG_UNWRAP_OR_FAIL(
+ spec_,
+ PartitionSpec::Make(
+ /*spec_id=*/0, {PartitionField(/*source_id=*/4, /*field_id=*/1000,
+ "data_bucket",
Transform::Bucket(16))}));
+ }
+
+ std::string MakeManifestPath() {
+ return std::format("manifest-{}.avro",
+
std::chrono::system_clock::now().time_since_epoch().count());
+ }
+
+ inline static const std::map<int32_t, int64_t> kValueCounts = {{3, 3L}};
+ inline static const std::map<int32_t, int64_t> kNullValueCounts = {{3, 0L}};
+ inline static const std::map<int32_t, int64_t> kNanValueCounts = {{3, 1L}};
+ inline static const std::map<int32_t, std::vector<uint8_t>> kLowerBounds = {
+ {3, Literal::Int(2).Serialize().value()}};
+ inline static const std::map<int32_t, std::vector<uint8_t>> kUpperBounds = {
+ {3, Literal::Int(4).Serialize().value()}};
+
+ std::unique_ptr<DataFile> MakeDataFileWithStats() {
+ return std::make_unique<DataFile>(DataFile{
+ .file_path = "/path/to/data-with-stats.parquet",
+ .file_format = FileFormatType::kParquet,
+ .partition = PartitionValues({Literal::Int(0)}),
+ .record_count = 3,
+ .file_size_in_bytes = 10,
+ .value_counts = kValueCounts,
+ .null_value_counts = kNullValueCounts,
+ .nan_value_counts = kNanValueCounts,
+ .lower_bounds = kLowerBounds,
+ .upper_bounds = kUpperBounds,
+ .sort_order_id = 0,
+ });
+ }
+
+ ManifestFile WriteManifest(int format_version, std::unique_ptr<DataFile>
data_file) {
+ const std::string manifest_path = MakeManifestPath();
+
+ Result<std::unique_ptr<ManifestWriter>> writer_result =
+ NotSupported("Format version: {}", format_version);
+
+ switch (format_version) {
+ case 1:
+ writer_result = ManifestWriter::MakeV1Writer(/*snapshot_id=*/1000L,
manifest_path,
+ file_io_, spec_, schema_);
+ break;
+ case 2:
+ writer_result =
+ ManifestWriter::MakeV2Writer(/*snapshot_id=*/1000L, manifest_path,
file_io_,
+ spec_, schema_,
ManifestContent::kData);
+ break;
+ case 3:
+ writer_result = ManifestWriter::MakeV3Writer(
+ /*snapshot_id=*/1000L, /*sequence_number=*/0L, manifest_path,
file_io_, spec_,
+ schema_, ManifestContent::kData);
+ break;
+ }
+
+ EXPECT_THAT(writer_result, IsOk());
+ auto writer = std::move(writer_result.value());
+
+ EXPECT_THAT(writer->WriteAddedEntry(std::move(data_file)), IsOk());
+ EXPECT_THAT(writer->Close(), IsOk());
+
+ auto manifest_result = writer->ToManifestFile();
+ EXPECT_THAT(manifest_result, IsOk());
+ return std::move(manifest_result.value());
+ }
+
+ void AssertFullStats(const DataFile& file) {
+ EXPECT_EQ(file.record_count, 3);
+ EXPECT_EQ(file.value_counts, kValueCounts);
+ EXPECT_EQ(file.null_value_counts, kNullValueCounts);
+ EXPECT_EQ(file.nan_value_counts, kNanValueCounts);
+ EXPECT_EQ(file.lower_bounds, kLowerBounds);
+ EXPECT_EQ(file.upper_bounds, kUpperBounds);
+ }
+
+ void AssertStatsDropped(const DataFile& file) {
+ EXPECT_EQ(file.record_count, 3); // record count is not dropped
+ EXPECT_TRUE(file.value_counts.empty());
+ EXPECT_TRUE(file.null_value_counts.empty());
+ EXPECT_TRUE(file.nan_value_counts.empty());
+ EXPECT_TRUE(file.lower_bounds.empty());
+ EXPECT_TRUE(file.upper_bounds.empty());
+ }
+
+ std::shared_ptr<FileIO> file_io_;
+ std::shared_ptr<Schema> schema_;
+ std::shared_ptr<PartitionSpec> spec_;
+};
+
+TEST_P(TestManifestReaderStats, TestReadIncludesFullStats) {
+ int version = GetParam();
+ auto manifest = WriteManifest(version, MakeDataFileWithStats());
+
+ auto reader_result = ManifestReader::Make(manifest, file_io_, schema_,
spec_);
+ ASSERT_THAT(reader_result, IsOk());
+ auto reader = std::move(reader_result.value());
+
+ auto entries_result = reader->Entries();
+ ASSERT_THAT(entries_result, IsOk());
+ auto entries = entries_result.value();
+ ASSERT_EQ(entries.size(), 1);
+ AssertFullStats(*entries[0].data_file);
+}
+
+TEST_P(TestManifestReaderStats, TestReadEntriesWithFilterIncludesFullStats) {
+ int version = GetParam();
+ auto manifest = WriteManifest(version, MakeDataFileWithStats());
+
+ auto reader_result = ManifestReader::Make(manifest, file_io_, schema_,
spec_);
+ ASSERT_THAT(reader_result, IsOk());
+ auto reader = std::move(reader_result.value());
+
+ reader->FilterRows(Expressions::Equal("id", Literal::Int(3)));
+
+ auto entries_result = reader->Entries();
+ ASSERT_THAT(entries_result, IsOk());
+ auto entries = entries_result.value();
+ ASSERT_EQ(entries.size(), 1);
+ AssertFullStats(*entries[0].data_file);
+}
+
+TEST_P(TestManifestReaderStats,
TestReadEntriesWithFilterAndSelectIncludesFullStats) {
+ int version = GetParam();
+ auto manifest = WriteManifest(version, MakeDataFileWithStats());
+
+ auto reader_result = ManifestReader::Make(manifest, file_io_, schema_,
spec_);
+ ASSERT_THAT(reader_result, IsOk());
+ auto reader = std::move(reader_result.value());
+
+ reader->Select({"file_path"});
+ reader->FilterRows(Expressions::Equal("id", Literal::Int(3)));
+
+ auto entries_result = reader->Entries();
+ ASSERT_THAT(entries_result, IsOk());
+ auto entries = entries_result.value();
+ ASSERT_EQ(entries.size(), 1);
+ AssertFullStats(*entries[0].data_file);
+}
+
+TEST_P(TestManifestReaderStats, TestReadEntriesWithSelectNotProjectStats) {
+ int version = GetParam();
+ auto manifest = WriteManifest(version, MakeDataFileWithStats());
+
+ auto reader_result = ManifestReader::Make(manifest, file_io_, schema_,
spec_);
+ ASSERT_THAT(reader_result, IsOk());
+ auto reader = std::move(reader_result.value());
+
+ reader->Select({"file_path"});
+
+ auto entries_result = reader->Entries();
+ ASSERT_THAT(entries_result, IsOk());
+ auto entries = entries_result.value();
+ ASSERT_EQ(entries.size(), 1);
+ AssertStatsDropped(*entries[0].data_file);
+}
+
+TEST_P(TestManifestReaderStats,
TestReadEntriesWithSelectCertainStatNotProjectStats) {
+ int version = GetParam();
+ auto manifest = WriteManifest(version, MakeDataFileWithStats());
+
+ auto reader_result = ManifestReader::Make(manifest, file_io_, schema_,
spec_);
+ ASSERT_THAT(reader_result, IsOk());
+ auto reader = std::move(reader_result.value());
+
+ reader->Select({"file_path", "value_counts"});
+
+ auto entries_result = reader->Entries();
+ ASSERT_THAT(entries_result, IsOk());
+ auto entries = entries_result.value();
+ ASSERT_EQ(entries.size(), 1);
+
+ const auto& file = *entries[0].data_file;
+ EXPECT_FALSE(file.value_counts.empty());
+ EXPECT_TRUE(file.null_value_counts.empty());
+ EXPECT_TRUE(file.nan_value_counts.empty());
+ EXPECT_TRUE(file.lower_bounds.empty());
+ EXPECT_TRUE(file.upper_bounds.empty());
+}
+
+INSTANTIATE_TEST_SUITE_P(ManifestReaderStatsVersions, TestManifestReaderStats,
+ testing::Values(1, 2, 3));
+
+} // namespace iceberg
diff --git a/src/iceberg/test/manifest_reader_test.cc
b/src/iceberg/test/manifest_reader_test.cc
new file mode 100644
index 00000000..ab4798db
--- /dev/null
+++ b/src/iceberg/test/manifest_reader_test.cc
@@ -0,0 +1,381 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "iceberg/manifest/manifest_reader.h"
+
+#include <chrono>
+#include <format>
+#include <memory>
+#include <optional>
+#include <string>
+#include <vector>
+
+#include <gtest/gtest.h>
+
+#include "iceberg/arrow/arrow_file_io.h"
+#include "iceberg/avro/avro_register.h"
+#include "iceberg/expression/expressions.h"
+#include "iceberg/manifest/manifest_entry.h"
+#include "iceberg/manifest/manifest_list.h"
+#include "iceberg/manifest/manifest_writer.h"
+#include "iceberg/partition_spec.h"
+#include "iceberg/schema.h"
+#include "iceberg/test/matchers.h"
+#include "iceberg/transform.h"
+#include "iceberg/type.h"
+
+namespace iceberg {
+
+class TestManifestReader : public testing::TestWithParam<int> {
+ protected:
+ void SetUp() override {
+ avro::RegisterAll();
+
+ file_io_ = arrow::MakeMockFileIO();
+
+ schema_ = std::make_shared<Schema>(std::vector<SchemaField>{
+ SchemaField::MakeRequired(/*field_id=*/3, "id", int32()),
+ SchemaField::MakeRequired(/*field_id=*/4, "data", string())});
+
+ ICEBERG_UNWRAP_OR_FAIL(
+ spec_,
+ PartitionSpec::Make(
+ /*spec_id=*/0, {PartitionField(/*source_id=*/4, /*field_id=*/1000,
+ "data_bucket",
Transform::Bucket(16))}));
+ }
+
+ std::string MakeManifestPath() {
+ return std::format("manifest-{}.avro",
+
std::chrono::system_clock::now().time_since_epoch().count());
+ }
+
+ std::unique_ptr<DataFile> MakeDataFile(const std::string& path,
+ const PartitionValues& partition,
+ int64_t record_count = 1) {
+ return std::make_unique<DataFile>(DataFile{
+ .file_path = path,
+ .file_format = FileFormatType::kParquet,
+ .partition = partition,
+ .record_count = record_count,
+ .file_size_in_bytes = 10,
+ .sort_order_id = 0,
+ });
+ }
+
+ ManifestFile WriteManifest(int format_version, std::optional<int64_t>
snapshot_id,
+ const std::vector<ManifestEntry>& entries) {
+ const std::string manifest_path = MakeManifestPath();
+
+ Result<std::unique_ptr<ManifestWriter>> writer_result =
+ NotSupported("Format version: {}", format_version);
+
+ switch (format_version) {
+ case 1:
+ writer_result = ManifestWriter::MakeV1Writer(snapshot_id,
manifest_path, file_io_,
+ spec_, schema_);
+ break;
+ case 2:
+ writer_result = ManifestWriter::MakeV2Writer(
+ snapshot_id, manifest_path, file_io_, spec_, schema_,
ManifestContent::kData);
+ break;
+ case 3:
+ writer_result = ManifestWriter::MakeV3Writer(snapshot_id,
/*first_row_id=*/0L,
+ manifest_path, file_io_,
spec_,
+ schema_,
ManifestContent::kData);
+ break;
+ }
+
+ EXPECT_THAT(writer_result, IsOk());
+ auto writer = std::move(writer_result.value());
+
+ for (const auto& entry : entries) {
+ EXPECT_THAT(writer->WriteEntry(entry), IsOk());
+ }
+
+ EXPECT_THAT(writer->Close(), IsOk());
+ auto manifest_result = writer->ToManifestFile();
+ EXPECT_THAT(manifest_result, IsOk());
+ return std::move(manifest_result.value());
+ }
+
+ ManifestEntry MakeEntry(ManifestStatus status, int64_t snapshot_id,
+ std::unique_ptr<DataFile> file) {
+ return ManifestEntry{
+ .status = status,
+ .snapshot_id = snapshot_id,
+ .sequence_number =
+ (status == ManifestStatus::kAdded) ? std::nullopt :
std::make_optional(0),
+ .file_sequence_number = std::nullopt,
+ .data_file = std::move(file),
+ };
+ }
+
+ std::unique_ptr<DataFile> MakeDeleteFile(
+ const std::string& path, const PartitionValues& partition,
+ DataFile::Content content,
+ std::optional<std::string> referenced_file = std::nullopt,
+ std::optional<int64_t> content_offset = std::nullopt,
+ std::optional<int64_t> content_size = std::nullopt) {
+ return std::make_unique<DataFile>(DataFile{
+ .content = content,
+ .file_path = path,
+ .file_format = FileFormatType::kParquet,
+ .partition = partition,
+ .record_count = 1,
+ .file_size_in_bytes = 10,
+ .equality_ids = (content == DataFile::Content::kEqualityDeletes)
+ ? std::vector<int>{3}
+ : std::vector<int>{},
+ .referenced_data_file = referenced_file,
+ .content_offset = content_offset,
+ .content_size_in_bytes = content_size,
+ });
+ }
+
+ ManifestFile WriteDeleteManifest(int format_version, int64_t snapshot_id,
+ std::vector<ManifestEntry> entries) {
+ const std::string manifest_path = MakeManifestPath();
+
+ Result<std::unique_ptr<ManifestWriter>> writer_result =
+ NotSupported("Format version: {}", format_version);
+
+ if (format_version == 2) {
+ writer_result =
+ ManifestWriter::MakeV2Writer(snapshot_id, manifest_path, file_io_,
spec_,
+ schema_, ManifestContent::kDeletes);
+ } else if (format_version == 3) {
+ writer_result = ManifestWriter::MakeV3Writer(
+ snapshot_id, /*first_row_id=*/std::nullopt, manifest_path, file_io_,
spec_,
+ schema_, ManifestContent::kDeletes);
+ }
+
+ EXPECT_THAT(writer_result, IsOk());
+ auto writer = std::move(writer_result.value());
+
+ for (const auto& entry : entries) {
+ EXPECT_THAT(writer->WriteEntry(entry), IsOk());
+ }
+
+ EXPECT_THAT(writer->Close(), IsOk());
+ auto manifest_result = writer->ToManifestFile();
+ EXPECT_THAT(manifest_result, IsOk());
+ return std::move(manifest_result.value());
+ }
+
+ std::shared_ptr<FileIO> file_io_;
+ std::shared_ptr<Schema> schema_;
+ std::shared_ptr<PartitionSpec> spec_;
+};
+
+TEST_P(TestManifestReader, TestManifestReaderWithEmptyInheritableMetadata) {
+ int version = GetParam();
+ auto file_a =
+ MakeDataFile("/path/to/data-a.parquet",
PartitionValues({Literal::Int(0)}));
+
+ auto entry =
+ MakeEntry(ManifestStatus::kExisting, /*snapshot_id=*/1000L,
std::move(file_a));
+ entry.sequence_number = 0;
+ entry.file_sequence_number = 0;
+
+ std::vector<ManifestEntry> entries;
+ entries.push_back(std::move(entry));
+ auto manifest = WriteManifest(version, /*snapshot_id=*/1000L,
std::move(entries));
+
+ auto reader_result = ManifestReader::Make(manifest, file_io_, schema_,
spec_);
+ ASSERT_THAT(reader_result, IsOk());
+ auto reader = std::move(reader_result.value());
+
+ auto entries_result = reader->Entries();
+ ASSERT_THAT(entries_result, IsOk());
+ auto read_entries = entries_result.value();
+
+ ASSERT_EQ(read_entries.size(), 1);
+ const auto& read_entry = read_entries[0];
+ EXPECT_EQ(read_entry.status, ManifestStatus::kExisting);
+ EXPECT_EQ(read_entry.data_file->file_path, "/path/to/data-a.parquet");
+ EXPECT_EQ(read_entry.snapshot_id, 1000L);
+}
+
+TEST_P(TestManifestReader, TestReaderWithFilterWithoutSelect) {
+ int version = GetParam();
+ auto file_a =
+ MakeDataFile("/path/to/data-a.parquet",
PartitionValues({Literal::Int(0)}));
+ auto file_b =
+ MakeDataFile("/path/to/data-b.parquet",
PartitionValues({Literal::Int(1)}));
+ auto file_c =
+ MakeDataFile("/path/to/data-c.parquet",
PartitionValues({Literal::Int(2)}));
+
+ std::vector<ManifestEntry> entries;
+ entries.push_back(MakeEntry(ManifestStatus::kAdded, 1000L,
std::move(file_a)));
+ entries.push_back(MakeEntry(ManifestStatus::kAdded, 1000L,
std::move(file_b)));
+ entries.push_back(MakeEntry(ManifestStatus::kAdded, 1000L,
std::move(file_c)));
+
+ auto manifest = WriteManifest(version, 1000L, std::move(entries));
+
+ auto reader_result = ManifestReader::Make(manifest, file_io_, schema_,
spec_);
+ ASSERT_THAT(reader_result, IsOk());
+ auto reader = std::move(reader_result.value());
+
+ reader->FilterRows(Expressions::Equal("id", Literal::Int(0)));
+
+ auto result_entries = reader->Entries();
+ ASSERT_THAT(result_entries, IsOk());
+ auto read_entries = result_entries.value();
+
+ // note that all files are returned because the reader returns data files
that may
+ // match, and the partition is bucketing by data, which doesn't help filter
files
+ ASSERT_EQ(read_entries.size(), 3);
+ EXPECT_EQ(read_entries[0].data_file->file_path, "/path/to/data-a.parquet");
+ EXPECT_EQ(read_entries[1].data_file->file_path, "/path/to/data-b.parquet");
+ EXPECT_EQ(read_entries[2].data_file->file_path, "/path/to/data-c.parquet");
+}
+
+TEST_P(TestManifestReader, TestManifestReaderWithPartitionMetadata) {
+ int version = GetParam();
+ auto file_a =
+ MakeDataFile("/path/to/data-a.parquet",
PartitionValues({Literal::Int(0)}));
+ auto entry =
+ MakeEntry(ManifestStatus::kExisting, /*snapshot_id=*/123L,
std::move(file_a));
+ entry.sequence_number = 0;
+ entry.file_sequence_number = 0;
+
+ std::vector<ManifestEntry> entries;
+ entries.push_back(std::move(entry));
+ auto manifest = WriteManifest(version, /*snapshot_id=*/1000L,
std::move(entries));
+
+ auto reader_result = ManifestReader::Make(manifest, file_io_, schema_,
spec_);
+ ASSERT_THAT(reader_result, IsOk());
+ auto reader = std::move(reader_result.value());
+
+ auto entries_result = reader->Entries();
+ ASSERT_THAT(entries_result, IsOk());
+ auto read_entries = entries_result.value();
+
+ ASSERT_EQ(read_entries.size(), 1);
+ const auto& read_entry = read_entries[0];
+ EXPECT_EQ(read_entry.snapshot_id, 123L);
+
+ ASSERT_EQ(read_entry.data_file->partition.num_fields(), 1);
+ EXPECT_EQ(read_entry.data_file->partition.values()[0], Literal::Int(0));
+}
+
+TEST_P(TestManifestReader, TestDeleteFilesWithReferences) {
+ int version = GetParam();
+ if (version < 2) {
+ GTEST_SKIP() << "Delete files only supported in V2+";
+ }
+
+ auto delete_file_1 = MakeDeleteFile(
+ "/path/to/data-a-deletes.parquet", PartitionValues({Literal::Int(0)}),
+ DataFile::Content::kPositionDeletes, "/path/to/data-a.parquet");
+ auto delete_file_2 = MakeDeleteFile(
+ "/path/to/data-b-deletes.parquet", PartitionValues({Literal::Int(1)}),
+ DataFile::Content::kPositionDeletes, "/path/to/data-b.parquet");
+
+ std::vector<ManifestEntry> entries;
+ entries.push_back(
+ MakeEntry(ManifestStatus::kAdded, /*snapshot_id=*/1000L,
std::move(delete_file_1)));
+ entries.push_back(
+ MakeEntry(ManifestStatus::kAdded, /*snapshot_id=*/1000L,
std::move(delete_file_2)));
+
+ auto manifest = WriteDeleteManifest(version, /*snapshot_id=*/1000L,
std::move(entries));
+
+ auto reader_result = ManifestReader::Make(manifest, file_io_, schema_,
spec_);
+ ASSERT_THAT(reader_result, IsOk());
+ auto reader = std::move(reader_result.value());
+
+ auto entries_result = reader->Entries();
+ ASSERT_THAT(entries_result, IsOk());
+ auto read_entries = entries_result.value();
+ ASSERT_EQ(read_entries.size(), 2);
+
+ for (const auto& entry : read_entries) {
+ if (entry.data_file->file_path == "/path/to/data-a-deletes.parquet") {
+ EXPECT_EQ(entry.data_file->referenced_data_file,
"/path/to/data-a.parquet");
+ } else {
+ EXPECT_EQ(entry.data_file->referenced_data_file,
"/path/to/data-b.parquet");
+ }
+ }
+}
+
+TEST_P(TestManifestReader, TestDVs) {
+ int version = GetParam();
+ if (version < 3) {
+ GTEST_SKIP() << "DVs only supported in V3+";
+ }
+
+ auto dv1 =
+ MakeDeleteFile("/path/to/data-a-deletes.puffin",
PartitionValues({Literal::Int(0)}),
+ DataFile::Content::kPositionDeletes,
"/path/to/data-a.parquet",
+ /*content_offset=*/4L, /*content_size_in_bytes=*/6L);
+ auto dv2 =
+ MakeDeleteFile("/path/to/data-b-deletes.puffin",
PartitionValues({Literal::Int(1)}),
+ DataFile::Content::kPositionDeletes,
"/path/to/data-b.parquet",
+ /*content_offset=*/4L, /*content_size_in_bytes=*/6L);
+
+ std::vector<ManifestEntry> entries;
+ entries.push_back(
+ MakeEntry(ManifestStatus::kAdded, /*snapshot_id=*/1000L,
std::move(dv1)));
+ entries.push_back(
+ MakeEntry(ManifestStatus::kAdded, /*snapshot_id=*/1000L,
std::move(dv2)));
+ auto manifest = WriteDeleteManifest(version, /*snapshot_id=*/1000L,
std::move(entries));
+
+ auto reader_result = ManifestReader::Make(manifest, file_io_, schema_,
spec_);
+ ASSERT_THAT(reader_result, IsOk());
+ auto reader = std::move(reader_result.value());
+
+ auto entries_result = reader->Entries();
+ ASSERT_THAT(entries_result, IsOk());
+ auto read_entries = entries_result.value();
+ ASSERT_EQ(read_entries.size(), 2);
+
+ for (const auto& entry : read_entries) {
+ if (entry.data_file->file_path == "/path/to/data-a-deletes.puffin") {
+ EXPECT_EQ(entry.data_file->referenced_data_file,
"/path/to/data-a.parquet");
+ EXPECT_EQ(entry.data_file->content_offset, 4L);
+ EXPECT_EQ(entry.data_file->content_size_in_bytes, 6L);
+ } else {
+ EXPECT_EQ(entry.data_file->referenced_data_file,
"/path/to/data-b.parquet");
+ EXPECT_EQ(entry.data_file->content_offset, 4L);
+ EXPECT_EQ(entry.data_file->content_size_in_bytes, 6L);
+ }
+ }
+}
+
+TEST_P(TestManifestReader, TestInvalidUsage) {
+ int version = GetParam();
+ auto file_a =
+ MakeDataFile("/path/to/data-a.parquet",
PartitionValues({Literal::Int(0)}));
+ auto entry =
+ MakeEntry(ManifestStatus::kExisting, /*snapshot_id=*/1000L,
std::move(file_a));
+ entry.sequence_number = 0;
+ entry.file_sequence_number = 0;
+
+ auto manifest =
+ WriteManifest(version, /*snapshot_id=*/std::nullopt, {std::move(entry)});
+
+ auto reader_result = ManifestReader::Make(manifest, file_io_, schema_,
spec_);
+ EXPECT_THAT(reader_result, IsError(ErrorKind::kInvalidManifest));
+ EXPECT_THAT(reader_result, HasErrorMessage("has no snapshot ID"));
+}
+
+INSTANTIATE_TEST_SUITE_P(ManifestReaderVersions, TestManifestReader,
+ testing::Values(1, 2, 3));
+
+} // namespace iceberg
diff --git a/src/iceberg/test/manifest_writer_versions_test.cc
b/src/iceberg/test/manifest_writer_versions_test.cc
index 36f9445f..e3229fc7 100644
--- a/src/iceberg/test/manifest_writer_versions_test.cc
+++ b/src/iceberg/test/manifest_writer_versions_test.cc
@@ -241,10 +241,7 @@ class ManifestWriterVersionsTest : public ::testing::Test {
}
std::vector<ManifestEntry> ReadManifest(const ManifestFile& manifest_file) {
- auto partition_type_result = spec_->PartitionType(*schema_);
- EXPECT_THAT(partition_type_result, IsOk());
- std::shared_ptr<StructType> partition_type =
std::move(partition_type_result.value());
- auto reader_result = ManifestReader::Make(manifest_file, file_io_,
partition_type);
+ auto reader_result = ManifestReader::Make(manifest_file, file_io_,
schema_, spec_);
EXPECT_THAT(reader_result, IsOk());
auto reader = std::move(reader_result.value());