This is an automated email from the ASF dual-hosted git repository.

gangwu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-cpp.git


The following commit(s) were added to refs/heads/main by this push:
     new f43d24b2 feat: enhance ManifestReader with projection and filtering 
support (#431)
f43d24b2 is described below

commit f43d24b2bbcc981ed074a9689da5b6571fa131e2
Author: Gang Wu <[email protected]>
AuthorDate: Thu Dec 25 09:40:48 2025 +0800

    feat: enhance ManifestReader with projection and filtering support (#431)
---
 src/iceberg/CMakeLists.txt                        |   1 -
 src/iceberg/manifest/manifest_entry.h             | 154 ++--
 src/iceberg/manifest/manifest_list.cc             |  14 +-
 src/iceberg/manifest/manifest_list.h              |  85 ++-
 src/iceberg/manifest/manifest_reader.cc           | 876 +++++++++++++++++++++-
 src/iceberg/manifest/manifest_reader.h            |  53 +-
 src/iceberg/manifest/manifest_reader_internal.cc  | 606 ---------------
 src/iceberg/manifest/manifest_reader_internal.h   |  96 ++-
 src/iceberg/meson.build                           |   1 -
 src/iceberg/table_scan.cc                         |   6 +-
 src/iceberg/test/CMakeLists.txt                   |   2 +
 src/iceberg/test/manifest_reader_stats_test.cc    | 239 ++++++
 src/iceberg/test/manifest_reader_test.cc          | 381 ++++++++++
 src/iceberg/test/manifest_writer_versions_test.cc |   5 +-
 14 files changed, 1755 insertions(+), 764 deletions(-)

diff --git a/src/iceberg/CMakeLists.txt b/src/iceberg/CMakeLists.txt
index dcc9372f..e48afbfd 100644
--- a/src/iceberg/CMakeLists.txt
+++ b/src/iceberg/CMakeLists.txt
@@ -42,7 +42,6 @@ set(ICEBERG_SOURCES
     manifest/manifest_entry.cc
     manifest/manifest_list.cc
     manifest/manifest_reader.cc
-    manifest/manifest_reader_internal.cc
     manifest/manifest_writer.cc
     manifest/v1_metadata.cc
     manifest/v2_metadata.cc
diff --git a/src/iceberg/manifest/manifest_entry.h 
b/src/iceberg/manifest/manifest_entry.h
index 8e3183a4..f82b7a2b 100644
--- a/src/iceberg/manifest/manifest_entry.h
+++ b/src/iceberg/manifest/manifest_entry.h
@@ -178,94 +178,114 @@ struct ICEBERG_EXPORT DataFile {
   /// present
   std::optional<int64_t> content_size_in_bytes;
 
+  static constexpr int32_t kContentFieldId = 134;
   inline static const SchemaField kContent = SchemaField::MakeOptional(
-      134, "content", iceberg::int32(),
+      kContentFieldId, "content", int32(),
       "Contents of the file: 0=data, 1=position deletes, 2=equality deletes");
+
+  static constexpr int32_t kFilePathFieldId = 100;
   inline static const SchemaField kFilePath = SchemaField::MakeRequired(
-      100, "file_path", iceberg::string(), "Location URI with FS scheme");
-  inline static const SchemaField kFileFormat = SchemaField::MakeRequired(
-      101, "file_format", iceberg::string(), "File format name: avro, orc, or 
parquet");
-  inline static const int32_t kPartitionFieldId = 102;
+      kFilePathFieldId, "file_path", string(), "Location URI with FS scheme");
+
+  static constexpr int32_t kFileFormatFieldId = 101;
+  inline static const SchemaField kFileFormat =
+      SchemaField::MakeRequired(kFileFormatFieldId, "file_format", string(),
+                                "File format name: avro, orc, or parquet");
+
+  static constexpr int32_t kPartitionFieldId = 102;
   inline static const std::string kPartitionField = "partition";
   inline static const std::string kPartitionDoc =
       "Partition data tuple, schema based on the partition spec";
+
+  static constexpr int32_t kRecordCountFieldId = 103;
   inline static const SchemaField kRecordCount = SchemaField::MakeRequired(
-      103, "record_count", iceberg::int64(), "Number of records in the file");
+      kRecordCountFieldId, "record_count", int64(), "Number of records in the 
file");
+
+  static constexpr int32_t kFileSizeFieldId = 104;
   inline static const SchemaField kFileSize = SchemaField::MakeRequired(
-      104, "file_size_in_bytes", iceberg::int64(), "Total file size in bytes");
+      kFileSizeFieldId, "file_size_in_bytes", int64(), "Total file size in 
bytes");
+
+  static constexpr int32_t kColumnSizesFieldId = 108;
   inline static const SchemaField kColumnSizes = SchemaField::MakeOptional(
-      108, "column_sizes",
-      std::make_shared<MapType>(
-          SchemaField::MakeRequired(117, std::string(MapType::kKeyName),
-                                    iceberg::int32()),
-          SchemaField::MakeRequired(118, std::string(MapType::kValueName),
-                                    iceberg::int64())),
+      kColumnSizesFieldId, "column_sizes",
+      map(SchemaField::MakeRequired(117, std::string(MapType::kKeyName), 
int32()),
+          SchemaField::MakeRequired(118, std::string(MapType::kValueName), 
int64())),
       "Map of column id to total size on disk");
+
+  static constexpr int32_t kValueCountsFieldId = 109;
   inline static const SchemaField kValueCounts = SchemaField::MakeOptional(
-      109, "value_counts",
-      std::make_shared<MapType>(
-          SchemaField::MakeRequired(119, std::string(MapType::kKeyName),
-                                    iceberg::int32()),
-          SchemaField::MakeRequired(120, std::string(MapType::kValueName),
-                                    iceberg::int64())),
+      kValueCountsFieldId, "value_counts",
+      map(SchemaField::MakeRequired(119, std::string(MapType::kKeyName), 
int32()),
+          SchemaField::MakeRequired(120, std::string(MapType::kValueName), 
int64())),
       "Map of column id to total count, including null and NaN");
+
+  static constexpr int32_t kNullValueCountsFieldId = 110;
   inline static const SchemaField kNullValueCounts = SchemaField::MakeOptional(
-      110, "null_value_counts",
-      std::make_shared<MapType>(
-          SchemaField::MakeRequired(121, std::string(MapType::kKeyName),
-                                    iceberg::int32()),
-          SchemaField::MakeRequired(122, std::string(MapType::kValueName),
-                                    iceberg::int64())),
+      kNullValueCountsFieldId, "null_value_counts",
+      map(SchemaField::MakeRequired(121, std::string(MapType::kKeyName), 
int32()),
+          SchemaField::MakeRequired(122, std::string(MapType::kValueName), 
int64())),
       "Map of column id to null value count");
+
+  static constexpr int32_t kNanValueCountsFieldId = 137;
   inline static const SchemaField kNanValueCounts = SchemaField::MakeOptional(
-      137, "nan_value_counts",
-      std::make_shared<MapType>(
-          SchemaField::MakeRequired(138, std::string(MapType::kKeyName),
-                                    iceberg::int32()),
-          SchemaField::MakeRequired(139, std::string(MapType::kValueName),
-                                    iceberg::int64())),
+      kNanValueCountsFieldId, "nan_value_counts",
+      map(SchemaField::MakeRequired(138, std::string(MapType::kKeyName), 
int32()),
+          SchemaField::MakeRequired(139, std::string(MapType::kValueName), 
int64())),
       "Map of column id to number of NaN values in the column");
+
+  static constexpr int32_t kLowerBoundsFieldId = 125;
   inline static const SchemaField kLowerBounds = SchemaField::MakeOptional(
-      125, "lower_bounds",
-      std::make_shared<MapType>(
-          SchemaField::MakeRequired(126, std::string(MapType::kKeyName),
-                                    iceberg::int32()),
-          SchemaField::MakeRequired(127, std::string(MapType::kValueName),
-                                    iceberg::binary())),
+      kLowerBoundsFieldId, "lower_bounds",
+      map(SchemaField::MakeRequired(126, std::string(MapType::kKeyName), 
int32()),
+          SchemaField::MakeRequired(127, std::string(MapType::kValueName), 
binary())),
       "Map of column id to lower bound");
+
+  static constexpr int32_t kUpperBoundsFieldId = 128;
   inline static const SchemaField kUpperBounds = SchemaField::MakeOptional(
-      128, "upper_bounds",
-      std::make_shared<MapType>(
-          SchemaField::MakeRequired(129, std::string(MapType::kKeyName),
-                                    iceberg::int32()),
-          SchemaField::MakeRequired(130, std::string(MapType::kValueName),
-                                    iceberg::binary())),
+      kUpperBoundsFieldId, "upper_bounds",
+      map(SchemaField::MakeRequired(129, std::string(MapType::kKeyName), 
int32()),
+          SchemaField::MakeRequired(130, std::string(MapType::kValueName), 
binary())),
       "Map of column id to upper bound");
+
+  static constexpr int32_t kKeyMetadataFieldId = 131;
   inline static const SchemaField kKeyMetadata = SchemaField::MakeOptional(
-      131, "key_metadata", iceberg::binary(), "Encryption key metadata blob");
+      kKeyMetadataFieldId, "key_metadata", binary(), "Encryption key metadata 
blob");
+
+  static constexpr int32_t kSplitOffsetsFieldId = 132;
   inline static const SchemaField kSplitOffsets = SchemaField::MakeOptional(
-      132, "split_offsets",
-      std::make_shared<ListType>(SchemaField::MakeRequired(
-          133, std::string(ListType::kElementName), iceberg::int64())),
+      kSplitOffsetsFieldId, "split_offsets",
+      list(SchemaField::MakeRequired(133, std::string(ListType::kElementName), 
int64())),
       "Splittable offsets");
+
+  static constexpr int32_t kEqualityIdsFieldId = 135;
   inline static const SchemaField kEqualityIds = SchemaField::MakeOptional(
-      135, "equality_ids",
-      std::make_shared<ListType>(SchemaField::MakeRequired(
-          136, std::string(ListType::kElementName), iceberg::int32())),
+      kEqualityIdsFieldId, "equality_ids",
+      list(SchemaField::MakeRequired(136, std::string(ListType::kElementName), 
int32())),
       "Equality comparison field IDs");
-  inline static const SchemaField kSortOrderId =
-      SchemaField::MakeOptional(140, "sort_order_id", iceberg::int32(), "Sort 
order ID");
-  inline static const SchemaField kFirstRowId = SchemaField::MakeOptional(
-      142, "first_row_id", iceberg::int64(), "Starting row ID to assign to new 
rows");
+
+  static constexpr int32_t kSortOrderIdFieldId = 140;
+  inline static const SchemaField kSortOrderId = SchemaField::MakeOptional(
+      kSortOrderIdFieldId, "sort_order_id", int32(), "Sort order ID");
+
+  static constexpr int32_t kFirstRowIdFieldId = 142;
+  inline static const SchemaField kFirstRowId =
+      SchemaField::MakeOptional(kFirstRowIdFieldId, "first_row_id", int64(),
+                                "Starting row ID to assign to new rows");
+
+  static constexpr int32_t kReferencedDataFileFieldId = 143;
   inline static const SchemaField kReferencedDataFile = 
SchemaField::MakeOptional(
-      143, "referenced_data_file", iceberg::string(),
+      kReferencedDataFileFieldId, "referenced_data_file", string(),
       "Fully qualified location (URI with FS scheme) of a data file that all 
deletes "
       "reference");
+
+  static constexpr int32_t kContentOffsetFieldId = 144;
   inline static const SchemaField kContentOffset =
-      SchemaField::MakeOptional(144, "content_offset", iceberg::int64(),
+      SchemaField::MakeOptional(kContentOffsetFieldId, "content_offset", 
int64(),
                                 "The offset in the file where the content 
starts");
+
+  static constexpr int32_t kContentSizeFieldId = 145;
   inline static const SchemaField kContentSize =
-      SchemaField::MakeOptional(145, "content_size_in_bytes", iceberg::int64(),
+      SchemaField::MakeOptional(kContentSizeFieldId, "content_size_in_bytes", 
int64(),
                                 "The length of referenced content stored in 
the file");
 
   bool operator==(const DataFile& other) const = default;
@@ -298,16 +318,24 @@ struct ICEBERG_EXPORT ManifestEntry {
   /// File path, partition tuple, metrics, ...
   std::shared_ptr<DataFile> data_file;
 
+  static constexpr int32_t kStatusFieldId = 0;
   inline static const SchemaField kStatus =
-      SchemaField::MakeRequired(0, "status", iceberg::int32());
+      SchemaField::MakeRequired(kStatusFieldId, "status", int32());
+
+  static constexpr int32_t kSnapshotIdFieldId = 1;
   inline static const SchemaField kSnapshotId =
-      SchemaField::MakeOptional(1, "snapshot_id", iceberg::int64());
-  inline static const int32_t kDataFileFieldId = 2;
+      SchemaField::MakeOptional(kSnapshotIdFieldId, "snapshot_id", int64());
+
+  static constexpr int32_t kDataFileFieldId = 2;
   inline static const std::string kDataFileField = "data_file";
+
+  static constexpr int32_t kSequenceNumberFieldId = 3;
   inline static const SchemaField kSequenceNumber =
-      SchemaField::MakeOptional(3, "sequence_number", iceberg::int64());
-  inline static const SchemaField kFileSequenceNumber =
-      SchemaField::MakeOptional(4, "file_sequence_number", iceberg::int64());
+      SchemaField::MakeOptional(kSequenceNumberFieldId, "sequence_number", 
int64());
+
+  static constexpr int32_t kFileSequenceNumberFieldId = 4;
+  inline static const SchemaField kFileSequenceNumber = 
SchemaField::MakeOptional(
+      kFileSequenceNumberFieldId, "file_sequence_number", int64());
 
   /// \brief Check if this manifest entry is deleted.
   constexpr bool IsAlive() const {
diff --git a/src/iceberg/manifest/manifest_list.cc 
b/src/iceberg/manifest/manifest_list.cc
index 4351081c..9719b831 100644
--- a/src/iceberg/manifest/manifest_list.cc
+++ b/src/iceberg/manifest/manifest_list.cc
@@ -19,22 +19,24 @@
 
 #include "iceberg/manifest/manifest_list.h"
 
-#include "iceberg/schema.h"
+#include <memory>
+
+#include "iceberg/type.h"
 
 namespace iceberg {
 
-const StructType& PartitionFieldSummary::Type() {
-  static const StructType kInstance{{
+const std::shared_ptr<StructType>& PartitionFieldSummary::Type() {
+  static const auto kInstance = 
std::make_shared<StructType>(std::vector<SchemaField>{
       PartitionFieldSummary::kContainsNull,
       PartitionFieldSummary::kContainsNaN,
       PartitionFieldSummary::kLowerBound,
       PartitionFieldSummary::kUpperBound,
-  }};
+  });
   return kInstance;
 }
 
-const std::shared_ptr<Schema>& ManifestFile::Type() {
-  static const auto kInstance = 
std::make_shared<Schema>(std::vector<SchemaField>{
+const std::shared_ptr<StructType>& ManifestFile::Type() {
+  static const auto kInstance = 
std::make_shared<StructType>(std::vector<SchemaField>{
       kManifestPath,
       kManifestLength,
       kPartitionSpecId,
diff --git a/src/iceberg/manifest/manifest_list.h 
b/src/iceberg/manifest/manifest_list.h
index 85c0b89e..47a7ad48 100644
--- a/src/iceberg/manifest/manifest_list.h
+++ b/src/iceberg/manifest/manifest_list.h
@@ -69,7 +69,7 @@ struct ICEBERG_EXPORT PartitionFieldSummary {
 
   bool operator==(const PartitionFieldSummary& other) const = default;
 
-  static const StructType& Type();
+  static const std::shared_ptr<StructType>& Type();
 };
 
 /// \brief The type of files tracked by the manifest, either data or delete 
files; 0 for
@@ -153,51 +153,86 @@ struct ICEBERG_EXPORT ManifestFile {
   /// \brief Checks if this manifest file contains entries with DELETED status
   bool has_deleted_files() const { return deleted_files_count.value_or(1) > 0; 
}
 
+  static constexpr int32_t kManifestPathFieldId = 500;
   inline static const SchemaField kManifestPath = SchemaField::MakeRequired(
-      500, "manifest_path", iceberg::string(), "Location URI with FS scheme");
+      kManifestPathFieldId, "manifest_path", string(), "Location URI with FS 
scheme");
+
+  static constexpr int32_t kManifestLengthFieldId = 501;
   inline static const SchemaField kManifestLength = SchemaField::MakeRequired(
-      501, "manifest_length", iceberg::int64(), "Total file size in bytes");
+      kManifestLengthFieldId, "manifest_length", int64(), "Total file size in 
bytes");
+
+  static constexpr int32_t kPartitionSpecIdFieldId = 502;
   inline static const SchemaField kPartitionSpecId = SchemaField::MakeRequired(
-      502, "partition_spec_id", iceberg::int32(), "Spec ID used to write");
+      kPartitionSpecIdFieldId, "partition_spec_id", int32(), "Spec ID used to 
write");
+
+  static constexpr int32_t kContentFieldId = 517;
   inline static const SchemaField kContent = SchemaField::MakeOptional(
-      517, "content", iceberg::int32(), "Contents of the manifest: 0=data, 
1=deletes");
+      kContentFieldId, "content", int32(), "Contents of the manifest: 0=data, 
1=deletes");
+
+  static constexpr int32_t kSequenceNumberFieldId = 515;
   inline static const SchemaField kSequenceNumber =
-      SchemaField::MakeOptional(515, "sequence_number", iceberg::int64(),
+      SchemaField::MakeOptional(kSequenceNumberFieldId, "sequence_number", 
int64(),
                                 "Sequence number when the manifest was added");
+
+  static constexpr int32_t kMinSequenceNumberFieldId = 516;
   inline static const SchemaField kMinSequenceNumber =
-      SchemaField::MakeOptional(516, "min_sequence_number", iceberg::int64(),
+      SchemaField::MakeOptional(kMinSequenceNumberFieldId, 
"min_sequence_number", int64(),
                                 "Lowest sequence number in the manifest");
-  inline static const SchemaField kAddedSnapshotId = SchemaField::MakeRequired(
-      503, "added_snapshot_id", iceberg::int64(), "Snapshot ID that added the 
manifest");
+
+  static constexpr int32_t kAddedSnapshotIdFieldId = 503;
+  inline static const SchemaField kAddedSnapshotId =
+      SchemaField::MakeRequired(kAddedSnapshotIdFieldId, "added_snapshot_id", 
int64(),
+                                "Snapshot ID that added the manifest");
+
+  static constexpr int32_t kAddedFilesCountFieldId = 504;
   inline static const SchemaField kAddedFilesCount = SchemaField::MakeOptional(
-      504, "added_files_count", iceberg::int32(), "Added entry count");
-  inline static const SchemaField kExistingFilesCount = 
SchemaField::MakeOptional(
-      505, "existing_files_count", iceberg::int32(), "Existing entry count");
+      kAddedFilesCountFieldId, "added_files_count", int32(), "Added entry 
count");
+
+  static constexpr int32_t kExistingFilesCountFieldId = 505;
+  inline static const SchemaField kExistingFilesCount =
+      SchemaField::MakeOptional(kExistingFilesCountFieldId, 
"existing_files_count",
+                                int32(), "Existing entry count");
+
+  static constexpr int32_t kDeletedFilesCountFieldId = 506;
   inline static const SchemaField kDeletedFilesCount = 
SchemaField::MakeOptional(
-      506, "deleted_files_count", iceberg::int32(), "Deleted entry count");
+      kDeletedFilesCountFieldId, "deleted_files_count", int32(), "Deleted 
entry count");
+
+  static constexpr int32_t kAddedRowsCountFieldId = 512;
   inline static const SchemaField kAddedRowsCount = SchemaField::MakeOptional(
-      512, "added_rows_count", iceberg::int64(), "Added rows count");
+      kAddedRowsCountFieldId, "added_rows_count", int64(), "Added rows count");
+
+  static constexpr int32_t kExistingRowsCountFieldId = 513;
   inline static const SchemaField kExistingRowsCount = 
SchemaField::MakeOptional(
-      513, "existing_rows_count", iceberg::int64(), "Existing rows count");
+      kExistingRowsCountFieldId, "existing_rows_count", int64(), "Existing 
rows count");
+
+  static constexpr int32_t kDeletedRowsCountFieldId = 514;
   inline static const SchemaField kDeletedRowsCount = 
SchemaField::MakeOptional(
-      514, "deleted_rows_count", iceberg::int64(), "Deleted rows count");
+      kDeletedRowsCountFieldId, "deleted_rows_count", int64(), "Deleted rows 
count");
+
+  static constexpr int32_t kPartitionSummaryFieldId = 507;
   inline static const SchemaField kPartitions = SchemaField::MakeOptional(
-      507, "partitions",
-      std::make_shared<ListType>(SchemaField::MakeRequired(
-          508, std::string(ListType::kElementName),
-          struct_(
-              {PartitionFieldSummary::kContainsNull, 
PartitionFieldSummary::kContainsNaN,
-               PartitionFieldSummary::kLowerBound, 
PartitionFieldSummary::kUpperBound}))),
+      kPartitionSummaryFieldId, "partitions",
+      list(SchemaField::MakeRequired(508, std::string(ListType::kElementName),
+                                     struct_({
+                                         PartitionFieldSummary::kContainsNull,
+                                         PartitionFieldSummary::kContainsNaN,
+                                         PartitionFieldSummary::kLowerBound,
+                                         PartitionFieldSummary::kUpperBound,
+                                     }))),
       "Summary for each partition");
+
+  static constexpr int32_t kKeyMetadataFieldId = 519;
   inline static const SchemaField kKeyMetadata = SchemaField::MakeOptional(
-      519, "key_metadata", iceberg::binary(), "Encryption key metadata blob");
+      kKeyMetadataFieldId, "key_metadata", binary(), "Encryption key metadata 
blob");
+
+  static constexpr int32_t kFirstRowIdFieldId = 520;
   inline static const SchemaField kFirstRowId = SchemaField::MakeOptional(
-      520, "first_row_id", iceberg::int64(),
+      kFirstRowIdFieldId, "first_row_id", int64(),
       "Starting row ID to assign to new rows in ADDED data files");
 
   bool operator==(const ManifestFile& other) const = default;
 
-  static const std::shared_ptr<Schema>& Type();
+  static const std::shared_ptr<StructType>& Type();
 };
 
 /// Snapshots are embedded in table metadata, but the list of manifests for a 
snapshot are
diff --git a/src/iceberg/manifest/manifest_reader.cc 
b/src/iceberg/manifest/manifest_reader.cc
index 1750151e..2ba377fa 100644
--- a/src/iceberg/manifest/manifest_reader.cc
+++ b/src/iceberg/manifest/manifest_reader.cc
@@ -19,61 +19,873 @@
 
 #include "iceberg/manifest/manifest_reader.h"
 
+#include <algorithm>
+#include <memory>
+#include <unordered_set>
+
+#include <nanoarrow/nanoarrow.h>
+
+#include "iceberg/arrow/nanoarrow_status_internal.h"
+#include "iceberg/arrow_c_data_guard_internal.h"
+#include "iceberg/expression/expression.h"
+#include "iceberg/expression/projections.h"
+#include "iceberg/file_format.h"
 #include "iceberg/manifest/manifest_entry.h"
 #include "iceberg/manifest/manifest_list.h"
 #include "iceberg/manifest/manifest_reader_internal.h"
+#include "iceberg/metadata_columns.h"
+#include "iceberg/partition_spec.h"
 #include "iceberg/schema.h"
-#include "iceberg/schema_internal.h"
+#include "iceberg/schema_field.h"
 #include "iceberg/type.h"
+#include "iceberg/util/checked_cast.h"
 #include "iceberg/util/macros.h"
 
 namespace iceberg {
 
+namespace {
+
+// TODO(gangwu): refactor these macros with template functions.
+#define PARSE_PRIMITIVE_FIELD(item, array_view, type)                          
         \
+  for (int64_t row_idx = 0; row_idx < array_view->length; row_idx++) {         
         \
+    if (!ArrowArrayViewIsNull(array_view, row_idx)) {                          
         \
+      auto value = ArrowArrayViewGetIntUnsafe(array_view, row_idx);            
         \
+      item = static_cast<type>(value);                                         
         \
+    } else if (required) {                                                     
         \
+      return InvalidManifestList("Field {} is required but null at row {}", 
field_name, \
+                                 row_idx);                                     
         \
+    }                                                                          
         \
+  }
+
+#define PARSE_STRING_FIELD(item, array_view)                                   
         \
+  for (int64_t row_idx = 0; row_idx < array_view->length; row_idx++) {         
         \
+    if (!ArrowArrayViewIsNull(array_view, row_idx)) {                          
         \
+      auto value = ArrowArrayViewGetStringUnsafe(array_view, row_idx);         
         \
+      item = std::string(value.data, value.size_bytes);                        
         \
+    } else if (required) {                                                     
         \
+      return InvalidManifestList("Field {} is required but null at row {}", 
field_name, \
+                                 row_idx);                                     
         \
+    }                                                                          
         \
+  }
+
+#define PARSE_BINARY_FIELD(item, array_view)                                   
         \
+  for (int64_t row_idx = 0; row_idx < array_view->length; row_idx++) {         
         \
+    if (!ArrowArrayViewIsNull(view_of_column, row_idx)) {                      
         \
+      item = ArrowArrayViewGetInt8Vector(array_view, row_idx);                 
         \
+    } else if (required) {                                                     
         \
+      return InvalidManifestList("Field {} is required but null at row {}", 
field_name, \
+                                 row_idx);                                     
         \
+    }                                                                          
         \
+  }
+
+#define PARSE_INTEGER_VECTOR_FIELD(item, count, array_view, type)              
     \
+  for (int64_t manifest_idx = 0; manifest_idx < count; manifest_idx++) {       
     \
+    auto offset = ArrowArrayViewListChildOffset(array_view, manifest_idx);     
     \
+    auto next_offset = ArrowArrayViewListChildOffset(array_view, manifest_idx 
+ 1); \
+    for (int64_t offset_idx = offset; offset_idx < next_offset; offset_idx++) 
{     \
+      item.emplace_back(static_cast<type>(                                     
     \
+          ArrowArrayViewGetIntUnsafe(array_view->children[0], offset_idx)));   
     \
+    }                                                                          
     \
+  }
+
+#define PARSE_MAP_FIELD(item, count, array_view, key_type, value_type, 
assignment)   \
+  do {                                                                         
      \
+    if (array_view->storage_type != ArrowType::NANOARROW_TYPE_MAP) {           
      \
+      return InvalidManifest("Field:{} should be a map.", field_name);         
      \
+    }                                                                          
      \
+    auto view_of_map = array_view->children[0];                                
      \
+    ASSERT_VIEW_TYPE_AND_CHILDREN(view_of_map, 
ArrowType::NANOARROW_TYPE_STRUCT, 2); \
+    auto view_of_map_key = view_of_map->children[0];                           
      \
+    ASSERT_VIEW_TYPE(view_of_map_key, key_type);                               
      \
+    auto view_of_map_value = view_of_map->children[1];                         
      \
+    ASSERT_VIEW_TYPE(view_of_map_value, value_type);                           
      \
+    for (int64_t row_idx = 0; row_idx < count; row_idx++) {                    
      \
+      auto offset = array_view->buffer_views[1].data.as_int32[row_idx];        
      \
+      auto next_offset = array_view->buffer_views[1].data.as_int32[row_idx + 
1];     \
+      for (int32_t offset_idx = offset; offset_idx < next_offset; 
offset_idx++) {    \
+        auto key = ArrowArrayViewGetIntUnsafe(view_of_map_key, offset_idx);    
      \
+        item[key] = assignment;                                                
      \
+      }                                                                        
      \
+    }                                                                          
      \
+  } while (0)
+
+#define PARSE_INT_LONG_MAP_FIELD(item, count, array_view)                   \
+  PARSE_MAP_FIELD(item, count, array_view, ArrowType::NANOARROW_TYPE_INT32, \
+                  ArrowType::NANOARROW_TYPE_INT64,                          \
+                  ArrowArrayViewGetIntUnsafe(view_of_map_value, offset_idx));
+
+#define PARSE_INT_BINARY_MAP_FIELD(item, count, array_view)                 \
+  PARSE_MAP_FIELD(item, count, array_view, ArrowType::NANOARROW_TYPE_INT32, \
+                  ArrowType::NANOARROW_TYPE_BINARY,                         \
+                  ArrowArrayViewGetInt8Vector(view_of_map_value, offset_idx));
+
+#define ASSERT_VIEW_TYPE(view, type)                                           
\
+  if (view->storage_type != type) {                                            
\
+    return InvalidManifest("Sub Field:{} should be a {}.", field_name, #type); 
\
+  }
+
+#define ASSERT_VIEW_TYPE_AND_CHILDREN(view, type, n_child)                     
  \
+  if (view->storage_type != type) {                                            
  \
+    return InvalidManifest("Sub Field:{} should be a {}.", field_name, #type); 
  \
+  }                                                                            
  \
+  if (view->n_children != n_child) {                                           
  \
+    return InvalidManifest("Sub Field for:{} should have key&value:{} 
columns.", \
+                           field_name, n_child);                               
  \
+  }
+
+std::vector<uint8_t> ArrowArrayViewGetInt8Vector(const ArrowArrayView* view,
+                                                 int32_t offset_idx) {
+  auto buffer = ArrowArrayViewGetBytesUnsafe(view, offset_idx);
+  return {buffer.data.as_char, buffer.data.as_char + buffer.size_bytes};
+}
+
+Status ParsePartitionFieldSummaryList(ArrowArrayView* view_of_column,
+                                      std::vector<ManifestFile>& 
manifest_files) {
+  auto manifest_count = view_of_column->length;
+  // view_of_column is list<struct<PartitionFieldSummary>>
+  if (view_of_column->storage_type != ArrowType::NANOARROW_TYPE_LIST) {
+    return InvalidManifestList("partitions field should be a list.");
+  }
+  auto view_of_list_item = view_of_column->children[0];
+  // view_of_list_item is struct<PartitionFieldSummary>
+  if (view_of_list_item->storage_type != ArrowType::NANOARROW_TYPE_STRUCT) {
+    return InvalidManifestList("partitions list item should be a struct.");
+  }
+  if (view_of_list_item->n_children != 4) {
+    return InvalidManifestList("PartitionFieldSummary should have 4 fields.");
+  }
+  if (view_of_list_item->children[0]->storage_type != 
ArrowType::NANOARROW_TYPE_BOOL) {
+    return InvalidManifestList("contains_null should have be bool type 
column.");
+  }
+  auto contains_null = view_of_list_item->children[0];
+  if (view_of_list_item->children[1]->storage_type != 
ArrowType::NANOARROW_TYPE_BOOL) {
+    return InvalidManifestList("contains_nan should have be bool type 
column.");
+  }
+  auto contains_nan = view_of_list_item->children[1];
+  if (view_of_list_item->children[2]->storage_type != 
ArrowType::NANOARROW_TYPE_BINARY) {
+    return InvalidManifestList("lower_bound should have be binary type 
column.");
+  }
+  auto lower_bound_list = view_of_list_item->children[2];
+  if (view_of_list_item->children[3]->storage_type != 
ArrowType::NANOARROW_TYPE_BINARY) {
+    return InvalidManifestList("upper_bound should have be binary type 
column.");
+  }
+  auto upper_bound_list = view_of_list_item->children[3];
+  for (int64_t manifest_idx = 0; manifest_idx < manifest_count; 
manifest_idx++) {
+    auto offset = ArrowArrayViewListChildOffset(view_of_column, manifest_idx);
+    auto next_offset = ArrowArrayViewListChildOffset(view_of_column, 
manifest_idx + 1);
+    // partitions from offset to next_offset belongs to manifest_idx
+    auto& manifest_file = manifest_files[manifest_idx];
+    for (int64_t partition_idx = offset; partition_idx < next_offset; 
partition_idx++) {
+      PartitionFieldSummary partition_field_summary;
+      if (!ArrowArrayViewIsNull(contains_null, partition_idx)) {
+        partition_field_summary.contains_null =
+            ArrowArrayViewGetIntUnsafe(contains_null, partition_idx);
+      } else {
+        return InvalidManifestList("contains_null is null at row {}", 
partition_idx);
+      }
+      if (!ArrowArrayViewIsNull(contains_nan, partition_idx)) {
+        partition_field_summary.contains_nan =
+            ArrowArrayViewGetIntUnsafe(contains_nan, partition_idx);
+      }
+      if (!ArrowArrayViewIsNull(lower_bound_list, partition_idx)) {
+        partition_field_summary.lower_bound =
+            ArrowArrayViewGetInt8Vector(lower_bound_list, partition_idx);
+      }
+      if (!ArrowArrayViewIsNull(upper_bound_list, partition_idx)) {
+        partition_field_summary.upper_bound =
+            ArrowArrayViewGetInt8Vector(upper_bound_list, partition_idx);
+      }
+
+      manifest_file.partitions.emplace_back(partition_field_summary);
+    }
+  }
+  return {};
+}
+
+Result<std::vector<ManifestFile>> ParseManifestList(ArrowSchema* schema,
+                                                    ArrowArray* array_in,
+                                                    const Schema& 
iceberg_schema) {
+  if (schema->n_children != array_in->n_children) {
+    return InvalidManifestList("Columns size not match between schema:{} and 
array:{}",
+                               schema->n_children, array_in->n_children);
+  }
+  if (iceberg_schema.fields().size() != array_in->n_children) {
+    return InvalidManifestList("Columns size not match between schema:{} and 
array:{}",
+                               iceberg_schema.fields().size(), 
array_in->n_children);
+  }
+
+  ArrowError error;
+  ArrowArrayView array_view;
+  auto status = ArrowArrayViewInitFromSchema(&array_view, schema, &error);
+  ICEBERG_NANOARROW_RETURN_UNEXPECTED_WITH_ERROR(status, error);
+  internal::ArrowArrayViewGuard view_guard(&array_view);
+  status = ArrowArrayViewSetArray(&array_view, array_in, &error);
+  ICEBERG_NANOARROW_RETURN_UNEXPECTED_WITH_ERROR(status, error);
+  status = ArrowArrayViewValidate(&array_view, 
NANOARROW_VALIDATION_LEVEL_FULL, &error);
+  ICEBERG_NANOARROW_RETURN_UNEXPECTED_WITH_ERROR(status, error);
+
+  std::vector<ManifestFile> manifest_files;
+  manifest_files.resize(array_in->length);
+
+  for (int64_t idx = 0; idx < array_in->n_children; idx++) {
+    ICEBERG_ASSIGN_OR_RAISE(auto field, iceberg_schema.GetFieldByIndex(idx));
+    ICEBERG_CHECK(field.has_value(), "Invalid index {} for data file schema", 
idx);
+    auto field_name = field->get().name();
+    auto field_id = field->get().field_id();
+    auto required = !field->get().optional();
+    auto view_of_column = array_view.children[idx];
+    switch (field_id) {
+      case ManifestFile::kManifestPathFieldId:
+        PARSE_STRING_FIELD(manifest_files[row_idx].manifest_path, 
view_of_column);
+        break;
+      case ManifestFile::kManifestLengthFieldId:
+        PARSE_PRIMITIVE_FIELD(manifest_files[row_idx].manifest_length, 
view_of_column,
+                              int64_t);
+        break;
+      case ManifestFile::kPartitionSpecIdFieldId:
+        PARSE_PRIMITIVE_FIELD(manifest_files[row_idx].partition_spec_id, 
view_of_column,
+                              int32_t);
+        break;
+      case ManifestFile::kContentFieldId:
+        PARSE_PRIMITIVE_FIELD(manifest_files[row_idx].content, view_of_column,
+                              ManifestContent);
+        break;
+      case ManifestFile::kSequenceNumberFieldId:
+        PARSE_PRIMITIVE_FIELD(manifest_files[row_idx].sequence_number, 
view_of_column,
+                              int64_t);
+        break;
+      case ManifestFile::kMinSequenceNumberFieldId:
+        PARSE_PRIMITIVE_FIELD(manifest_files[row_idx].min_sequence_number, 
view_of_column,
+                              int64_t);
+        break;
+      case ManifestFile::kAddedSnapshotIdFieldId:
+        PARSE_PRIMITIVE_FIELD(manifest_files[row_idx].added_snapshot_id, 
view_of_column,
+                              int64_t);
+        break;
+      case ManifestFile::kAddedFilesCountFieldId:
+        PARSE_PRIMITIVE_FIELD(manifest_files[row_idx].added_files_count, 
view_of_column,
+                              int32_t);
+        break;
+      case ManifestFile::kExistingFilesCountFieldId:
+        PARSE_PRIMITIVE_FIELD(manifest_files[row_idx].existing_files_count,
+                              view_of_column, int32_t);
+        break;
+      case ManifestFile::kDeletedFilesCountFieldId:
+        PARSE_PRIMITIVE_FIELD(manifest_files[row_idx].deleted_files_count, 
view_of_column,
+                              int32_t);
+        break;
+      case ManifestFile::kAddedRowsCountFieldId:
+        PARSE_PRIMITIVE_FIELD(manifest_files[row_idx].added_rows_count, 
view_of_column,
+                              int64_t);
+        break;
+      case ManifestFile::kExistingRowsCountFieldId:
+        PARSE_PRIMITIVE_FIELD(manifest_files[row_idx].existing_rows_count, 
view_of_column,
+                              int64_t);
+        break;
+      case ManifestFile::kDeletedRowsCountFieldId:
+        PARSE_PRIMITIVE_FIELD(manifest_files[row_idx].deleted_rows_count, 
view_of_column,
+                              int64_t);
+        break;
+      case ManifestFile::kPartitionSummaryFieldId:
+        ICEBERG_RETURN_UNEXPECTED(
+            ParsePartitionFieldSummaryList(view_of_column, manifest_files));
+        break;
+      case ManifestFile::kKeyMetadataFieldId:
+        PARSE_BINARY_FIELD(manifest_files[row_idx].key_metadata, 
view_of_column);
+        break;
+      case ManifestFile::kFirstRowIdFieldId:
+        PARSE_PRIMITIVE_FIELD(manifest_files[row_idx].first_row_id, 
view_of_column,
+                              int64_t);
+        break;
+      default:
+        return InvalidManifestList("Unsupported field: {} in manifest file.", 
field_name);
+    }
+  }
+  return manifest_files;
+}
+
+Status ParsePartitionValues(ArrowArrayView* view_of_partition, int64_t row_idx,
+                            std::vector<ManifestEntry>& manifest_entries) {
+  switch (view_of_partition->storage_type) {
+    case ArrowType::NANOARROW_TYPE_BOOL: {
+      auto value = ArrowArrayViewGetUIntUnsafe(view_of_partition, row_idx);
+      manifest_entries[row_idx].data_file->partition.AddValue(
+          Literal::Boolean(value != 0));
+    } break;
+    case ArrowType::NANOARROW_TYPE_INT32: {
+      auto value = ArrowArrayViewGetIntUnsafe(view_of_partition, row_idx);
+      
manifest_entries[row_idx].data_file->partition.AddValue(Literal::Int(value));
+    } break;
+    case ArrowType::NANOARROW_TYPE_INT64: {
+      auto value = ArrowArrayViewGetIntUnsafe(view_of_partition, row_idx);
+      
manifest_entries[row_idx].data_file->partition.AddValue(Literal::Long(value));
+    } break;
+    case ArrowType::NANOARROW_TYPE_FLOAT: {
+      auto value = ArrowArrayViewGetDoubleUnsafe(view_of_partition, row_idx);
+      
manifest_entries[row_idx].data_file->partition.AddValue(Literal::Float(value));
+    } break;
+    case ArrowType::NANOARROW_TYPE_DOUBLE: {
+      auto value = ArrowArrayViewGetDoubleUnsafe(view_of_partition, row_idx);
+      
manifest_entries[row_idx].data_file->partition.AddValue(Literal::Double(value));
+    } break;
+    case ArrowType::NANOARROW_TYPE_STRING: {
+      auto value = ArrowArrayViewGetStringUnsafe(view_of_partition, row_idx);
+      manifest_entries[row_idx].data_file->partition.AddValue(
+          Literal::String(std::string(value.data, value.size_bytes)));
+    } break;
+    case ArrowType::NANOARROW_TYPE_BINARY: {
+      auto buffer = ArrowArrayViewGetBytesUnsafe(view_of_partition, row_idx);
+      manifest_entries[row_idx].data_file->partition.AddValue(
+          Literal::Binary(std::vector<uint8_t>(buffer.data.as_char,
+                                               buffer.data.as_char + 
buffer.size_bytes)));
+    } break;
+    default:
+      return InvalidManifest("Unsupported field type: {} in data file 
partition.",
+                             
static_cast<int32_t>(view_of_partition->storage_type));
+  }
+  return {};
+}
+
+Status ParseDataFile(const std::shared_ptr<StructType>& data_file_schema,
+                     ArrowArrayView* view_of_column, std::optional<int64_t>& 
first_row_id,
+                     std::vector<ManifestEntry>& manifest_entries) {
+  if (view_of_column->storage_type != ArrowType::NANOARROW_TYPE_STRUCT) {
+    return InvalidManifest("DataFile field should be a struct.");
+  }
+  if (view_of_column->n_children != data_file_schema->fields().size()) {
+    return InvalidManifest("DataFile schema size:{} not match with ArrayArray 
columns:{}",
+                           data_file_schema->fields().size(), 
view_of_column->n_children);
+  }
+  for (int64_t col_idx = 0; col_idx < view_of_column->n_children; ++col_idx) {
+    ICEBERG_ASSIGN_OR_RAISE(auto field, 
data_file_schema->GetFieldByIndex(col_idx));
+    ICEBERG_CHECK(field.has_value(), "Invalid index {} for data file schema", 
col_idx);
+    auto field_name = field->get().name();
+    auto field_id = field->get().field_id();
+    auto required = !field->get().optional();
+    auto array_view = view_of_column->children[col_idx];
+    auto manifest_entry_count = array_view->length;
+
+    switch (field_id) {
+      case DataFile::kContentFieldId:
+        PARSE_PRIMITIVE_FIELD(manifest_entries[row_idx].data_file->content, 
array_view,
+                              DataFile::Content);
+        break;
+      case DataFile::kFilePathFieldId:
+        PARSE_STRING_FIELD(manifest_entries[row_idx].data_file->file_path, 
array_view);
+        break;
+      case DataFile::kFileFormatFieldId:
+        for (int64_t row_idx = 0; row_idx < array_view->length; row_idx++) {
+          if (!ArrowArrayViewIsNull(array_view, row_idx)) {
+            auto value = ArrowArrayViewGetStringUnsafe(array_view, row_idx);
+            std::string_view path_str(value.data, value.size_bytes);
+            
ICEBERG_ASSIGN_OR_RAISE(manifest_entries[row_idx].data_file->file_format,
+                                    FileFormatTypeFromString(path_str));
+          }
+        }
+        break;
+      case DataFile::kPartitionFieldId: {
+        if (array_view->storage_type != ArrowType::NANOARROW_TYPE_STRUCT) {
+          return InvalidManifest("Field:{} should be a struct.", field_name);
+        }
+        if (array_view->n_children > 0) {
+          for (int64_t partition_idx = 0; partition_idx < 
array_view->n_children;
+               partition_idx++) {
+            auto view_of_partition = array_view->children[partition_idx];
+            for (int64_t row_idx = 0; row_idx < view_of_partition->length; 
row_idx++) {
+              if (ArrowArrayViewIsNull(view_of_partition, row_idx)) {
+                break;
+              }
+              ICEBERG_RETURN_UNEXPECTED(
+                  ParsePartitionValues(view_of_partition, row_idx, 
manifest_entries));
+            }
+          }
+        }
+      } break;
+      case DataFile::kRecordCountFieldId:
+        
PARSE_PRIMITIVE_FIELD(manifest_entries[row_idx].data_file->record_count,
+                              array_view, int64_t);
+        break;
+      case DataFile::kFileSizeFieldId:
+        
PARSE_PRIMITIVE_FIELD(manifest_entries[row_idx].data_file->file_size_in_bytes,
+                              array_view, int64_t);
+        break;
+      case DataFile::kColumnSizesFieldId:
+        // key&value should have the same offset
+        // HACK(xiao.dong) workaround for arrow bug:
+        // ArrowArrayViewListChildOffset can not get the correct offset for map
+        
PARSE_INT_LONG_MAP_FIELD(manifest_entries[row_idx].data_file->column_sizes,
+                                 manifest_entry_count, array_view);
+        break;
+      case DataFile::kValueCountsFieldId:
+        
PARSE_INT_LONG_MAP_FIELD(manifest_entries[row_idx].data_file->value_counts,
+                                 manifest_entry_count, array_view);
+        break;
+      case DataFile::kNullValueCountsFieldId:
+        
PARSE_INT_LONG_MAP_FIELD(manifest_entries[row_idx].data_file->null_value_counts,
+                                 manifest_entry_count, array_view);
+        break;
+      case DataFile::kNanValueCountsFieldId:
+        
PARSE_INT_LONG_MAP_FIELD(manifest_entries[row_idx].data_file->nan_value_counts,
+                                 manifest_entry_count, array_view);
+        break;
+      case DataFile::kLowerBoundsFieldId:
+        
PARSE_INT_BINARY_MAP_FIELD(manifest_entries[row_idx].data_file->lower_bounds,
+                                   manifest_entry_count, array_view);
+        break;
+      case DataFile::kUpperBoundsFieldId:
+        
PARSE_INT_BINARY_MAP_FIELD(manifest_entries[row_idx].data_file->upper_bounds,
+                                   manifest_entry_count, array_view);
+        break;
+      case DataFile::kKeyMetadataFieldId:
+        PARSE_BINARY_FIELD(manifest_entries[row_idx].data_file->key_metadata, 
array_view);
+        break;
+      case DataFile::kSplitOffsetsFieldId:
+        PARSE_INTEGER_VECTOR_FIELD(
+            manifest_entries[manifest_idx].data_file->split_offsets, 
manifest_entry_count,
+            array_view, int64_t);
+        break;
+      case DataFile::kEqualityIdsFieldId:
+        
PARSE_INTEGER_VECTOR_FIELD(manifest_entries[manifest_idx].data_file->equality_ids,
+                                   manifest_entry_count, array_view, int32_t);
+        break;
+      case DataFile::kSortOrderIdFieldId:
+        
PARSE_PRIMITIVE_FIELD(manifest_entries[row_idx].data_file->sort_order_id,
+                              array_view, int32_t);
+        break;
+      case DataFile::kFirstRowIdFieldId: {
+        
PARSE_PRIMITIVE_FIELD(manifest_entries[row_idx].data_file->first_row_id,
+                              array_view, int64_t);
+        if (first_row_id.has_value()) {
+          std::ranges::for_each(manifest_entries, 
[&first_row_id](ManifestEntry& entry) {
+            if (entry.status != ManifestStatus::kDeleted &&
+                !entry.data_file->first_row_id.has_value()) {
+              entry.data_file->first_row_id = first_row_id.value();
+              first_row_id = first_row_id.value() + 
entry.data_file->record_count;
+            }
+          });
+        } else {
+          // data file's first_row_id is null when the manifest's first_row_id 
is null
+          std::ranges::for_each(manifest_entries, [](ManifestEntry& entry) {
+            entry.data_file->first_row_id = std::nullopt;
+          });
+        }
+        break;
+      }
+      case DataFile::kReferencedDataFileFieldId:
+        
PARSE_STRING_FIELD(manifest_entries[row_idx].data_file->referenced_data_file,
+                           array_view);
+        break;
+      case DataFile::kContentOffsetFieldId:
+        
PARSE_PRIMITIVE_FIELD(manifest_entries[row_idx].data_file->content_offset,
+                              array_view, int64_t);
+        break;
+      case DataFile::kContentSizeFieldId:
+        
PARSE_PRIMITIVE_FIELD(manifest_entries[row_idx].data_file->content_size_in_bytes,
+                              array_view, int64_t);
+        break;
+      default:
+        return InvalidManifest("Unsupported field: {} in data file.", 
field_name);
+    }
+  }
+  return {};
+}
+
+Result<std::vector<ManifestEntry>> ParseManifestEntry(
+    ArrowSchema* schema, ArrowArray* array_in, const Schema& iceberg_schema,
+    std::optional<int64_t>& first_row_id) {
+  if (schema->n_children != array_in->n_children) {
+    return InvalidManifest("Columns size not match between schema:{} and 
array:{}",
+                           schema->n_children, array_in->n_children);
+  }
+  if (iceberg_schema.fields().size() != array_in->n_children) {
+    return InvalidManifest("Columns size not match between schema:{} and 
array:{}",
+                           iceberg_schema.fields().size(), 
array_in->n_children);
+  }
+
+  ArrowError error;
+  ArrowArrayView array_view;
+  auto status = ArrowArrayViewInitFromSchema(&array_view, schema, &error);
+  ICEBERG_NANOARROW_RETURN_UNEXPECTED_WITH_ERROR(status, error);
+  internal::ArrowArrayViewGuard view_guard(&array_view);
+  status = ArrowArrayViewSetArray(&array_view, array_in, &error);
+  ICEBERG_NANOARROW_RETURN_UNEXPECTED_WITH_ERROR(status, error);
+  status = ArrowArrayViewValidate(&array_view, 
NANOARROW_VALIDATION_LEVEL_FULL, &error);
+  ICEBERG_NANOARROW_RETURN_UNEXPECTED_WITH_ERROR(status, error);
+
+  std::vector<ManifestEntry> manifest_entries;
+  manifest_entries.resize(array_in->length);
+  for (int64_t i = 0; i < array_in->length; i++) {
+    manifest_entries[i].data_file = std::make_shared<DataFile>();
+  }
+
+  for (int64_t idx = 0; idx < array_in->n_children; idx++) {
+    ICEBERG_ASSIGN_OR_RAISE(auto field, iceberg_schema.GetFieldByIndex(idx));
+    ICEBERG_CHECK(field.has_value(), "Invalid index {} for manifest entry 
schema", idx);
+    auto field_name = field->get().name();
+    auto field_id = field->get().field_id();
+    auto required = !field->get().optional();
+    auto view_of_column = array_view.children[idx];
+
+    switch (field_id) {
+      case ManifestEntry::kStatusFieldId:
+        PARSE_PRIMITIVE_FIELD(manifest_entries[row_idx].status, view_of_column,
+                              ManifestStatus);
+        break;
+      case ManifestEntry::kSnapshotIdFieldId:
+        PARSE_PRIMITIVE_FIELD(manifest_entries[row_idx].snapshot_id, 
view_of_column,
+                              int64_t);
+        break;
+      case ManifestEntry::kSequenceNumberFieldId:
+        PARSE_PRIMITIVE_FIELD(manifest_entries[row_idx].sequence_number, 
view_of_column,
+                              int64_t);
+        break;
+      case ManifestEntry::kFileSequenceNumberFieldId:
+        PARSE_PRIMITIVE_FIELD(manifest_entries[row_idx].file_sequence_number,
+                              view_of_column, int64_t);
+        break;
+      case ManifestEntry::kDataFileFieldId: {
+        auto data_file_schema =
+            internal::checked_pointer_cast<StructType>(field->get().type());
+        ICEBERG_RETURN_UNEXPECTED(ParseDataFile(data_file_schema, 
view_of_column,
+                                                first_row_id, 
manifest_entries));
+        break;
+      }
+      default:
+        return InvalidManifest("Unsupported field: {} in manifest entry.", 
field_name);
+    }
+  }
+  return manifest_entries;
+}
+
+const std::vector<std::string> kStatsColumns = {"value_counts",     
"null_value_counts",
+                                                "nan_value_counts", 
"lower_bounds",
+                                                "upper_bounds",     
"record_count"};
+
+bool RequireStatsProjection(const std::shared_ptr<Expression>& row_filter,
+                            const std::vector<std::string>& columns) {
+  if (!row_filter || row_filter->op() == Expression::Operation::kTrue) {
+    return false;
+  }
+  if (columns.empty()) {
+    return false;
+  }
+  const std::unordered_set<std::string_view> selected(columns.cbegin(), 
columns.cend());
+  if (selected.contains(ManifestReader::kAllColumns)) {
+    return false;
+  }
+  if (std::ranges::all_of(kStatsColumns, [&selected](const std::string& col) {
+        return selected.contains(col);
+      })) {
+    return false;
+  }
+  return true;
+}
+
+Result<std::shared_ptr<Schema>> ProjectSchema(std::shared_ptr<Schema> schema,
+                                              const std::vector<std::string>& 
columns,
+                                              bool case_sensitive) {
+  if (!columns.empty()) {
+    return schema->Select(columns, case_sensitive);
+  }
+  return schema;
+}
+
+}  // namespace
+
+std::vector<std::string> ManifestReader::WithStatsColumns(
+    const std::vector<std::string>& columns) {
+  if (std::ranges::contains(columns, ManifestReader::kAllColumns)) {
+    return columns;
+  } else {
+    std::vector<std::string> updated_columns{columns};
+    updated_columns.insert(updated_columns.end(), kStatsColumns.begin(),
+                           kStatsColumns.end());
+    return updated_columns;
+  }
+}
+
+// ManifestReaderImpl constructor
+ManifestReaderImpl::ManifestReaderImpl(
+    std::string manifest_path, std::optional<int64_t> manifest_length,
+    std::shared_ptr<FileIO> file_io, std::shared_ptr<Schema> schema,
+    std::shared_ptr<PartitionSpec> spec,
+    std::unique_ptr<InheritableMetadata> inheritable_metadata,
+    std::optional<int64_t> first_row_id)
+    : manifest_path_(std::move(manifest_path)),
+      manifest_length_(manifest_length),
+      file_io_(std::move(file_io)),
+      schema_(std::move(schema)),
+      spec_(std::move(spec)),
+      inheritable_metadata_(std::move(inheritable_metadata)),
+      first_row_id_(first_row_id) {}
+
+ManifestReader& ManifestReaderImpl::Select(const std::vector<std::string>& 
columns) {
+  columns_ = columns;
+  return *this;
+}
+
+ManifestReader& 
ManifestReaderImpl::FilterPartitions(std::shared_ptr<Expression> expr) {
+  part_filter_ = expr ? std::move(expr) : True::Instance();
+  return *this;
+}
+
+ManifestReader& ManifestReaderImpl::FilterPartitions(
+    std::shared_ptr<PartitionSet> partition_set) {
+  partition_set_ = std::move(partition_set);
+  return *this;
+}
+
+ManifestReader& ManifestReaderImpl::FilterRows(std::shared_ptr<Expression> 
expr) {
+  row_filter_ = expr ? std::move(expr) : True::Instance();
+  return *this;
+}
+
+ManifestReader& ManifestReaderImpl::CaseSensitive(bool case_sensitive) {
+  case_sensitive_ = case_sensitive;
+  return *this;
+}
+
+bool ManifestReaderImpl::HasPartitionFilter() const {
+  ICEBERG_DCHECK(part_filter_, "Partition filter is not set");
+  return part_filter_->op() != Expression::Operation::kTrue;
+}
+
+bool ManifestReaderImpl::HasRowFilter() const {
+  ICEBERG_DCHECK(row_filter_, "Row filter is not set");
+  return row_filter_->op() != Expression::Operation::kTrue;
+}
+
+Result<Evaluator*> ManifestReaderImpl::GetEvaluator() {
+  if (!evaluator_) {
+    auto projection_evaluator = Projections::Inclusive(*spec_, *schema_, 
case_sensitive_);
+    ICEBERG_ASSIGN_OR_RAISE(auto projected, 
projection_evaluator->Project(row_filter_));
+    ICEBERG_ASSIGN_OR_RAISE(auto final_part_filter,
+                            And::Make(std::move(projected), part_filter_));
+
+    ICEBERG_ASSIGN_OR_RAISE(auto partition_type, 
spec_->PartitionType(*schema_));
+    auto partition_schema = partition_type->ToSchema();
+    ICEBERG_ASSIGN_OR_RAISE(
+        evaluator_, Evaluator::Make(*partition_schema, 
std::move(final_part_filter),
+                                    case_sensitive_));
+  }
+  return evaluator_.get();
+}
+
+Result<InclusiveMetricsEvaluator*> ManifestReaderImpl::GetMetricsEvaluator() {
+  if (!metrics_evaluator_) {
+    ICEBERG_ASSIGN_OR_RAISE(
+        metrics_evaluator_,
+        InclusiveMetricsEvaluator::Make(row_filter_, *schema_, 
case_sensitive_));
+  }
+  return metrics_evaluator_.get();
+}
+
+bool ManifestReaderImpl::InPartitionSet(const DataFile& file) const {
+  if (!partition_set_) {
+    return true;
+  }
+  return partition_set_->contains(file.partition_spec_id, file.partition);
+}
+
+Status ManifestReaderImpl::OpenReader(std::shared_ptr<Schema> projection) {
+  ICEBERG_PRECHECK(projection != nullptr, "Projection schema cannot be null");
+
+  // Ensure required fields are present in the projected data file schema.
+  std::vector<SchemaField> fields(projection->fields().begin(),
+                                  projection->fields().end());
+  ICEBERG_ASSIGN_OR_RAISE(auto record_count_field,
+                          
projection->FindFieldById(DataFile::kRecordCount.field_id()));
+  ICEBERG_ASSIGN_OR_RAISE(auto first_row_id_field,
+                          
projection->FindFieldById(DataFile::kFirstRowId.field_id()));
+  if (!record_count_field.has_value()) {
+    fields.push_back(DataFile::kRecordCount);
+  }
+  if (!first_row_id_field.has_value()) {
+    fields.push_back(DataFile::kFirstRowId);
+  }
+  /// FIXME: this is not supported yet.
+  // fields.push_back(MetadataColumns::kRowPosition);
+  auto data_file_type = std::make_shared<StructType>(std::move(fields));
+
+  // Wrap the projected data file schema into manifest entry schema
+  file_schema_ =
+      
ManifestEntry::TypeFromDataFileType(std::move(data_file_type))->ToSchema();
+
+  ReaderOptions options;
+  options.path = manifest_path_;
+  options.io = file_io_;
+  options.projection = file_schema_;
+  if (manifest_length_.has_value()) {
+    options.length = manifest_length_;
+  }
+
+  ICEBERG_ASSIGN_OR_RAISE(file_reader_,
+                          ReaderFactoryRegistry::Open(FileFormatType::kAvro, 
options));
+
+  return {};
+}
+
+Result<std::vector<ManifestEntry>> ManifestReaderImpl::Entries() {
+  return ReadEntries(/*only_live=*/false);
+}
+
+Result<std::vector<ManifestEntry>> ManifestReaderImpl::LiveEntries() {
+  return ReadEntries(/*only_live=*/true);
+}
+
+Result<std::vector<ManifestEntry>> ManifestReaderImpl::ReadEntries(bool 
only_live) {
+  ICEBERG_ASSIGN_OR_RAISE(auto partition_type, spec_->PartitionType(*schema_));
+  auto data_file_schema = 
DataFile::Type(std::move(partition_type))->ToSchema();
+
+  std::shared_ptr<Schema> projected_data_file_schema;
+  bool needs_filtering =
+      HasRowFilter() || HasPartitionFilter() || partition_set_ != nullptr;
+  if (needs_filtering) {
+    // ensure stats columns are present for metrics evaluation
+    bool requires_stats_projection = RequireStatsProjection(row_filter_, 
columns_);
+    ICEBERG_ASSIGN_OR_RAISE(
+        projected_data_file_schema,
+        ProjectSchema(
+            std::move(data_file_schema),
+            (requires_stats_projection ? 
ManifestReader::WithStatsColumns(columns_)
+                                       : columns_),
+            case_sensitive_));
+  } else {
+    ICEBERG_ASSIGN_OR_RAISE(
+        projected_data_file_schema,
+        ProjectSchema(std::move(data_file_schema), columns_, case_sensitive_));
+  }
+
+  ICEBERG_RETURN_UNEXPECTED(OpenReader(std::move(projected_data_file_schema)));
+  ICEBERG_DCHECK(file_reader_ != nullptr, "File reader should be initialized");
+
+  std::vector<ManifestEntry> manifest_entries;
+  ICEBERG_ASSIGN_OR_RAISE(auto arrow_schema, file_reader_->Schema());
+  internal::ArrowSchemaGuard schema_guard(&arrow_schema);
+
+  // Get evaluators if needed
+  Evaluator* evaluator = nullptr;
+  InclusiveMetricsEvaluator* metrics_evaluator = nullptr;
+  if (HasPartitionFilter() || HasRowFilter()) {
+    ICEBERG_ASSIGN_OR_RAISE(evaluator, GetEvaluator());
+  }
+  if (HasRowFilter()) {
+    ICEBERG_ASSIGN_OR_RAISE(metrics_evaluator, GetMetricsEvaluator());
+  }
+
+  while (true) {
+    ICEBERG_ASSIGN_OR_RAISE(auto result, file_reader_->Next());
+    if (!result.has_value()) {
+      break;  // EOF
+    }
+
+    internal::ArrowArrayGuard array_guard(&result.value());
+    ICEBERG_ASSIGN_OR_RAISE(
+        auto entries,
+        ParseManifestEntry(&arrow_schema, &result.value(), *file_schema_, 
first_row_id_));
+
+    for (auto& entry : entries) {
+      ICEBERG_RETURN_UNEXPECTED(inheritable_metadata_->Apply(entry));
+
+      if (only_live && !entry.IsAlive()) {
+        continue;
+      }
+
+      if (needs_filtering) {
+        ICEBERG_DCHECK(entry.data_file != nullptr, "Data file cannot be null");
+        if (evaluator) {
+          ICEBERG_ASSIGN_OR_RAISE(bool partition_match,
+                                  
evaluator->Evaluate(entry.data_file->partition));
+          if (!partition_match) {
+            continue;
+          }
+        }
+        if (metrics_evaluator) {
+          ICEBERG_ASSIGN_OR_RAISE(bool metrics_match,
+                                  
metrics_evaluator->Evaluate(*entry.data_file));
+          if (!metrics_match) {
+            continue;
+          }
+        }
+        if (!InPartitionSet(*entry.data_file)) {
+          continue;
+        }
+      }
+
+      manifest_entries.push_back(std::move(entry));
+    }
+  }
+
+  return manifest_entries;
+}
+
+Result<std::vector<ManifestFile>> ManifestListReaderImpl::Files() const {
+  std::vector<ManifestFile> manifest_files;
+  ICEBERG_ASSIGN_OR_RAISE(auto arrow_schema, reader_->Schema());
+  internal::ArrowSchemaGuard schema_guard(&arrow_schema);
+  while (true) {
+    ICEBERG_ASSIGN_OR_RAISE(auto result, reader_->Next());
+    if (!result.has_value()) {
+      // eof
+      break;
+    }
+    internal::ArrowArrayGuard array_guard(&result.value());
+    ICEBERG_ASSIGN_OR_RAISE(auto parse_result,
+                            ParseManifestList(&arrow_schema, &result.value(), 
*schema_));
+    manifest_files.insert(manifest_files.end(),
+                          std::make_move_iterator(parse_result.begin()),
+                          std::make_move_iterator(parse_result.end()));
+  }
+  return manifest_files;
+}
+
+Result<std::unordered_map<std::string, std::string>> 
ManifestListReaderImpl::Metadata()
+    const {
+  return reader_->Metadata();
+}
+
+Result<ManifestFileField> ManifestFileFieldFromIndex(int32_t index) {
+  if (index >= 0 && index < 
static_cast<int32_t>(ManifestFileField::kNextUnusedId)) {
+    return static_cast<ManifestFileField>(index);
+  }
+  return InvalidArgument("Invalid manifest file field index: {}", index);
+}
+
 Result<std::unique_ptr<ManifestReader>> ManifestReader::Make(
     const ManifestFile& manifest, std::shared_ptr<FileIO> file_io,
-    std::shared_ptr<StructType> partition_type) {
-  auto manifest_entry_schema =
-      ManifestEntry::TypeFromPartitionType(std::move(partition_type));
-  std::shared_ptr<Schema> schema =
-      FromStructType(std::move(*manifest_entry_schema), std::nullopt);
-
-  ICEBERG_ASSIGN_OR_RAISE(auto reader,
-                          ReaderFactoryRegistry::Open(FileFormatType::kAvro,
-                                                      {.path = 
manifest.manifest_path,
-                                                       .length = 
manifest.manifest_length,
-                                                       .io = 
std::move(file_io),
-                                                       .projection = schema}));
+    std::shared_ptr<Schema> schema, std::shared_ptr<PartitionSpec> spec) {
+  if (file_io == nullptr || schema == nullptr || spec == nullptr) {
+    return InvalidArgument(
+        "FileIO, Schema, and PartitionSpec cannot be null to create 
ManifestReader");
+  }
+
   // Create inheritable metadata for this manifest
   ICEBERG_ASSIGN_OR_RAISE(auto inheritable_metadata,
                           InheritableMetadataFactory::FromManifest(manifest));
-
-  return std::make_unique<ManifestReaderImpl>(std::move(reader), 
std::move(schema),
-                                              std::move(inheritable_metadata),
-                                              manifest.first_row_id);
+  return std::make_unique<ManifestReaderImpl>(
+      manifest.manifest_path, manifest.manifest_length, std::move(file_io),
+      std::move(schema), std::move(spec), std::move(inheritable_metadata),
+      manifest.first_row_id);
 }
 
 Result<std::unique_ptr<ManifestReader>> ManifestReader::Make(
     std::string_view manifest_location, std::shared_ptr<FileIO> file_io,
-    std::shared_ptr<StructType> partition_type) {
-  auto manifest_entry_schema =
-      ManifestEntry::TypeFromPartitionType(std::move(partition_type));
-  auto fields_span = manifest_entry_schema->fields();
-  std::vector<SchemaField> fields(fields_span.begin(), fields_span.end());
-  auto schema = std::make_shared<Schema>(fields);
-  ICEBERG_ASSIGN_OR_RAISE(
-      auto reader, ReaderFactoryRegistry::Open(FileFormatType::kAvro,
-                                               {.path = 
std::string(manifest_location),
-                                                .io = std::move(file_io),
-                                                .projection = schema}));
+    std::shared_ptr<Schema> schema, std::shared_ptr<PartitionSpec> spec) {
+  if (file_io == nullptr || schema == nullptr || spec == nullptr) {
+    return InvalidArgument(
+        "FileIO, Schema, and PartitionSpec cannot be null to create 
ManifestReader");
+  }
+
+  // No metadata to inherit in this case.
   ICEBERG_ASSIGN_OR_RAISE(auto inheritable_metadata, 
InheritableMetadataFactory::Empty());
-  return std::make_unique<ManifestReaderImpl>(std::move(reader), 
std::move(schema),
-                                              std::move(inheritable_metadata),
-                                              std::nullopt);
+  return std::make_unique<ManifestReaderImpl>(
+      std::string(manifest_location), std::nullopt, std::move(file_io), 
std::move(schema),
+      std::move(spec), std::move(inheritable_metadata), std::nullopt);
 }
 
 Result<std::unique_ptr<ManifestListReader>> ManifestListReader::Make(
     std::string_view manifest_list_location, std::shared_ptr<FileIO> file_io) {
-  std::shared_ptr<Schema> schema = ManifestFile::Type();
+  std::shared_ptr<Schema> schema = ManifestFile::Type()->ToSchema();
   ICEBERG_ASSIGN_OR_RAISE(
       auto reader,
       ReaderFactoryRegistry::Open(FileFormatType::kAvro,
diff --git a/src/iceberg/manifest/manifest_reader.h 
b/src/iceberg/manifest/manifest_reader.h
index e7d6044d..a35c1fb9 100644
--- a/src/iceberg/manifest/manifest_reader.h
+++ b/src/iceberg/manifest/manifest_reader.h
@@ -23,11 +23,12 @@
 /// Data reader interface for manifest files.
 
 #include <memory>
+#include <string>
+#include <unordered_map>
 #include <vector>
 
 #include "iceberg/iceberg_export.h"
 #include "iceberg/result.h"
-#include "iceberg/type.h"
 #include "iceberg/type_fwd.h"
 
 namespace iceberg {
@@ -35,31 +36,67 @@ namespace iceberg {
 /// \brief Read manifest entries from a manifest file.
 class ICEBERG_EXPORT ManifestReader {
  public:
+  /// \brief Special value to select all columns from manifest files.
+  static constexpr std::string_view kAllColumns = "*";
+
   virtual ~ManifestReader() = default;
 
   /// \brief Read all manifest entries in the manifest file.
-  virtual Result<std::vector<ManifestEntry>> Entries() const = 0;
+  ///
+  /// TODO(gangwu): provide a lazy-evaluated iterator interface for better 
performance.
+  virtual Result<std::vector<ManifestEntry>> Entries() = 0;
 
-  /// \brief Get the metadata of the manifest file.
-  virtual Result<std::unordered_map<std::string, std::string>> Metadata() 
const = 0;
+  /// \brief Read only live (non-deleted) manifest entries.
+  virtual Result<std::vector<ManifestEntry>> LiveEntries() = 0;
+
+  /// \brief Select specific columns of data file to read from the manifest 
entries.
+  ///
+  /// \note Column names should match the names in `DataFile` schema. 
Unmatched names
+  /// will be ignored.
+  virtual ManifestReader& Select(const std::vector<std::string>& columns) = 0;
+
+  /// \brief Filter manifest entries by partition filter.
+  ///
+  /// \note Unlike the Java implementation, this method does not combine new 
expressions
+  /// with existing ones. Each call replaces the previous partition filter.
+  virtual ManifestReader& FilterPartitions(std::shared_ptr<Expression> expr) = 
0;
+
+  /// \brief Filter manifest entries to a specific set of partitions.
+  virtual ManifestReader& FilterPartitions(
+      std::shared_ptr<class PartitionSet> partition_set) = 0;
+
+  /// \brief Filter manifest entries by row-level filter.
+  ///
+  /// \note Unlike the Java implementation, this method does not combine new 
expressions
+  /// with existing ones. Each call replaces the previous row filter.
+  virtual ManifestReader& FilterRows(std::shared_ptr<Expression> expr) = 0;
+
+  /// \brief Set case sensitivity for column name matching.
+  virtual ManifestReader& CaseSensitive(bool case_sensitive) = 0;
 
   /// \brief Creates a reader for a manifest file.
   /// \param manifest A ManifestFile object containing metadata about the 
manifest.
   /// \param file_io File IO implementation to use.
-  /// \param partition_type Schema for the partition.
+  /// \param schema Schema used to bind the partition type.
+  /// \param spec Partition spec used for this manifest file.
   /// \return A Result containing the reader or an error.
   static Result<std::unique_ptr<ManifestReader>> Make(
       const ManifestFile& manifest, std::shared_ptr<FileIO> file_io,
-      std::shared_ptr<StructType> partition_type);
+      std::shared_ptr<Schema> schema, std::shared_ptr<PartitionSpec> spec);
 
   /// \brief Creates a reader for a manifest file.
   /// \param manifest_location Path to the manifest file.
   /// \param file_io File IO implementation to use.
-  /// \param partition_type Schema for the partition.
+  /// \param schema Schema used to bind the partition type.
+  /// \param spec Partition spec used for this manifest file.
   /// \return A Result containing the reader or an error.
   static Result<std::unique_ptr<ManifestReader>> Make(
       std::string_view manifest_location, std::shared_ptr<FileIO> file_io,
-      std::shared_ptr<StructType> partition_type);
+      std::shared_ptr<Schema> schema, std::shared_ptr<PartitionSpec> spec);
+
+  /// \brief Add stats columns to the column list if needed.
+  static std::vector<std::string> WithStatsColumns(
+      const std::vector<std::string>& columns);
 };
 
 /// \brief Read manifest files from a manifest list file.
diff --git a/src/iceberg/manifest/manifest_reader_internal.cc 
b/src/iceberg/manifest/manifest_reader_internal.cc
deleted file mode 100644
index f7e8d428..00000000
--- a/src/iceberg/manifest/manifest_reader_internal.cc
+++ /dev/null
@@ -1,606 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-#include "manifest_reader_internal.h"
-
-#include <nanoarrow/nanoarrow.h>
-
-#include "iceberg/arrow/nanoarrow_status_internal.h"
-#include "iceberg/arrow_c_data_guard_internal.h"
-#include "iceberg/file_format.h"
-#include "iceberg/manifest/manifest_entry.h"
-#include "iceberg/manifest/manifest_list.h"
-#include "iceberg/schema.h"
-#include "iceberg/type.h"
-#include "iceberg/util/checked_cast.h"
-#include "iceberg/util/macros.h"
-
-namespace iceberg {
-
-#define PARSE_PRIMITIVE_FIELD(item, array_view, type)                          
         \
-  for (int64_t row_idx = 0; row_idx < array_view->length; row_idx++) {         
         \
-    if (!ArrowArrayViewIsNull(array_view, row_idx)) {                          
         \
-      auto value = ArrowArrayViewGetIntUnsafe(array_view, row_idx);            
         \
-      item = static_cast<type>(value);                                         
         \
-    } else if (required) {                                                     
         \
-      return InvalidManifestList("Field {} is required but null at row {}", 
field_name, \
-                                 row_idx);                                     
         \
-    }                                                                          
         \
-  }
-
-#define PARSE_STRING_FIELD(item, array_view)                                   
         \
-  for (int64_t row_idx = 0; row_idx < array_view->length; row_idx++) {         
         \
-    if (!ArrowArrayViewIsNull(array_view, row_idx)) {                          
         \
-      auto value = ArrowArrayViewGetStringUnsafe(array_view, row_idx);         
         \
-      item = std::string(value.data, value.size_bytes);                        
         \
-    } else if (required) {                                                     
         \
-      return InvalidManifestList("Field {} is required but null at row {}", 
field_name, \
-                                 row_idx);                                     
         \
-    }                                                                          
         \
-  }
-
-#define PARSE_BINARY_FIELD(item, array_view)                                   
         \
-  for (int64_t row_idx = 0; row_idx < array_view->length; row_idx++) {         
         \
-    if (!ArrowArrayViewIsNull(view_of_column, row_idx)) {                      
         \
-      item = ArrowArrayViewGetInt8Vector(array_view, row_idx);                 
         \
-    } else if (required) {                                                     
         \
-      return InvalidManifestList("Field {} is required but null at row {}", 
field_name, \
-                                 row_idx);                                     
         \
-    }                                                                          
         \
-  }
-
-#define PARSE_INTEGER_VECTOR_FIELD(item, count, array_view, type)              
     \
-  for (int64_t manifest_idx = 0; manifest_idx < count; manifest_idx++) {       
     \
-    auto offset = ArrowArrayViewListChildOffset(array_view, manifest_idx);     
     \
-    auto next_offset = ArrowArrayViewListChildOffset(array_view, manifest_idx 
+ 1); \
-    for (int64_t offset_idx = offset; offset_idx < next_offset; offset_idx++) 
{     \
-      item.emplace_back(static_cast<type>(                                     
     \
-          ArrowArrayViewGetIntUnsafe(array_view->children[0], offset_idx)));   
     \
-    }                                                                          
     \
-  }
-
-#define PARSE_MAP_FIELD(item, count, array_view, key_type, value_type, 
assignment)   \
-  do {                                                                         
      \
-    if (array_view->storage_type != ArrowType::NANOARROW_TYPE_MAP) {           
      \
-      return InvalidManifest("Field:{} should be a map.", field_name);         
      \
-    }                                                                          
      \
-    auto view_of_map = array_view->children[0];                                
      \
-    ASSERT_VIEW_TYPE_AND_CHILDREN(view_of_map, 
ArrowType::NANOARROW_TYPE_STRUCT, 2); \
-    auto view_of_map_key = view_of_map->children[0];                           
      \
-    ASSERT_VIEW_TYPE(view_of_map_key, key_type);                               
      \
-    auto view_of_map_value = view_of_map->children[1];                         
      \
-    ASSERT_VIEW_TYPE(view_of_map_value, value_type);                           
      \
-    for (int64_t row_idx = 0; row_idx < count; row_idx++) {                    
      \
-      auto offset = array_view->buffer_views[1].data.as_int32[row_idx];        
      \
-      auto next_offset = array_view->buffer_views[1].data.as_int32[row_idx + 
1];     \
-      for (int32_t offset_idx = offset; offset_idx < next_offset; 
offset_idx++) {    \
-        auto key = ArrowArrayViewGetIntUnsafe(view_of_map_key, offset_idx);    
      \
-        item[key] = assignment;                                                
      \
-      }                                                                        
      \
-    }                                                                          
      \
-  } while (0)
-
-#define PARSE_INT_LONG_MAP_FIELD(item, count, array_view)                   \
-  PARSE_MAP_FIELD(item, count, array_view, ArrowType::NANOARROW_TYPE_INT32, \
-                  ArrowType::NANOARROW_TYPE_INT64,                          \
-                  ArrowArrayViewGetIntUnsafe(view_of_map_value, offset_idx));
-
-#define PARSE_INT_BINARY_MAP_FIELD(item, count, array_view)                 \
-  PARSE_MAP_FIELD(item, count, array_view, ArrowType::NANOARROW_TYPE_INT32, \
-                  ArrowType::NANOARROW_TYPE_BINARY,                         \
-                  ArrowArrayViewGetInt8Vector(view_of_map_value, offset_idx));
-
-#define ASSERT_VIEW_TYPE(view, type)                                           
\
-  if (view->storage_type != type) {                                            
\
-    return InvalidManifest("Sub Field:{} should be a {}.", field_name, #type); 
\
-  }
-
-#define ASSERT_VIEW_TYPE_AND_CHILDREN(view, type, n_child)                     
  \
-  if (view->storage_type != type) {                                            
  \
-    return InvalidManifest("Sub Field:{} should be a {}.", field_name, #type); 
  \
-  }                                                                            
  \
-  if (view->n_children != n_child) {                                           
  \
-    return InvalidManifest("Sub Field for:{} should have key&value:{} 
columns.", \
-                           field_name, n_child);                               
  \
-  }
-
-std::vector<uint8_t> ArrowArrayViewGetInt8Vector(const ArrowArrayView* view,
-                                                 int32_t offset_idx) {
-  auto buffer = ArrowArrayViewGetBytesUnsafe(view, offset_idx);
-  return {buffer.data.as_char, buffer.data.as_char + buffer.size_bytes};
-}
-
-Status ParsePartitionFieldSummaryList(ArrowArrayView* view_of_column,
-                                      std::vector<ManifestFile>& 
manifest_files) {
-  auto manifest_count = view_of_column->length;
-  // view_of_column is list<struct<PartitionFieldSummary>>
-  if (view_of_column->storage_type != ArrowType::NANOARROW_TYPE_LIST) {
-    return InvalidManifestList("partitions field should be a list.");
-  }
-  auto view_of_list_iterm = view_of_column->children[0];
-  // view_of_list_iterm is struct<PartitionFieldSummary>
-  if (view_of_list_iterm->storage_type != ArrowType::NANOARROW_TYPE_STRUCT) {
-    return InvalidManifestList("partitions list field should be a list.");
-  }
-  if (view_of_list_iterm->n_children != 4) {
-    return InvalidManifestList("PartitionFieldSummary should have 4 fields.");
-  }
-  if (view_of_list_iterm->children[0]->storage_type != 
ArrowType::NANOARROW_TYPE_BOOL) {
-    return InvalidManifestList("contains_null should have be bool type 
column.");
-  }
-  auto contains_null = view_of_list_iterm->children[0];
-  if (view_of_list_iterm->children[1]->storage_type != 
ArrowType::NANOARROW_TYPE_BOOL) {
-    return InvalidManifestList("contains_nan should have be bool type 
column.");
-  }
-  auto contains_nan = view_of_list_iterm->children[1];
-  if (view_of_list_iterm->children[2]->storage_type != 
ArrowType::NANOARROW_TYPE_BINARY) {
-    return InvalidManifestList("lower_bound should have be binary type 
column.");
-  }
-  auto lower_bound_list = view_of_list_iterm->children[2];
-  if (view_of_list_iterm->children[3]->storage_type != 
ArrowType::NANOARROW_TYPE_BINARY) {
-    return InvalidManifestList("upper_bound should have be binary type 
column.");
-  }
-  auto upper_bound_list = view_of_list_iterm->children[3];
-  for (int64_t manifest_idx = 0; manifest_idx < manifest_count; 
manifest_idx++) {
-    auto offset = ArrowArrayViewListChildOffset(view_of_column, manifest_idx);
-    auto next_offset = ArrowArrayViewListChildOffset(view_of_column, 
manifest_idx + 1);
-    // partitions from offset to next_offset belongs to manifest_idx
-    auto& manifest_file = manifest_files[manifest_idx];
-    for (int64_t partition_idx = offset; partition_idx < next_offset; 
partition_idx++) {
-      PartitionFieldSummary partition_field_summary;
-      if (!ArrowArrayViewIsNull(contains_null, partition_idx)) {
-        partition_field_summary.contains_null =
-            ArrowArrayViewGetIntUnsafe(contains_null, partition_idx);
-      } else {
-        return InvalidManifestList("contains_null is null at row {}", 
partition_idx);
-      }
-      if (!ArrowArrayViewIsNull(contains_nan, partition_idx)) {
-        partition_field_summary.contains_nan =
-            ArrowArrayViewGetIntUnsafe(contains_nan, partition_idx);
-      }
-      if (!ArrowArrayViewIsNull(lower_bound_list, partition_idx)) {
-        partition_field_summary.lower_bound =
-            ArrowArrayViewGetInt8Vector(lower_bound_list, partition_idx);
-      }
-      if (!ArrowArrayViewIsNull(upper_bound_list, partition_idx)) {
-        partition_field_summary.upper_bound =
-            ArrowArrayViewGetInt8Vector(upper_bound_list, partition_idx);
-      }
-
-      manifest_file.partitions.emplace_back(partition_field_summary);
-    }
-  }
-  return {};
-}
-
-Result<std::vector<ManifestFile>> ParseManifestList(ArrowSchema* schema,
-                                                    ArrowArray* array_in,
-                                                    const Schema& 
iceberg_schema) {
-  if (schema->n_children != array_in->n_children) {
-    return InvalidManifestList("Columns size not match between schema:{} and 
array:{}",
-                               schema->n_children, array_in->n_children);
-  }
-  if (iceberg_schema.fields().size() != array_in->n_children) {
-    return InvalidManifestList("Columns size not match between schema:{} and 
array:{}",
-                               iceberg_schema.fields().size(), 
array_in->n_children);
-  }
-
-  ArrowError error;
-  ArrowArrayView array_view;
-  auto status = ArrowArrayViewInitFromSchema(&array_view, schema, &error);
-  ICEBERG_NANOARROW_RETURN_UNEXPECTED_WITH_ERROR(status, error);
-  internal::ArrowArrayViewGuard view_guard(&array_view);
-  status = ArrowArrayViewSetArray(&array_view, array_in, &error);
-  ICEBERG_NANOARROW_RETURN_UNEXPECTED_WITH_ERROR(status, error);
-  status = ArrowArrayViewValidate(&array_view, 
NANOARROW_VALIDATION_LEVEL_FULL, &error);
-  ICEBERG_NANOARROW_RETURN_UNEXPECTED_WITH_ERROR(status, error);
-
-  std::vector<ManifestFile> manifest_files;
-  manifest_files.resize(array_in->length);
-
-  for (int64_t idx = 0; idx < array_in->n_children; idx++) {
-    const auto& field = iceberg_schema.GetFieldByIndex(idx);
-    if (!field.has_value()) {
-      return InvalidSchema("Field index {} is not found in schema", idx);
-    }
-    auto field_name = field.value()->get().name();
-    bool required = !field.value()->get().optional();
-    auto view_of_column = array_view.children[idx];
-    ICEBERG_ASSIGN_OR_RAISE(auto manifest_file_field, 
ManifestFileFieldFromIndex(idx));
-    switch (manifest_file_field) {
-      case ManifestFileField::kManifestPath:
-        PARSE_STRING_FIELD(manifest_files[row_idx].manifest_path, 
view_of_column);
-        break;
-      case ManifestFileField::kManifestLength:
-        PARSE_PRIMITIVE_FIELD(manifest_files[row_idx].manifest_length, 
view_of_column,
-                              int64_t);
-        break;
-      case ManifestFileField::kPartitionSpecId:
-        PARSE_PRIMITIVE_FIELD(manifest_files[row_idx].partition_spec_id, 
view_of_column,
-                              int32_t);
-        break;
-      case ManifestFileField::kContent:
-        PARSE_PRIMITIVE_FIELD(manifest_files[row_idx].content, view_of_column,
-                              ManifestContent);
-        break;
-      case ManifestFileField::kSequenceNumber:
-        PARSE_PRIMITIVE_FIELD(manifest_files[row_idx].sequence_number, 
view_of_column,
-                              int64_t);
-        break;
-      case ManifestFileField::kMinSequenceNumber:
-        PARSE_PRIMITIVE_FIELD(manifest_files[row_idx].min_sequence_number, 
view_of_column,
-                              int64_t);
-        break;
-      case ManifestFileField::kAddedSnapshotId:
-        PARSE_PRIMITIVE_FIELD(manifest_files[row_idx].added_snapshot_id, 
view_of_column,
-                              int64_t);
-        break;
-      case ManifestFileField::kAddedFilesCount:
-        PARSE_PRIMITIVE_FIELD(manifest_files[row_idx].added_files_count, 
view_of_column,
-                              int32_t);
-        break;
-      case ManifestFileField::kExistingFilesCount:
-        PARSE_PRIMITIVE_FIELD(manifest_files[row_idx].existing_files_count,
-                              view_of_column, int32_t);
-        break;
-      case ManifestFileField::kDeletedFilesCount:
-        PARSE_PRIMITIVE_FIELD(manifest_files[row_idx].deleted_files_count, 
view_of_column,
-                              int32_t);
-        break;
-      case ManifestFileField::kAddedRowsCount:
-        PARSE_PRIMITIVE_FIELD(manifest_files[row_idx].added_rows_count, 
view_of_column,
-                              int64_t);
-        break;
-      case ManifestFileField::kExistingRowsCount:
-        PARSE_PRIMITIVE_FIELD(manifest_files[row_idx].existing_rows_count, 
view_of_column,
-                              int64_t);
-        break;
-      case ManifestFileField::kDeletedRowsCount:
-        PARSE_PRIMITIVE_FIELD(manifest_files[row_idx].deleted_rows_count, 
view_of_column,
-                              int64_t);
-        break;
-      case ManifestFileField::kPartitionFieldSummary:
-        ICEBERG_RETURN_UNEXPECTED(
-            ParsePartitionFieldSummaryList(view_of_column, manifest_files));
-        break;
-      case ManifestFileField::kKeyMetadata:
-        PARSE_BINARY_FIELD(manifest_files[row_idx].key_metadata, 
view_of_column);
-        break;
-      case ManifestFileField::kFirstRowId:
-        PARSE_PRIMITIVE_FIELD(manifest_files[row_idx].first_row_id, 
view_of_column,
-                              int64_t);
-        break;
-      default:
-        return InvalidManifestList("Unsupported field: {} in manifest file.", 
field_name);
-    }
-  }
-  return manifest_files;
-}
-
-Status ParseLiteral(ArrowArrayView* view_of_partition, int64_t row_idx,
-                    std::vector<ManifestEntry>& manifest_entries) {
-  if (view_of_partition->storage_type == ArrowType::NANOARROW_TYPE_BOOL) {
-    auto value = ArrowArrayViewGetUIntUnsafe(view_of_partition, row_idx);
-    
manifest_entries[row_idx].data_file->partition.AddValue(Literal::Boolean(value 
!= 0));
-  } else if (view_of_partition->storage_type == 
ArrowType::NANOARROW_TYPE_INT32) {
-    auto value = ArrowArrayViewGetIntUnsafe(view_of_partition, row_idx);
-    
manifest_entries[row_idx].data_file->partition.AddValue(Literal::Int(value));
-  } else if (view_of_partition->storage_type == 
ArrowType::NANOARROW_TYPE_INT64) {
-    auto value = ArrowArrayViewGetIntUnsafe(view_of_partition, row_idx);
-    
manifest_entries[row_idx].data_file->partition.AddValue(Literal::Long(value));
-  } else if (view_of_partition->storage_type == 
ArrowType::NANOARROW_TYPE_FLOAT) {
-    auto value = ArrowArrayViewGetDoubleUnsafe(view_of_partition, row_idx);
-    
manifest_entries[row_idx].data_file->partition.AddValue(Literal::Float(value));
-  } else if (view_of_partition->storage_type == 
ArrowType::NANOARROW_TYPE_DOUBLE) {
-    auto value = ArrowArrayViewGetDoubleUnsafe(view_of_partition, row_idx);
-    
manifest_entries[row_idx].data_file->partition.AddValue(Literal::Double(value));
-  } else if (view_of_partition->storage_type == 
ArrowType::NANOARROW_TYPE_STRING) {
-    auto value = ArrowArrayViewGetStringUnsafe(view_of_partition, row_idx);
-    manifest_entries[row_idx].data_file->partition.AddValue(
-        Literal::String(std::string(value.data, value.size_bytes)));
-  } else if (view_of_partition->storage_type == 
ArrowType::NANOARROW_TYPE_BINARY) {
-    auto buffer = ArrowArrayViewGetBytesUnsafe(view_of_partition, row_idx);
-    manifest_entries[row_idx].data_file->partition.AddValue(
-        Literal::Binary(std::vector<uint8_t>(buffer.data.as_char,
-                                             buffer.data.as_char + 
buffer.size_bytes)));
-  } else {
-    return InvalidManifest("Unsupported field type: {} in data file 
partition.",
-                           
static_cast<int32_t>(view_of_partition->storage_type));
-  }
-  return {};
-}
-
-Status ParseDataFile(const std::shared_ptr<StructType>& data_file_schema,
-                     ArrowArrayView* view_of_column, std::optional<int64_t>& 
first_row_id,
-                     std::vector<ManifestEntry>& manifest_entries) {
-  if (view_of_column->storage_type != ArrowType::NANOARROW_TYPE_STRUCT) {
-    return InvalidManifest("DataFile field should be a struct.");
-  }
-  if (view_of_column->n_children != data_file_schema->fields().size()) {
-    return InvalidManifest("DataFile schema size:{} not match with ArrayArray 
columns:{}",
-                           data_file_schema->fields().size(), 
view_of_column->n_children);
-  }
-  for (int64_t col_idx = 0; col_idx < view_of_column->n_children; ++col_idx) {
-    auto field_name = 
data_file_schema->GetFieldByIndex(col_idx).value()->get().name();
-    auto required = 
!data_file_schema->GetFieldByIndex(col_idx).value()->get().optional();
-    auto view_of_file_field = view_of_column->children[col_idx];
-    auto manifest_entry_count = view_of_file_field->length;
-
-    switch (col_idx) {
-      case 0:
-        PARSE_PRIMITIVE_FIELD(manifest_entries[row_idx].data_file->content,
-                              view_of_file_field, DataFile::Content);
-        break;
-      case 1:
-        PARSE_STRING_FIELD(manifest_entries[row_idx].data_file->file_path,
-                           view_of_file_field);
-        break;
-      case 2:
-        for (int64_t row_idx = 0; row_idx < view_of_file_field->length; 
row_idx++) {
-          if (!ArrowArrayViewIsNull(view_of_file_field, row_idx)) {
-            auto value = ArrowArrayViewGetStringUnsafe(view_of_file_field, 
row_idx);
-            std::string_view path_str(value.data, value.size_bytes);
-            
ICEBERG_ASSIGN_OR_RAISE(manifest_entries[row_idx].data_file->file_format,
-                                    FileFormatTypeFromString(path_str));
-          }
-        }
-        break;
-      case 3: {
-        if (view_of_file_field->storage_type != 
ArrowType::NANOARROW_TYPE_STRUCT) {
-          return InvalidManifest("Field:{} should be a struct.", field_name);
-        }
-        if (view_of_file_field->n_children > 0) {
-          for (int64_t partition_idx = 0; partition_idx < 
view_of_file_field->n_children;
-               partition_idx++) {
-            auto view_of_partition = 
view_of_file_field->children[partition_idx];
-            for (int64_t row_idx = 0; row_idx < view_of_partition->length; 
row_idx++) {
-              if (ArrowArrayViewIsNull(view_of_partition, row_idx)) {
-                break;
-              }
-              ICEBERG_RETURN_UNEXPECTED(
-                  ParseLiteral(view_of_partition, row_idx, manifest_entries));
-            }
-          }
-        }
-      } break;
-      case 4:
-        
PARSE_PRIMITIVE_FIELD(manifest_entries[row_idx].data_file->record_count,
-                              view_of_file_field, int64_t);
-        break;
-      case 5:
-        
PARSE_PRIMITIVE_FIELD(manifest_entries[row_idx].data_file->file_size_in_bytes,
-                              view_of_file_field, int64_t);
-        break;
-      case 6:
-        // key&value should have the same offset
-        // HACK(xiao.dong) workaround for arrow bug:
-        // ArrowArrayViewListChildOffset can not get the correct offset for map
-        
PARSE_INT_LONG_MAP_FIELD(manifest_entries[row_idx].data_file->column_sizes,
-                                 manifest_entry_count, view_of_file_field);
-        break;
-      case 7:
-        
PARSE_INT_LONG_MAP_FIELD(manifest_entries[row_idx].data_file->value_counts,
-                                 manifest_entry_count, view_of_file_field);
-        break;
-      case 8:
-        
PARSE_INT_LONG_MAP_FIELD(manifest_entries[row_idx].data_file->null_value_counts,
-                                 manifest_entry_count, view_of_file_field);
-        break;
-      case 9:
-        
PARSE_INT_LONG_MAP_FIELD(manifest_entries[row_idx].data_file->nan_value_counts,
-                                 manifest_entry_count, view_of_file_field);
-        break;
-      case 10:
-        
PARSE_INT_BINARY_MAP_FIELD(manifest_entries[row_idx].data_file->lower_bounds,
-                                   manifest_entry_count, view_of_file_field);
-        break;
-      case 11:
-        
PARSE_INT_BINARY_MAP_FIELD(manifest_entries[row_idx].data_file->upper_bounds,
-                                   manifest_entry_count, view_of_file_field);
-        break;
-      case 12:
-        PARSE_BINARY_FIELD(manifest_entries[row_idx].data_file->key_metadata,
-                           view_of_file_field);
-        break;
-      case 13:
-        PARSE_INTEGER_VECTOR_FIELD(
-            manifest_entries[manifest_idx].data_file->split_offsets, 
manifest_entry_count,
-            view_of_file_field, int64_t);
-        break;
-      case 14:
-        
PARSE_INTEGER_VECTOR_FIELD(manifest_entries[manifest_idx].data_file->equality_ids,
-                                   manifest_entry_count, view_of_file_field, 
int32_t);
-        break;
-      case 15:
-        
PARSE_PRIMITIVE_FIELD(manifest_entries[row_idx].data_file->sort_order_id,
-                              view_of_file_field, int32_t);
-        break;
-      case 16: {
-        
PARSE_PRIMITIVE_FIELD(manifest_entries[row_idx].data_file->first_row_id,
-                              view_of_file_field, int64_t);
-        if (first_row_id.has_value()) {
-          std::ranges::for_each(manifest_entries, 
[&first_row_id](ManifestEntry& entry) {
-            if (entry.status != ManifestStatus::kDeleted &&
-                !entry.data_file->first_row_id.has_value()) {
-              entry.data_file->first_row_id = first_row_id.value();
-              first_row_id = first_row_id.value() + 
entry.data_file->record_count;
-            }
-          });
-        } else {
-          // data file's first_row_id is null when the manifest's first_row_id 
is null
-          std::ranges::for_each(manifest_entries, [](ManifestEntry& entry) {
-            entry.data_file->first_row_id = std::nullopt;
-          });
-        }
-        break;
-      }
-      case 17:
-        
PARSE_STRING_FIELD(manifest_entries[row_idx].data_file->referenced_data_file,
-                           view_of_file_field);
-        break;
-      case 18:
-        
PARSE_PRIMITIVE_FIELD(manifest_entries[row_idx].data_file->content_offset,
-                              view_of_file_field, int64_t);
-        break;
-      case 19:
-        
PARSE_PRIMITIVE_FIELD(manifest_entries[row_idx].data_file->content_size_in_bytes,
-                              view_of_file_field, int64_t);
-        break;
-      default:
-        return InvalidManifest("Unsupported field: {} in data file.", 
field_name);
-    }
-  }
-  return {};
-}
-
-Result<std::vector<ManifestEntry>> ParseManifestEntry(
-    ArrowSchema* schema, ArrowArray* array_in, const Schema& iceberg_schema,
-    std::optional<int64_t>& first_row_id) {
-  if (schema->n_children != array_in->n_children) {
-    return InvalidManifest("Columns size not match between schema:{} and 
array:{}",
-                           schema->n_children, array_in->n_children);
-  }
-  if (iceberg_schema.fields().size() != array_in->n_children) {
-    return InvalidManifest("Columns size not match between schema:{} and 
array:{}",
-                           iceberg_schema.fields().size(), 
array_in->n_children);
-  }
-
-  ArrowError error;
-  ArrowArrayView array_view;
-  auto status = ArrowArrayViewInitFromSchema(&array_view, schema, &error);
-  ICEBERG_NANOARROW_RETURN_UNEXPECTED_WITH_ERROR(status, error);
-  internal::ArrowArrayViewGuard view_guard(&array_view);
-  status = ArrowArrayViewSetArray(&array_view, array_in, &error);
-  ICEBERG_NANOARROW_RETURN_UNEXPECTED_WITH_ERROR(status, error);
-  status = ArrowArrayViewValidate(&array_view, 
NANOARROW_VALIDATION_LEVEL_FULL, &error);
-  ICEBERG_NANOARROW_RETURN_UNEXPECTED_WITH_ERROR(status, error);
-
-  std::vector<ManifestEntry> manifest_entries;
-  manifest_entries.resize(array_in->length);
-  for (int64_t i = 0; i < array_in->length; i++) {
-    manifest_entries[i].data_file = std::make_shared<DataFile>();
-  }
-
-  for (int64_t idx = 0; idx < array_in->n_children; idx++) {
-    const auto& field = iceberg_schema.GetFieldByIndex(idx);
-    if (!field.has_value()) {
-      return InvalidManifest("Field not found in schema: {}", idx);
-    }
-    auto field_name = field.value()->get().name();
-    bool required = !field.value()->get().optional();
-    auto view_of_column = array_view.children[idx];
-
-    switch (idx) {
-      case 0:
-        PARSE_PRIMITIVE_FIELD(manifest_entries[row_idx].status, view_of_column,
-                              ManifestStatus);
-        break;
-      case 1:
-        PARSE_PRIMITIVE_FIELD(manifest_entries[row_idx].snapshot_id, 
view_of_column,
-                              int64_t);
-        break;
-      case 2:
-        PARSE_PRIMITIVE_FIELD(manifest_entries[row_idx].sequence_number, 
view_of_column,
-                              int64_t);
-        break;
-      case 3:
-        PARSE_PRIMITIVE_FIELD(manifest_entries[row_idx].file_sequence_number,
-                              view_of_column, int64_t);
-        break;
-      case 4: {
-        auto data_file_schema =
-            
internal::checked_pointer_cast<StructType>(field.value()->get().type());
-        ICEBERG_RETURN_UNEXPECTED(ParseDataFile(data_file_schema, 
view_of_column,
-                                                first_row_id, 
manifest_entries));
-        break;
-      }
-      default:
-        return InvalidManifest("Unsupported field: {} in manifest entry.", 
field_name);
-    }
-  }
-  return manifest_entries;
-}
-
-Result<std::vector<ManifestEntry>> ManifestReaderImpl::Entries() const {
-  std::vector<ManifestEntry> manifest_entries;
-  ICEBERG_ASSIGN_OR_RAISE(auto arrow_schema, reader_->Schema());
-  internal::ArrowSchemaGuard schema_guard(&arrow_schema);
-  while (true) {
-    ICEBERG_ASSIGN_OR_RAISE(auto result, reader_->Next());
-    if (result.has_value()) {
-      internal::ArrowArrayGuard array_guard(&result.value());
-      ICEBERG_ASSIGN_OR_RAISE(
-          auto parse_result,
-          ParseManifestEntry(&arrow_schema, &result.value(), *schema_, 
first_row_id_));
-      manifest_entries.insert(manifest_entries.end(),
-                              std::make_move_iterator(parse_result.begin()),
-                              std::make_move_iterator(parse_result.end()));
-    } else {
-      // eof
-      break;
-    }
-  }
-
-  // Apply inheritance to all entries
-  for (auto& entry : manifest_entries) {
-    ICEBERG_RETURN_UNEXPECTED(inheritable_metadata_->Apply(entry));
-  }
-
-  return manifest_entries;
-}
-
-Result<std::unordered_map<std::string, std::string>> 
ManifestReaderImpl::Metadata()
-    const {
-  return reader_->Metadata();
-}
-
-Result<std::vector<ManifestFile>> ManifestListReaderImpl::Files() const {
-  std::vector<ManifestFile> manifest_files;
-  ICEBERG_ASSIGN_OR_RAISE(auto arrow_schema, reader_->Schema());
-  internal::ArrowSchemaGuard schema_guard(&arrow_schema);
-  while (true) {
-    ICEBERG_ASSIGN_OR_RAISE(auto result, reader_->Next());
-    if (result.has_value()) {
-      internal::ArrowArrayGuard array_guard(&result.value());
-      ICEBERG_ASSIGN_OR_RAISE(
-          auto parse_result, ParseManifestList(&arrow_schema, &result.value(), 
*schema_));
-      manifest_files.insert(manifest_files.end(),
-                            std::make_move_iterator(parse_result.begin()),
-                            std::make_move_iterator(parse_result.end()));
-    } else {
-      // eof
-      break;
-    }
-  }
-  return manifest_files;
-}
-
-Result<std::unordered_map<std::string, std::string>> 
ManifestListReaderImpl::Metadata()
-    const {
-  return reader_->Metadata();
-}
-
-Result<ManifestFileField> ManifestFileFieldFromIndex(int32_t index) {
-  if (index >= 0 && index < 
static_cast<int32_t>(ManifestFileField::kNextUnusedId)) {
-    return static_cast<ManifestFileField>(index);
-  }
-  return InvalidArgument("Invalid manifest file field index: {}", index);
-}
-
-}  // namespace iceberg
diff --git a/src/iceberg/manifest/manifest_reader_internal.h 
b/src/iceberg/manifest/manifest_reader_internal.h
index 513f4050..ba804ffb 100644
--- a/src/iceberg/manifest/manifest_reader_internal.h
+++ b/src/iceberg/manifest/manifest_reader_internal.h
@@ -22,33 +22,101 @@
 /// \file iceberg/manifest/manifest_reader_internal.h
 /// Reader implementation for manifest list files and manifest files.
 
+#include <memory>
+#include <optional>
+#include <string>
+#include <vector>
+
+#include "iceberg/expression/evaluator.h"
+#include "iceberg/expression/expression.h"
+#include "iceberg/expression/inclusive_metrics_evaluator.h"
 #include "iceberg/file_reader.h"
 #include "iceberg/inheritable_metadata.h"
 #include "iceberg/manifest/manifest_reader.h"
+#include "iceberg/util/partition_value_util.h"
 
 namespace iceberg {
 
 /// \brief Read manifest entries from a manifest file.
+///
+/// This implementation supports lazy reader creation and filtering based on
+/// partition expressions, row expressions, and partition sets. Following the
+/// Java implementation pattern.
 class ManifestReaderImpl : public ManifestReader {
  public:
-  explicit ManifestReaderImpl(std::unique_ptr<Reader> reader,
-                              std::shared_ptr<Schema> schema,
-                              std::unique_ptr<InheritableMetadata> 
inheritable_metadata,
-                              std::optional<int64_t> first_row_id)
-      : schema_(std::move(schema)),
-        reader_(std::move(reader)),
-        inheritable_metadata_(std::move(inheritable_metadata)),
-        first_row_id_(first_row_id) {}
+  /// \brief Construct a ManifestReaderImpl for lazy initialization.
+  ///
+  /// \param manifest_path Path to the manifest file.
+  /// \param manifest_length Length of the manifest file (optional).
+  /// \param file_io File IO implementation.
+  /// \param schema Table schema.
+  /// \param spec Partition spec.
+  /// \param inheritable_metadata Metadata inherited from manifest.
+  /// \param first_row_id First row ID for V3 manifests.
+  /// \note ManifestReader::Make() functions should guarantee non-null 
parameters.
+  ManifestReaderImpl(std::string manifest_path, std::optional<int64_t> 
manifest_length,
+                     std::shared_ptr<FileIO> file_io, std::shared_ptr<Schema> 
schema,
+                     std::shared_ptr<PartitionSpec> spec,
+                     std::unique_ptr<InheritableMetadata> inheritable_metadata,
+                     std::optional<int64_t> first_row_id);
 
-  Result<std::vector<ManifestEntry>> Entries() const override;
+  Result<std::vector<ManifestEntry>> Entries() override;
 
-  Result<std::unordered_map<std::string, std::string>> Metadata() const 
override;
+  Result<std::vector<ManifestEntry>> LiveEntries() override;
+
+  ManifestReader& Select(const std::vector<std::string>& columns) override;
+
+  ManifestReader& FilterPartitions(std::shared_ptr<Expression> expr) override;
+
+  ManifestReader& FilterPartitions(std::shared_ptr<PartitionSet> 
partition_set) override;
+
+  ManifestReader& FilterRows(std::shared_ptr<Expression> expr) override;
+
+  ManifestReader& CaseSensitive(bool case_sensitive) override;
 
  private:
-  std::shared_ptr<Schema> schema_;
-  std::unique_ptr<Reader> reader_;
-  std::unique_ptr<InheritableMetadata> inheritable_metadata_;
-  mutable std::optional<int64_t> first_row_id_;
+  /// \brief Read entries with optional live-only filtering.
+  Result<std::vector<ManifestEntry>> ReadEntries(bool only_live);
+
+  /// \brief Lazily open the underlying Avro reader with appropriate schema 
projection.
+  Status OpenReader(std::shared_ptr<Schema> projection);
+
+  /// \brief Check if there's a non-trivial partition filter.
+  bool HasPartitionFilter() const;
+
+  /// \brief Check if there's a non-trivial row filter.
+  bool HasRowFilter() const;
+
+  /// \brief Get or create the partition evaluator.
+  Result<Evaluator*> GetEvaluator();
+
+  /// \brief Get or create the metrics evaluator.
+  Result<InclusiveMetricsEvaluator*> GetMetricsEvaluator();
+
+  /// \brief Check if a partition is in the partition set.
+  bool InPartitionSet(const DataFile& file) const;
+
+  // Fields set at construction
+  const std::string manifest_path_;
+  const std::optional<int64_t> manifest_length_;
+  const std::shared_ptr<FileIO> file_io_;
+  const std::shared_ptr<Schema> schema_;
+  const std::shared_ptr<PartitionSpec> spec_;
+  const std::unique_ptr<InheritableMetadata> inheritable_metadata_;
+  std::optional<int64_t> first_row_id_;
+
+  // Configuration fields
+  std::vector<std::string> columns_;
+  std::shared_ptr<Expression> part_filter_{True::Instance()};
+  std::shared_ptr<Expression> row_filter_{True::Instance()};
+  std::shared_ptr<PartitionSet> partition_set_;
+  bool case_sensitive_{true};
+
+  // Lazy fields
+  std::unique_ptr<Reader> file_reader_;
+  std::shared_ptr<Schema> file_schema_;
+  std::unique_ptr<Evaluator> evaluator_;
+  std::unique_ptr<InclusiveMetricsEvaluator> metrics_evaluator_;
 };
 
 /// \brief Read manifest files from a manifest list file.
diff --git a/src/iceberg/meson.build b/src/iceberg/meson.build
index 1596a2ea..7c1011fc 100644
--- a/src/iceberg/meson.build
+++ b/src/iceberg/meson.build
@@ -64,7 +64,6 @@ iceberg_sources = files(
     'manifest/manifest_entry.cc',
     'manifest/manifest_list.cc',
     'manifest/manifest_reader.cc',
-    'manifest/manifest_reader_internal.cc',
     'manifest/manifest_writer.cc',
     'manifest/v1_metadata.cc',
     'manifest/v2_metadata.cc',
diff --git a/src/iceberg/table_scan.cc b/src/iceberg/table_scan.cc
index 1c4a73ff..6c182845 100644
--- a/src/iceberg/table_scan.cc
+++ b/src/iceberg/table_scan.cc
@@ -271,15 +271,13 @@ Result<std::vector<std::shared_ptr<FileScanTask>>> 
DataTableScan::PlanFiles() co
   std::vector<std::shared_ptr<FileScanTask>> tasks;
   ICEBERG_ASSIGN_OR_RAISE(auto partition_spec, 
context_.table_metadata->PartitionSpec());
 
-  // Get the table schema and partition type
+  // Get the table schema
   ICEBERG_ASSIGN_OR_RAISE(auto current_schema, 
context_.table_metadata->Schema());
-  ICEBERG_ASSIGN_OR_RAISE(std::shared_ptr<StructType> partition_type,
-                          partition_spec->PartitionType(*current_schema));
 
   for (const auto& manifest_file : manifest_files) {
     ICEBERG_ASSIGN_OR_RAISE(
         auto manifest_reader,
-        ManifestReader::Make(manifest_file, file_io_, partition_type));
+        ManifestReader::Make(manifest_file, file_io_, current_schema, 
partition_spec));
     ICEBERG_ASSIGN_OR_RAISE(auto manifests, manifest_reader->Entries());
 
     // TODO(gty404): filter manifests using partition spec and filter 
expression
diff --git a/src/iceberg/test/CMakeLists.txt b/src/iceberg/test/CMakeLists.txt
index 0d36f64c..7e71310d 100644
--- a/src/iceberg/test/CMakeLists.txt
+++ b/src/iceberg/test/CMakeLists.txt
@@ -122,6 +122,8 @@ if(ICEBERG_BUILD_BUNDLE)
                    avro_schema_test.cc
                    avro_stream_test.cc
                    manifest_list_versions_test.cc
+                   manifest_reader_stats_test.cc
+                   manifest_reader_test.cc
                    manifest_writer_versions_test.cc)
 
   add_iceberg_test(arrow_test
diff --git a/src/iceberg/test/manifest_reader_stats_test.cc 
b/src/iceberg/test/manifest_reader_stats_test.cc
new file mode 100644
index 00000000..327da225
--- /dev/null
+++ b/src/iceberg/test/manifest_reader_stats_test.cc
@@ -0,0 +1,239 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <chrono>
+#include <format>
+#include <memory>
+#include <string>
+#include <vector>
+
+#include <gtest/gtest.h>
+
+#include "iceberg/arrow/arrow_file_io.h"
+#include "iceberg/avro/avro_register.h"
+#include "iceberg/expression/expressions.h"
+#include "iceberg/manifest/manifest_entry.h"
+#include "iceberg/manifest/manifest_list.h"
+#include "iceberg/manifest/manifest_reader.h"
+#include "iceberg/manifest/manifest_writer.h"
+#include "iceberg/partition_spec.h"
+#include "iceberg/schema.h"
+#include "iceberg/test/matchers.h"
+#include "iceberg/transform.h"
+#include "iceberg/type.h"
+
+namespace iceberg {
+
+class TestManifestReaderStats : public testing::TestWithParam<int> {
+ protected:
+  void SetUp() override {
+    avro::RegisterAll();
+    file_io_ = arrow::MakeMockFileIO();
+
+    schema_ = std::make_shared<Schema>(std::vector<SchemaField>{
+        SchemaField::MakeRequired(/*field_id=*/3, "id", int32()),
+        SchemaField::MakeRequired(/*field_id=*/4, "data", string())});
+
+    ICEBERG_UNWRAP_OR_FAIL(
+        spec_,
+        PartitionSpec::Make(
+            /*spec_id=*/0, {PartitionField(/*source_id=*/4, /*field_id=*/1000,
+                                           "data_bucket", 
Transform::Bucket(16))}));
+  }
+
+  std::string MakeManifestPath() {
+    return std::format("manifest-{}.avro",
+                       
std::chrono::system_clock::now().time_since_epoch().count());
+  }
+
+  inline static const std::map<int32_t, int64_t> kValueCounts = {{3, 3L}};
+  inline static const std::map<int32_t, int64_t> kNullValueCounts = {{3, 0L}};
+  inline static const std::map<int32_t, int64_t> kNanValueCounts = {{3, 1L}};
+  inline static const std::map<int32_t, std::vector<uint8_t>> kLowerBounds = {
+      {3, Literal::Int(2).Serialize().value()}};
+  inline static const std::map<int32_t, std::vector<uint8_t>> kUpperBounds = {
+      {3, Literal::Int(4).Serialize().value()}};
+
+  std::unique_ptr<DataFile> MakeDataFileWithStats() {
+    return std::make_unique<DataFile>(DataFile{
+        .file_path = "/path/to/data-with-stats.parquet",
+        .file_format = FileFormatType::kParquet,
+        .partition = PartitionValues({Literal::Int(0)}),
+        .record_count = 3,
+        .file_size_in_bytes = 10,
+        .value_counts = kValueCounts,
+        .null_value_counts = kNullValueCounts,
+        .nan_value_counts = kNanValueCounts,
+        .lower_bounds = kLowerBounds,
+        .upper_bounds = kUpperBounds,
+        .sort_order_id = 0,
+    });
+  }
+
+  ManifestFile WriteManifest(int format_version, std::unique_ptr<DataFile> 
data_file) {
+    const std::string manifest_path = MakeManifestPath();
+
+    Result<std::unique_ptr<ManifestWriter>> writer_result =
+        NotSupported("Format version: {}", format_version);
+
+    switch (format_version) {
+      case 1:
+        writer_result = ManifestWriter::MakeV1Writer(/*snapshot_id=*/1000L, 
manifest_path,
+                                                     file_io_, spec_, schema_);
+        break;
+      case 2:
+        writer_result =
+            ManifestWriter::MakeV2Writer(/*snapshot_id=*/1000L, manifest_path, 
file_io_,
+                                         spec_, schema_, 
ManifestContent::kData);
+        break;
+      case 3:
+        writer_result = ManifestWriter::MakeV3Writer(
+            /*snapshot_id=*/1000L, /*sequence_number=*/0L, manifest_path, 
file_io_, spec_,
+            schema_, ManifestContent::kData);
+        break;
+    }
+
+    EXPECT_THAT(writer_result, IsOk());
+    auto writer = std::move(writer_result.value());
+
+    EXPECT_THAT(writer->WriteAddedEntry(std::move(data_file)), IsOk());
+    EXPECT_THAT(writer->Close(), IsOk());
+
+    auto manifest_result = writer->ToManifestFile();
+    EXPECT_THAT(manifest_result, IsOk());
+    return std::move(manifest_result.value());
+  }
+
+  void AssertFullStats(const DataFile& file) {
+    EXPECT_EQ(file.record_count, 3);
+    EXPECT_EQ(file.value_counts, kValueCounts);
+    EXPECT_EQ(file.null_value_counts, kNullValueCounts);
+    EXPECT_EQ(file.nan_value_counts, kNanValueCounts);
+    EXPECT_EQ(file.lower_bounds, kLowerBounds);
+    EXPECT_EQ(file.upper_bounds, kUpperBounds);
+  }
+
+  void AssertStatsDropped(const DataFile& file) {
+    EXPECT_EQ(file.record_count, 3);  // record count is not dropped
+    EXPECT_TRUE(file.value_counts.empty());
+    EXPECT_TRUE(file.null_value_counts.empty());
+    EXPECT_TRUE(file.nan_value_counts.empty());
+    EXPECT_TRUE(file.lower_bounds.empty());
+    EXPECT_TRUE(file.upper_bounds.empty());
+  }
+
+  std::shared_ptr<FileIO> file_io_;
+  std::shared_ptr<Schema> schema_;
+  std::shared_ptr<PartitionSpec> spec_;
+};
+
+TEST_P(TestManifestReaderStats, TestReadIncludesFullStats) {
+  int version = GetParam();
+  auto manifest = WriteManifest(version, MakeDataFileWithStats());
+
+  auto reader_result = ManifestReader::Make(manifest, file_io_, schema_, 
spec_);
+  ASSERT_THAT(reader_result, IsOk());
+  auto reader = std::move(reader_result.value());
+
+  auto entries_result = reader->Entries();
+  ASSERT_THAT(entries_result, IsOk());
+  auto entries = entries_result.value();
+  ASSERT_EQ(entries.size(), 1);
+  AssertFullStats(*entries[0].data_file);
+}
+
+TEST_P(TestManifestReaderStats, TestReadEntriesWithFilterIncludesFullStats) {
+  int version = GetParam();
+  auto manifest = WriteManifest(version, MakeDataFileWithStats());
+
+  auto reader_result = ManifestReader::Make(manifest, file_io_, schema_, 
spec_);
+  ASSERT_THAT(reader_result, IsOk());
+  auto reader = std::move(reader_result.value());
+
+  reader->FilterRows(Expressions::Equal("id", Literal::Int(3)));
+
+  auto entries_result = reader->Entries();
+  ASSERT_THAT(entries_result, IsOk());
+  auto entries = entries_result.value();
+  ASSERT_EQ(entries.size(), 1);
+  AssertFullStats(*entries[0].data_file);
+}
+
+TEST_P(TestManifestReaderStats, 
TestReadEntriesWithFilterAndSelectIncludesFullStats) {
+  int version = GetParam();
+  auto manifest = WriteManifest(version, MakeDataFileWithStats());
+
+  auto reader_result = ManifestReader::Make(manifest, file_io_, schema_, 
spec_);
+  ASSERT_THAT(reader_result, IsOk());
+  auto reader = std::move(reader_result.value());
+
+  reader->Select({"file_path"});
+  reader->FilterRows(Expressions::Equal("id", Literal::Int(3)));
+
+  auto entries_result = reader->Entries();
+  ASSERT_THAT(entries_result, IsOk());
+  auto entries = entries_result.value();
+  ASSERT_EQ(entries.size(), 1);
+  AssertFullStats(*entries[0].data_file);
+}
+
+TEST_P(TestManifestReaderStats, TestReadEntriesWithSelectNotProjectStats) {
+  int version = GetParam();
+  auto manifest = WriteManifest(version, MakeDataFileWithStats());
+
+  auto reader_result = ManifestReader::Make(manifest, file_io_, schema_, 
spec_);
+  ASSERT_THAT(reader_result, IsOk());
+  auto reader = std::move(reader_result.value());
+
+  reader->Select({"file_path"});
+
+  auto entries_result = reader->Entries();
+  ASSERT_THAT(entries_result, IsOk());
+  auto entries = entries_result.value();
+  ASSERT_EQ(entries.size(), 1);
+  AssertStatsDropped(*entries[0].data_file);
+}
+
+TEST_P(TestManifestReaderStats, 
TestReadEntriesWithSelectCertainStatNotProjectStats) {
+  int version = GetParam();
+  auto manifest = WriteManifest(version, MakeDataFileWithStats());
+
+  auto reader_result = ManifestReader::Make(manifest, file_io_, schema_, 
spec_);
+  ASSERT_THAT(reader_result, IsOk());
+  auto reader = std::move(reader_result.value());
+
+  reader->Select({"file_path", "value_counts"});
+
+  auto entries_result = reader->Entries();
+  ASSERT_THAT(entries_result, IsOk());
+  auto entries = entries_result.value();
+  ASSERT_EQ(entries.size(), 1);
+
+  const auto& file = *entries[0].data_file;
+  EXPECT_FALSE(file.value_counts.empty());
+  EXPECT_TRUE(file.null_value_counts.empty());
+  EXPECT_TRUE(file.nan_value_counts.empty());
+  EXPECT_TRUE(file.lower_bounds.empty());
+  EXPECT_TRUE(file.upper_bounds.empty());
+}
+
+INSTANTIATE_TEST_SUITE_P(ManifestReaderStatsVersions, TestManifestReaderStats,
+                         testing::Values(1, 2, 3));
+
+}  // namespace iceberg
diff --git a/src/iceberg/test/manifest_reader_test.cc 
b/src/iceberg/test/manifest_reader_test.cc
new file mode 100644
index 00000000..ab4798db
--- /dev/null
+++ b/src/iceberg/test/manifest_reader_test.cc
@@ -0,0 +1,381 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "iceberg/manifest/manifest_reader.h"
+
+#include <chrono>
+#include <format>
+#include <memory>
+#include <optional>
+#include <string>
+#include <vector>
+
+#include <gtest/gtest.h>
+
+#include "iceberg/arrow/arrow_file_io.h"
+#include "iceberg/avro/avro_register.h"
+#include "iceberg/expression/expressions.h"
+#include "iceberg/manifest/manifest_entry.h"
+#include "iceberg/manifest/manifest_list.h"
+#include "iceberg/manifest/manifest_writer.h"
+#include "iceberg/partition_spec.h"
+#include "iceberg/schema.h"
+#include "iceberg/test/matchers.h"
+#include "iceberg/transform.h"
+#include "iceberg/type.h"
+
+namespace iceberg {
+
+class TestManifestReader : public testing::TestWithParam<int> {
+ protected:
+  void SetUp() override {
+    avro::RegisterAll();
+
+    file_io_ = arrow::MakeMockFileIO();
+
+    schema_ = std::make_shared<Schema>(std::vector<SchemaField>{
+        SchemaField::MakeRequired(/*field_id=*/3, "id", int32()),
+        SchemaField::MakeRequired(/*field_id=*/4, "data", string())});
+
+    ICEBERG_UNWRAP_OR_FAIL(
+        spec_,
+        PartitionSpec::Make(
+            /*spec_id=*/0, {PartitionField(/*source_id=*/4, /*field_id=*/1000,
+                                           "data_bucket", 
Transform::Bucket(16))}));
+  }
+
+  std::string MakeManifestPath() {
+    return std::format("manifest-{}.avro",
+                       
std::chrono::system_clock::now().time_since_epoch().count());
+  }
+
+  std::unique_ptr<DataFile> MakeDataFile(const std::string& path,
+                                         const PartitionValues& partition,
+                                         int64_t record_count = 1) {
+    return std::make_unique<DataFile>(DataFile{
+        .file_path = path,
+        .file_format = FileFormatType::kParquet,
+        .partition = partition,
+        .record_count = record_count,
+        .file_size_in_bytes = 10,
+        .sort_order_id = 0,
+    });
+  }
+
+  ManifestFile WriteManifest(int format_version, std::optional<int64_t> 
snapshot_id,
+                             const std::vector<ManifestEntry>& entries) {
+    const std::string manifest_path = MakeManifestPath();
+
+    Result<std::unique_ptr<ManifestWriter>> writer_result =
+        NotSupported("Format version: {}", format_version);
+
+    switch (format_version) {
+      case 1:
+        writer_result = ManifestWriter::MakeV1Writer(snapshot_id, 
manifest_path, file_io_,
+                                                     spec_, schema_);
+        break;
+      case 2:
+        writer_result = ManifestWriter::MakeV2Writer(
+            snapshot_id, manifest_path, file_io_, spec_, schema_, 
ManifestContent::kData);
+        break;
+      case 3:
+        writer_result = ManifestWriter::MakeV3Writer(snapshot_id, 
/*first_row_id=*/0L,
+                                                     manifest_path, file_io_, 
spec_,
+                                                     schema_, 
ManifestContent::kData);
+        break;
+    }
+
+    EXPECT_THAT(writer_result, IsOk());
+    auto writer = std::move(writer_result.value());
+
+    for (const auto& entry : entries) {
+      EXPECT_THAT(writer->WriteEntry(entry), IsOk());
+    }
+
+    EXPECT_THAT(writer->Close(), IsOk());
+    auto manifest_result = writer->ToManifestFile();
+    EXPECT_THAT(manifest_result, IsOk());
+    return std::move(manifest_result.value());
+  }
+
+  ManifestEntry MakeEntry(ManifestStatus status, int64_t snapshot_id,
+                          std::unique_ptr<DataFile> file) {
+    return ManifestEntry{
+        .status = status,
+        .snapshot_id = snapshot_id,
+        .sequence_number =
+            (status == ManifestStatus::kAdded) ? std::nullopt : 
std::make_optional(0),
+        .file_sequence_number = std::nullopt,
+        .data_file = std::move(file),
+    };
+  }
+
+  std::unique_ptr<DataFile> MakeDeleteFile(
+      const std::string& path, const PartitionValues& partition,
+      DataFile::Content content,
+      std::optional<std::string> referenced_file = std::nullopt,
+      std::optional<int64_t> content_offset = std::nullopt,
+      std::optional<int64_t> content_size = std::nullopt) {
+    return std::make_unique<DataFile>(DataFile{
+        .content = content,
+        .file_path = path,
+        .file_format = FileFormatType::kParquet,
+        .partition = partition,
+        .record_count = 1,
+        .file_size_in_bytes = 10,
+        .equality_ids = (content == DataFile::Content::kEqualityDeletes)
+                            ? std::vector<int>{3}
+                            : std::vector<int>{},
+        .referenced_data_file = referenced_file,
+        .content_offset = content_offset,
+        .content_size_in_bytes = content_size,
+    });
+  }
+
+  ManifestFile WriteDeleteManifest(int format_version, int64_t snapshot_id,
+                                   std::vector<ManifestEntry> entries) {
+    const std::string manifest_path = MakeManifestPath();
+
+    Result<std::unique_ptr<ManifestWriter>> writer_result =
+        NotSupported("Format version: {}", format_version);
+
+    if (format_version == 2) {
+      writer_result =
+          ManifestWriter::MakeV2Writer(snapshot_id, manifest_path, file_io_, 
spec_,
+                                       schema_, ManifestContent::kDeletes);
+    } else if (format_version == 3) {
+      writer_result = ManifestWriter::MakeV3Writer(
+          snapshot_id, /*first_row_id=*/std::nullopt, manifest_path, file_io_, 
spec_,
+          schema_, ManifestContent::kDeletes);
+    }
+
+    EXPECT_THAT(writer_result, IsOk());
+    auto writer = std::move(writer_result.value());
+
+    for (const auto& entry : entries) {
+      EXPECT_THAT(writer->WriteEntry(entry), IsOk());
+    }
+
+    EXPECT_THAT(writer->Close(), IsOk());
+    auto manifest_result = writer->ToManifestFile();
+    EXPECT_THAT(manifest_result, IsOk());
+    return std::move(manifest_result.value());
+  }
+
+  std::shared_ptr<FileIO> file_io_;
+  std::shared_ptr<Schema> schema_;
+  std::shared_ptr<PartitionSpec> spec_;
+};
+
+TEST_P(TestManifestReader, TestManifestReaderWithEmptyInheritableMetadata) {
+  int version = GetParam();
+  auto file_a =
+      MakeDataFile("/path/to/data-a.parquet", 
PartitionValues({Literal::Int(0)}));
+
+  auto entry =
+      MakeEntry(ManifestStatus::kExisting, /*snapshot_id=*/1000L, 
std::move(file_a));
+  entry.sequence_number = 0;
+  entry.file_sequence_number = 0;
+
+  std::vector<ManifestEntry> entries;
+  entries.push_back(std::move(entry));
+  auto manifest = WriteManifest(version, /*snapshot_id=*/1000L, 
std::move(entries));
+
+  auto reader_result = ManifestReader::Make(manifest, file_io_, schema_, 
spec_);
+  ASSERT_THAT(reader_result, IsOk());
+  auto reader = std::move(reader_result.value());
+
+  auto entries_result = reader->Entries();
+  ASSERT_THAT(entries_result, IsOk());
+  auto read_entries = entries_result.value();
+
+  ASSERT_EQ(read_entries.size(), 1);
+  const auto& read_entry = read_entries[0];
+  EXPECT_EQ(read_entry.status, ManifestStatus::kExisting);
+  EXPECT_EQ(read_entry.data_file->file_path, "/path/to/data-a.parquet");
+  EXPECT_EQ(read_entry.snapshot_id, 1000L);
+}
+
+TEST_P(TestManifestReader, TestReaderWithFilterWithoutSelect) {
+  int version = GetParam();
+  auto file_a =
+      MakeDataFile("/path/to/data-a.parquet", 
PartitionValues({Literal::Int(0)}));
+  auto file_b =
+      MakeDataFile("/path/to/data-b.parquet", 
PartitionValues({Literal::Int(1)}));
+  auto file_c =
+      MakeDataFile("/path/to/data-c.parquet", 
PartitionValues({Literal::Int(2)}));
+
+  std::vector<ManifestEntry> entries;
+  entries.push_back(MakeEntry(ManifestStatus::kAdded, 1000L, 
std::move(file_a)));
+  entries.push_back(MakeEntry(ManifestStatus::kAdded, 1000L, 
std::move(file_b)));
+  entries.push_back(MakeEntry(ManifestStatus::kAdded, 1000L, 
std::move(file_c)));
+
+  auto manifest = WriteManifest(version, 1000L, std::move(entries));
+
+  auto reader_result = ManifestReader::Make(manifest, file_io_, schema_, 
spec_);
+  ASSERT_THAT(reader_result, IsOk());
+  auto reader = std::move(reader_result.value());
+
+  reader->FilterRows(Expressions::Equal("id", Literal::Int(0)));
+
+  auto result_entries = reader->Entries();
+  ASSERT_THAT(result_entries, IsOk());
+  auto read_entries = result_entries.value();
+
+  // note that all files are returned because the reader returns data files 
that may
+  // match, and the partition is bucketing by data, which doesn't help filter 
files
+  ASSERT_EQ(read_entries.size(), 3);
+  EXPECT_EQ(read_entries[0].data_file->file_path, "/path/to/data-a.parquet");
+  EXPECT_EQ(read_entries[1].data_file->file_path, "/path/to/data-b.parquet");
+  EXPECT_EQ(read_entries[2].data_file->file_path, "/path/to/data-c.parquet");
+}
+
+TEST_P(TestManifestReader, TestManifestReaderWithPartitionMetadata) {
+  int version = GetParam();
+  auto file_a =
+      MakeDataFile("/path/to/data-a.parquet", 
PartitionValues({Literal::Int(0)}));
+  auto entry =
+      MakeEntry(ManifestStatus::kExisting, /*snapshot_id=*/123L, 
std::move(file_a));
+  entry.sequence_number = 0;
+  entry.file_sequence_number = 0;
+
+  std::vector<ManifestEntry> entries;
+  entries.push_back(std::move(entry));
+  auto manifest = WriteManifest(version, /*snapshot_id=*/1000L, 
std::move(entries));
+
+  auto reader_result = ManifestReader::Make(manifest, file_io_, schema_, 
spec_);
+  ASSERT_THAT(reader_result, IsOk());
+  auto reader = std::move(reader_result.value());
+
+  auto entries_result = reader->Entries();
+  ASSERT_THAT(entries_result, IsOk());
+  auto read_entries = entries_result.value();
+
+  ASSERT_EQ(read_entries.size(), 1);
+  const auto& read_entry = read_entries[0];
+  EXPECT_EQ(read_entry.snapshot_id, 123L);
+
+  ASSERT_EQ(read_entry.data_file->partition.num_fields(), 1);
+  EXPECT_EQ(read_entry.data_file->partition.values()[0], Literal::Int(0));
+}
+
+TEST_P(TestManifestReader, TestDeleteFilesWithReferences) {
+  int version = GetParam();
+  if (version < 2) {
+    GTEST_SKIP() << "Delete files only supported in V2+";
+  }
+
+  auto delete_file_1 = MakeDeleteFile(
+      "/path/to/data-a-deletes.parquet", PartitionValues({Literal::Int(0)}),
+      DataFile::Content::kPositionDeletes, "/path/to/data-a.parquet");
+  auto delete_file_2 = MakeDeleteFile(
+      "/path/to/data-b-deletes.parquet", PartitionValues({Literal::Int(1)}),
+      DataFile::Content::kPositionDeletes, "/path/to/data-b.parquet");
+
+  std::vector<ManifestEntry> entries;
+  entries.push_back(
+      MakeEntry(ManifestStatus::kAdded, /*snapshot_id=*/1000L, 
std::move(delete_file_1)));
+  entries.push_back(
+      MakeEntry(ManifestStatus::kAdded, /*snapshot_id=*/1000L, 
std::move(delete_file_2)));
+
+  auto manifest = WriteDeleteManifest(version, /*snapshot_id=*/1000L, 
std::move(entries));
+
+  auto reader_result = ManifestReader::Make(manifest, file_io_, schema_, 
spec_);
+  ASSERT_THAT(reader_result, IsOk());
+  auto reader = std::move(reader_result.value());
+
+  auto entries_result = reader->Entries();
+  ASSERT_THAT(entries_result, IsOk());
+  auto read_entries = entries_result.value();
+  ASSERT_EQ(read_entries.size(), 2);
+
+  for (const auto& entry : read_entries) {
+    if (entry.data_file->file_path == "/path/to/data-a-deletes.parquet") {
+      EXPECT_EQ(entry.data_file->referenced_data_file, 
"/path/to/data-a.parquet");
+    } else {
+      EXPECT_EQ(entry.data_file->referenced_data_file, 
"/path/to/data-b.parquet");
+    }
+  }
+}
+
+TEST_P(TestManifestReader, TestDVs) {
+  int version = GetParam();
+  if (version < 3) {
+    GTEST_SKIP() << "DVs only supported in V3+";
+  }
+
+  auto dv1 =
+      MakeDeleteFile("/path/to/data-a-deletes.puffin", 
PartitionValues({Literal::Int(0)}),
+                     DataFile::Content::kPositionDeletes, 
"/path/to/data-a.parquet",
+                     /*content_offset=*/4L, /*content_size_in_bytes=*/6L);
+  auto dv2 =
+      MakeDeleteFile("/path/to/data-b-deletes.puffin", 
PartitionValues({Literal::Int(1)}),
+                     DataFile::Content::kPositionDeletes, 
"/path/to/data-b.parquet",
+                     /*content_offset=*/4L, /*content_size_in_bytes=*/6L);
+
+  std::vector<ManifestEntry> entries;
+  entries.push_back(
+      MakeEntry(ManifestStatus::kAdded, /*snapshot_id=*/1000L, 
std::move(dv1)));
+  entries.push_back(
+      MakeEntry(ManifestStatus::kAdded, /*snapshot_id=*/1000L, 
std::move(dv2)));
+  auto manifest = WriteDeleteManifest(version, /*snapshot_id=*/1000L, 
std::move(entries));
+
+  auto reader_result = ManifestReader::Make(manifest, file_io_, schema_, 
spec_);
+  ASSERT_THAT(reader_result, IsOk());
+  auto reader = std::move(reader_result.value());
+
+  auto entries_result = reader->Entries();
+  ASSERT_THAT(entries_result, IsOk());
+  auto read_entries = entries_result.value();
+  ASSERT_EQ(read_entries.size(), 2);
+
+  for (const auto& entry : read_entries) {
+    if (entry.data_file->file_path == "/path/to/data-a-deletes.puffin") {
+      EXPECT_EQ(entry.data_file->referenced_data_file, 
"/path/to/data-a.parquet");
+      EXPECT_EQ(entry.data_file->content_offset, 4L);
+      EXPECT_EQ(entry.data_file->content_size_in_bytes, 6L);
+    } else {
+      EXPECT_EQ(entry.data_file->referenced_data_file, 
"/path/to/data-b.parquet");
+      EXPECT_EQ(entry.data_file->content_offset, 4L);
+      EXPECT_EQ(entry.data_file->content_size_in_bytes, 6L);
+    }
+  }
+}
+
+TEST_P(TestManifestReader, TestInvalidUsage) {
+  int version = GetParam();
+  auto file_a =
+      MakeDataFile("/path/to/data-a.parquet", 
PartitionValues({Literal::Int(0)}));
+  auto entry =
+      MakeEntry(ManifestStatus::kExisting, /*snapshot_id=*/1000L, 
std::move(file_a));
+  entry.sequence_number = 0;
+  entry.file_sequence_number = 0;
+
+  auto manifest =
+      WriteManifest(version, /*snapshot_id=*/std::nullopt, {std::move(entry)});
+
+  auto reader_result = ManifestReader::Make(manifest, file_io_, schema_, 
spec_);
+  EXPECT_THAT(reader_result, IsError(ErrorKind::kInvalidManifest));
+  EXPECT_THAT(reader_result, HasErrorMessage("has no snapshot ID"));
+}
+
+INSTANTIATE_TEST_SUITE_P(ManifestReaderVersions, TestManifestReader,
+                         testing::Values(1, 2, 3));
+
+}  // namespace iceberg
diff --git a/src/iceberg/test/manifest_writer_versions_test.cc 
b/src/iceberg/test/manifest_writer_versions_test.cc
index 36f9445f..e3229fc7 100644
--- a/src/iceberg/test/manifest_writer_versions_test.cc
+++ b/src/iceberg/test/manifest_writer_versions_test.cc
@@ -241,10 +241,7 @@ class ManifestWriterVersionsTest : public ::testing::Test {
   }
 
   std::vector<ManifestEntry> ReadManifest(const ManifestFile& manifest_file) {
-    auto partition_type_result = spec_->PartitionType(*schema_);
-    EXPECT_THAT(partition_type_result, IsOk());
-    std::shared_ptr<StructType> partition_type = 
std::move(partition_type_result.value());
-    auto reader_result = ManifestReader::Make(manifest_file, file_io_, 
partition_type);
+    auto reader_result = ManifestReader::Make(manifest_file, file_io_, 
schema_, spec_);
     EXPECT_THAT(reader_result, IsOk());
 
     auto reader = std::move(reader_result.value());

Reply via email to