This is an automated email from the ASF dual-hosted git repository.
gangwu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-cpp.git
The following commit(s) were added to refs/heads/main by this push:
new c142ef9 feat: add partition summary and write added/existing/deleted
entries to manifest writer (#317)
c142ef9 is described below
commit c142ef9e81ef746e42ac88ffc3a6cc7b54009f67
Author: Junwang Zhao <[email protected]>
AuthorDate: Wed Nov 26 10:12:30 2025 +0800
feat: add partition summary and write added/existing/deleted entries to
manifest writer (#317)
---
.gitignore | 1 +
src/iceberg/CMakeLists.txt | 1 +
src/iceberg/avro/avro_writer.cc | 8 +-
src/iceberg/avro/avro_writer.h | 4 +-
src/iceberg/expression/literal.h | 4 +-
src/iceberg/file_writer.h | 4 +-
src/iceberg/manifest_adapter.cc | 12 +-
src/iceberg/manifest_adapter.h | 12 +-
src/iceberg/manifest_entry.cc | 1 -
src/iceberg/manifest_entry.h | 50 +-
src/iceberg/manifest_list.h | 32 +-
src/iceberg/manifest_reader.cc | 6 +-
src/iceberg/manifest_reader_internal.cc | 48 +-
src/iceberg/manifest_reader_internal.h | 7 +-
src/iceberg/manifest_writer.cc | 207 ++++++-
src/iceberg/manifest_writer.h | 86 ++-
src/iceberg/meson.build | 1 +
src/iceberg/metrics.h | 15 +-
src/iceberg/parquet/parquet_writer.cc | 8 +-
src/iceberg/parquet/parquet_writer.h | 4 +-
src/iceberg/partition_summary.cc | 103 ++++
src/iceberg/partition_summary_internal.h | 70 +++
src/iceberg/test/CMakeLists.txt | 1 +
.../test/manifest_list_reader_writer_test.cc | 6 +-
src/iceberg/test/manifest_list_versions_test.cc | 16 +-
src/iceberg/test/manifest_reader_writer_test.cc | 21 +-
src/iceberg/test/manifest_writer_versions_test.cc | 668 +++++++++++++++++++++
src/iceberg/test/parquet_test.cc | 13 +-
src/iceberg/test/partition_field_test.cc | 1 -
src/iceberg/test/partition_spec_test.cc | 87 ++-
src/iceberg/test/struct_like_test.cc | 10 +-
src/iceberg/type_fwd.h | 2 +
src/iceberg/v1_metadata.cc | 12 +-
src/iceberg/v1_metadata.h | 3 -
src/iceberg/v2_metadata.cc | 9 +-
src/iceberg/v2_metadata.h | 3 -
src/iceberg/v3_metadata.cc | 29 +-
src/iceberg/v3_metadata.h | 4 +-
38 files changed, 1372 insertions(+), 197 deletions(-)
diff --git a/.gitignore b/.gitignore
index 9e79c86..f5d4706 100644
--- a/.gitignore
+++ b/.gitignore
@@ -16,6 +16,7 @@
# under the License.
build/
+cmake-build/
cmake-build-debug/
cmake-build-release/
.DS_Store
diff --git a/src/iceberg/CMakeLists.txt b/src/iceberg/CMakeLists.txt
index dd78dc6..305e315 100644
--- a/src/iceberg/CMakeLists.txt
+++ b/src/iceberg/CMakeLists.txt
@@ -42,6 +42,7 @@ set(ICEBERG_SOURCES
name_mapping.cc
partition_field.cc
partition_spec.cc
+ partition_summary.cc
row/arrow_array_wrapper.cc
row/manifest_wrapper.cc
row/struct_like.cc
diff --git a/src/iceberg/avro/avro_writer.cc b/src/iceberg/avro/avro_writer.cc
index 9c8d58d..9fec43a 100644
--- a/src/iceberg/avro/avro_writer.cc
+++ b/src/iceberg/avro/avro_writer.cc
@@ -154,19 +154,19 @@ Status AvroWriter::Close() {
return {};
}
-std::optional<Metrics> AvroWriter::metrics() {
+Result<Metrics> AvroWriter::metrics() {
if (impl_->Closed()) {
// TODO(xiao.dong) implement metrics
return {};
}
- return std::nullopt;
+ return Invalid("AvroWriter is not closed");
}
-std::optional<int64_t> AvroWriter::length() {
+Result<int64_t> AvroWriter::length() {
if (impl_->Closed()) {
return impl_->length();
}
- return std::nullopt;
+ return Invalid("AvroWriter is not closed");
}
std::vector<int64_t> AvroWriter::split_offsets() { return {}; }
diff --git a/src/iceberg/avro/avro_writer.h b/src/iceberg/avro/avro_writer.h
index 0a5dd0b..3bd8110 100644
--- a/src/iceberg/avro/avro_writer.h
+++ b/src/iceberg/avro/avro_writer.h
@@ -37,9 +37,9 @@ class ICEBERG_BUNDLE_EXPORT AvroWriter : public Writer {
Status Write(ArrowArray* data) final;
- std::optional<Metrics> metrics() final;
+ Result<Metrics> metrics() final;
- std::optional<int64_t> length() final;
+ Result<int64_t> length() final;
std::vector<int64_t> split_offsets() final;
diff --git a/src/iceberg/expression/literal.h b/src/iceberg/expression/literal.h
index 42c964f..b07aaa5 100644
--- a/src/iceberg/expression/literal.h
+++ b/src/iceberg/expression/literal.h
@@ -150,11 +150,11 @@ class ICEBERG_EXPORT Literal : public util::Formattable {
/// \return true if this literal represents a BelowMin value, false otherwise
bool IsBelowMin() const;
- /// Check if this literal is null.
+ /// \brief Check if this literal is null.
/// \return true if this literal is null, false otherwise
bool IsNull() const;
- /// Check if this literal is NaN.
+ /// \brief Check if this literal is NaN.
/// \return true if this literal is NaN, false otherwise
bool IsNaN() const;
diff --git a/src/iceberg/file_writer.h b/src/iceberg/file_writer.h
index ea4e423..ae65075 100644
--- a/src/iceberg/file_writer.h
+++ b/src/iceberg/file_writer.h
@@ -97,11 +97,11 @@ class ICEBERG_EXPORT Writer {
/// \brief Get the file statistics.
/// Only valid after the file is closed.
- virtual std::optional<Metrics> metrics() = 0;
+ virtual Result<Metrics> metrics() = 0;
/// \brief Get the file length.
/// Only valid after the file is closed.
- virtual std::optional<int64_t> length() = 0;
+ virtual Result<int64_t> length() = 0;
/// \brief Returns a list of recommended split locations, if applicable,
empty
/// otherwise. When available, this information is used for planning scan
tasks whose
diff --git a/src/iceberg/manifest_adapter.cc b/src/iceberg/manifest_adapter.cc
index 76ed0ec..fa2efd4 100644
--- a/src/iceberg/manifest_adapter.cc
+++ b/src/iceberg/manifest_adapter.cc
@@ -19,6 +19,7 @@
#include "iceberg/manifest_adapter.h"
+#include <memory>
#include <utility>
#include <nanoarrow/nanoarrow.h>
@@ -28,7 +29,6 @@
#include "iceberg/manifest_list.h"
#include "iceberg/result.h"
#include "iceberg/schema.h"
-#include "iceberg/schema_internal.h"
#include "iceberg/util/checked_cast.h"
#include "iceberg/util/macros.h"
@@ -141,10 +141,12 @@ Result<ArrowArray*> ManifestAdapter::FinishAppending() {
return &array_;
}
-ManifestEntryAdapter::ManifestEntryAdapter(std::shared_ptr<PartitionSpec>
partition_spec,
+ManifestEntryAdapter::ManifestEntryAdapter(std::optional<int64_t> snapshot_id_,
+ std::shared_ptr<PartitionSpec>
partition_spec,
std::shared_ptr<Schema>
current_schema,
ManifestContent content)
- : partition_spec_(std::move(partition_spec)),
+ : snapshot_id_(snapshot_id_),
+ partition_spec_(std::move(partition_spec)),
current_schema_(std::move(current_schema)),
content_(content) {
if (!partition_spec_) {
@@ -386,6 +388,10 @@ Result<std::optional<int64_t>>
ManifestEntryAdapter::GetContentSizeInBytes(
}
Status ManifestEntryAdapter::AppendInternal(const ManifestEntry& entry) {
+ if (entry.data_file == nullptr) [[unlikely]] {
+ return InvalidManifest("Missing required data_file field from manifest
entry.");
+ }
+
const auto& fields = manifest_schema_->fields();
for (size_t i = 0; i < fields.size(); i++) {
const auto& field = fields[i];
diff --git a/src/iceberg/manifest_adapter.h b/src/iceberg/manifest_adapter.h
index 269a73b..979d81a 100644
--- a/src/iceberg/manifest_adapter.h
+++ b/src/iceberg/manifest_adapter.h
@@ -25,7 +25,6 @@
#include <memory>
#include <optional>
#include <unordered_map>
-#include <unordered_set>
#include <vector>
#include "iceberg/arrow_c_data.h"
@@ -61,7 +60,8 @@ class ICEBERG_EXPORT ManifestAdapter {
/// Implemented by different versions with version-specific schemas.
class ICEBERG_EXPORT ManifestEntryAdapter : public ManifestAdapter {
public:
- ManifestEntryAdapter(std::shared_ptr<PartitionSpec> partition_spec,
+ ManifestEntryAdapter(std::optional<int64_t> snapshot_id_,
+ std::shared_ptr<PartitionSpec> partition_spec,
std::shared_ptr<Schema> current_schema, ManifestContent
content);
~ManifestEntryAdapter() override;
@@ -72,6 +72,12 @@ class ICEBERG_EXPORT ManifestEntryAdapter : public
ManifestAdapter {
ManifestContent content() const { return content_; }
+ std::optional<int64_t> snapshot_id() const { return snapshot_id_; }
+
+ const std::shared_ptr<PartitionSpec>& partition_spec() const { return
partition_spec_; }
+
+ const std::shared_ptr<StructType>& partition_type() const { return
partition_type_; }
+
protected:
Status AppendInternal(const ManifestEntry& entry);
Status AppendDataFile(ArrowArray* array,
@@ -91,8 +97,10 @@ class ICEBERG_EXPORT ManifestEntryAdapter : public
ManifestAdapter {
const DataFile& file) const;
protected:
+ std::optional<int64_t> snapshot_id_;
std::shared_ptr<PartitionSpec> partition_spec_;
std::shared_ptr<Schema> current_schema_;
+ std::shared_ptr<StructType> partition_type_;
std::shared_ptr<Schema> manifest_schema_;
const ManifestContent content_;
};
diff --git a/src/iceberg/manifest_entry.cc b/src/iceberg/manifest_entry.cc
index fee845d..cf8ec07 100644
--- a/src/iceberg/manifest_entry.cc
+++ b/src/iceberg/manifest_entry.cc
@@ -22,7 +22,6 @@
#include <memory>
#include <vector>
-#include "iceberg/schema.h"
#include "iceberg/schema_field.h"
#include "iceberg/type.h"
diff --git a/src/iceberg/manifest_entry.h b/src/iceberg/manifest_entry.h
index c67a63e..7225f4a 100644
--- a/src/iceberg/manifest_entry.h
+++ b/src/iceberg/manifest_entry.h
@@ -57,15 +57,6 @@ ICEBERG_EXPORT constexpr Result<ManifestStatus>
ManifestStatusFromInt(
}
}
-enum class ManifestContent {
- kData = 0,
- kDeletes = 1,
-};
-
-ICEBERG_EXPORT constexpr std::string_view ToString(ManifestContent content)
noexcept;
-ICEBERG_EXPORT constexpr Result<ManifestContent> ManifestContentFromString(
- std::string_view str) noexcept;
-
/// \brief DataFile carries data file path, partition tuple, metrics, ...
struct ICEBERG_EXPORT DataFile {
/// \brief Content of a data file
@@ -277,6 +268,7 @@ struct ICEBERG_EXPORT DataFile {
bool operator==(const DataFile& other) const = default;
+ /// \brief Get the schema of the data file with the given partition type.
static std::shared_ptr<StructType> Type(std::shared_ptr<StructType>
partition_type);
};
@@ -315,6 +307,33 @@ struct ICEBERG_EXPORT ManifestEntry {
inline static const SchemaField kFileSequenceNumber =
SchemaField::MakeOptional(4, "file_sequence_number", iceberg::int64());
+ /// \brief Check if this manifest entry is alive (status is added or existing).
+ constexpr bool IsAlive() const {
+ return status == ManifestStatus::kAdded || status ==
ManifestStatus::kExisting;
+ }
+
+ ManifestEntry AsAdded() const {
+ ManifestEntry copy = *this;
+ copy.status = ManifestStatus::kAdded;
+ if (copy.data_file->first_row_id.has_value()) {
+ copy.data_file = std::make_unique<DataFile>(*copy.data_file);
+ copy.data_file->first_row_id = std::nullopt;
+ }
+ return copy;
+ }
+
+ ManifestEntry AsExisting() const {
+ ManifestEntry copy = *this;
+ copy.status = ManifestStatus::kExisting;
+ return copy;
+ }
+
+ ManifestEntry AsDeleted() const {
+ ManifestEntry copy = *this;
+ copy.status = ManifestStatus::kDeleted;
+ return copy;
+ }
+
bool operator==(const ManifestEntry& other) const;
static std::shared_ptr<StructType> TypeFromPartitionType(
@@ -323,6 +342,19 @@ struct ICEBERG_EXPORT ManifestEntry {
std::shared_ptr<StructType> datafile_type);
};
+/// \brief Get the name of the given data file content type
+ICEBERG_EXPORT constexpr std::string_view ToString(DataFile::Content type)
noexcept {
+ switch (type) {
+ case DataFile::Content::kData:
+ return "data";
+ case DataFile::Content::kPositionDeletes:
+ return "position_deletes";
+ case DataFile::Content::kEqualityDeletes:
+ return "equality_deletes";
+ }
+ std::unreachable();
+}
+
/// \brief Get the relative data file content type from int
ICEBERG_EXPORT constexpr Result<DataFile::Content> DataFileContentFromInt(
int content) noexcept {
diff --git a/src/iceberg/manifest_list.h b/src/iceberg/manifest_list.h
index 17dc28c..fe1b246 100644
--- a/src/iceberg/manifest_list.h
+++ b/src/iceberg/manifest_list.h
@@ -72,17 +72,17 @@ struct ICEBERG_EXPORT PartitionFieldSummary {
static const StructType& Type();
};
+/// \brief The type of files tracked by the manifest, either data or delete
files; 0 for
+/// all v1 manifests
+enum class ManifestContent {
+ /// The manifest content is data.
+ kData = 0,
+ /// The manifest content is deletes.
+ kDeletes = 1,
+};
+
/// \brief Entry in a manifest list.
struct ICEBERG_EXPORT ManifestFile {
- /// \brief The type of files tracked by the manifest, either data or delete
files; 0 for
- /// all v1 manifests
- enum class Content {
- /// The manifest content is data.
- kData = 0,
- /// The manifest content is deletes.
- kDeletes = 1,
- };
-
/// Field id: 500
/// Location of the manifest file
std::string manifest_path;
@@ -96,7 +96,7 @@ struct ICEBERG_EXPORT ManifestFile {
/// Field id: 517
/// The type of files tracked by the manifest, either data or delete files;
0 for all v1
/// manifests
- Content content = Content::kData;
+ ManifestContent content = ManifestContent::kData;
/// Field id: 515
/// The sequence number when the manifest was added to the table; use 0 when
reading v1
/// manifest lists
@@ -218,21 +218,21 @@ struct ICEBERG_EXPORT ManifestList {
};
/// \brief Get the relative manifest content type name
-ICEBERG_EXPORT constexpr std::string_view ToString(ManifestFile::Content type)
noexcept {
+ICEBERG_EXPORT inline constexpr std::string_view ToString(ManifestContent
type) noexcept {
switch (type) {
- case ManifestFile::Content::kData:
+ case ManifestContent::kData:
return "data";
- case ManifestFile::Content::kDeletes:
+ case ManifestContent::kDeletes:
return "deletes";
}
std::unreachable();
}
/// \brief Get the relative manifest content type from name
-ICEBERG_EXPORT constexpr Result<ManifestFile::Content>
ManifestFileContentFromString(
+ICEBERG_EXPORT inline constexpr Result<ManifestContent>
ManifestContentFromString(
std::string_view str) noexcept {
- if (str == "data") return ManifestFile::Content::kData;
- if (str == "deletes") return ManifestFile::Content::kDeletes;
+ if (str == "data") return ManifestContent::kData;
+ if (str == "deletes") return ManifestContent::kDeletes;
return InvalidArgument("Invalid manifest content type: {}", str);
}
diff --git a/src/iceberg/manifest_reader.cc b/src/iceberg/manifest_reader.cc
index 805bbb7..94532c6 100644
--- a/src/iceberg/manifest_reader.cc
+++ b/src/iceberg/manifest_reader.cc
@@ -48,7 +48,8 @@ Result<std::unique_ptr<ManifestReader>> ManifestReader::Make(
InheritableMetadataFactory::FromManifest(manifest));
return std::make_unique<ManifestReaderImpl>(std::move(reader),
std::move(schema),
- std::move(inheritable_metadata));
+ std::move(inheritable_metadata),
+ manifest.first_row_id);
}
Result<std::unique_ptr<ManifestReader>> ManifestReader::Make(
@@ -66,7 +67,8 @@ Result<std::unique_ptr<ManifestReader>> ManifestReader::Make(
.projection = schema}));
ICEBERG_ASSIGN_OR_RAISE(auto inheritable_metadata,
InheritableMetadataFactory::Empty());
return std::make_unique<ManifestReaderImpl>(std::move(reader),
std::move(schema),
- std::move(inheritable_metadata));
+ std::move(inheritable_metadata),
+ std::nullopt);
}
Result<std::unique_ptr<ManifestListReader>> ManifestListReader::Make(
diff --git a/src/iceberg/manifest_reader_internal.cc
b/src/iceberg/manifest_reader_internal.cc
index 346f19a..b6007d1 100644
--- a/src/iceberg/manifest_reader_internal.cc
+++ b/src/iceberg/manifest_reader_internal.cc
@@ -237,7 +237,7 @@ Result<std::vector<ManifestFile>>
ParseManifestList(ArrowSchema* schema,
break;
case ManifestFileField::kContent:
PARSE_PRIMITIVE_FIELD(manifest_files[row_idx].content, view_of_column,
- ManifestFile::Content);
+ ManifestContent);
break;
case ManifestFileField::kSequenceNumber:
PARSE_PRIMITIVE_FIELD(manifest_files[row_idx].sequence_number,
view_of_column,
@@ -328,7 +328,7 @@ Status ParseLiteral(ArrowArrayView* view_of_partition,
int64_t row_idx,
}
Status ParseDataFile(const std::shared_ptr<StructType>& data_file_schema,
- ArrowArrayView* view_of_column,
+ ArrowArrayView* view_of_column, std::optional<int64_t>&
first_row_id,
std::vector<ManifestEntry>& manifest_entries) {
if (view_of_column->storage_type != ArrowType::NANOARROW_TYPE_STRUCT) {
return InvalidManifest("DataFile field should be a struct.");
@@ -367,13 +367,16 @@ Status ParseDataFile(const std::shared_ptr<StructType>&
data_file_schema,
return InvalidManifest("Field:{} should be a struct.", field_name);
}
if (view_of_file_field->n_children > 0) {
- auto view_of_partition = view_of_file_field->children[0];
- for (int64_t row_idx = 0; row_idx < view_of_partition->length;
row_idx++) {
- if (ArrowArrayViewIsNull(view_of_partition, row_idx)) {
- break;
+ for (int64_t partition_idx = 0; partition_idx <
view_of_file_field->n_children;
+ partition_idx++) {
+ auto view_of_partition =
view_of_file_field->children[partition_idx];
+ for (int64_t row_idx = 0; row_idx < view_of_partition->length;
row_idx++) {
+ if (ArrowArrayViewIsNull(view_of_partition, row_idx)) {
+ break;
+ }
+ ICEBERG_RETURN_UNEXPECTED(
+ ParseLiteral(view_of_partition, row_idx, manifest_entries));
}
- ICEBERG_RETURN_UNEXPECTED(
- ParseLiteral(view_of_partition, row_idx, manifest_entries));
}
}
} break;
@@ -429,10 +432,25 @@ Status ParseDataFile(const std::shared_ptr<StructType>&
data_file_schema,
PARSE_PRIMITIVE_FIELD(manifest_entries[row_idx].data_file->sort_order_id,
view_of_file_field, int32_t);
break;
- case 16:
+ case 16: {
PARSE_PRIMITIVE_FIELD(manifest_entries[row_idx].data_file->first_row_id,
view_of_file_field, int64_t);
+ if (first_row_id.has_value()) {
+ std::ranges::for_each(manifest_entries,
[&first_row_id](ManifestEntry& entry) {
+ if (entry.status != ManifestStatus::kDeleted &&
+ !entry.data_file->first_row_id.has_value()) {
+ entry.data_file->first_row_id = first_row_id.value();
+ first_row_id = first_row_id.value() +
entry.data_file->record_count;
+ }
+ });
+ } else {
+ // data file's first_row_id is null when the manifest's first_row_id
is null
+ std::ranges::for_each(manifest_entries, [](ManifestEntry& entry) {
+ entry.data_file->first_row_id = std::nullopt;
+ });
+ }
break;
+ }
case 17:
PARSE_STRING_FIELD(manifest_entries[row_idx].data_file->referenced_data_file,
view_of_file_field);
@@ -452,9 +470,9 @@ Status ParseDataFile(const std::shared_ptr<StructType>&
data_file_schema,
return {};
}
-Result<std::vector<ManifestEntry>> ParseManifestEntry(ArrowSchema* schema,
- ArrowArray* array_in,
- const Schema&
iceberg_schema) {
+Result<std::vector<ManifestEntry>> ParseManifestEntry(
+ ArrowSchema* schema, ArrowArray* array_in, const Schema& iceberg_schema,
+ std::optional<int64_t>& first_row_id) {
if (schema->n_children != array_in->n_children) {
return InvalidManifest("Columns size not match between schema:{} and
array:{}",
schema->n_children, array_in->n_children);
@@ -509,8 +527,8 @@ Result<std::vector<ManifestEntry>>
ParseManifestEntry(ArrowSchema* schema,
case 4: {
auto data_file_schema =
internal::checked_pointer_cast<StructType>(field.value()->get().type());
- ICEBERG_RETURN_UNEXPECTED(
- ParseDataFile(data_file_schema, view_of_column, manifest_entries));
+ ICEBERG_RETURN_UNEXPECTED(ParseDataFile(data_file_schema,
view_of_column,
+ first_row_id,
manifest_entries));
break;
}
default:
@@ -530,7 +548,7 @@ Result<std::vector<ManifestEntry>>
ManifestReaderImpl::Entries() const {
internal::ArrowArrayGuard array_guard(&result.value());
ICEBERG_ASSIGN_OR_RAISE(
auto parse_result,
- ParseManifestEntry(&arrow_schema, &result.value(), *schema_));
+ ParseManifestEntry(&arrow_schema, &result.value(), *schema_,
first_row_id_));
manifest_entries.insert(manifest_entries.end(),
std::make_move_iterator(parse_result.begin()),
std::make_move_iterator(parse_result.end()));
diff --git a/src/iceberg/manifest_reader_internal.h
b/src/iceberg/manifest_reader_internal.h
index e12892e..b65f4cd 100644
--- a/src/iceberg/manifest_reader_internal.h
+++ b/src/iceberg/manifest_reader_internal.h
@@ -33,10 +33,12 @@ class ManifestReaderImpl : public ManifestReader {
public:
explicit ManifestReaderImpl(std::unique_ptr<Reader> reader,
std::shared_ptr<Schema> schema,
- std::unique_ptr<InheritableMetadata>
inheritable_metadata)
+ std::unique_ptr<InheritableMetadata>
inheritable_metadata,
+ std::optional<int64_t> first_row_id)
: schema_(std::move(schema)),
reader_(std::move(reader)),
- inheritable_metadata_(std::move(inheritable_metadata)) {}
+ inheritable_metadata_(std::move(inheritable_metadata)),
+ first_row_id_(first_row_id) {}
Result<std::vector<ManifestEntry>> Entries() const override;
@@ -46,6 +48,7 @@ class ManifestReaderImpl : public ManifestReader {
std::shared_ptr<Schema> schema_;
std::unique_ptr<Reader> reader_;
std::unique_ptr<InheritableMetadata> inheritable_metadata_;
+ mutable std::optional<int64_t> first_row_id_;
};
/// \brief Read manifest files from a manifest list file.
diff --git a/src/iceberg/manifest_writer.cc b/src/iceberg/manifest_writer.cc
index fd730f2..3209a0c 100644
--- a/src/iceberg/manifest_writer.cc
+++ b/src/iceberg/manifest_writer.cc
@@ -19,9 +19,14 @@
#include "iceberg/manifest_writer.h"
+#include <optional>
+
#include "iceberg/manifest_entry.h"
#include "iceberg/manifest_list.h"
+#include "iceberg/partition_summary_internal.h"
+#include "iceberg/result.h"
#include "iceberg/schema.h"
+#include "iceberg/table_metadata.h"
#include "iceberg/util/macros.h"
#include "iceberg/v1_metadata.h"
#include "iceberg/v2_metadata.h"
@@ -29,18 +34,171 @@
namespace iceberg {
-Status ManifestWriter::Add(const ManifestEntry& entry) {
+ManifestWriter::ManifestWriter(std::unique_ptr<Writer> writer,
+ std::unique_ptr<ManifestEntryAdapter> adapter,
+ std::string_view manifest_location,
+ std::optional<int64_t> first_row_id)
+ : writer_(std::move(writer)),
+ adapter_(std::move(adapter)),
+ manifest_location_(manifest_location),
+ first_row_id_(first_row_id),
+ partition_summary_(
+ std::make_unique<PartitionSummary>(*adapter_->partition_type())) {}
+
+ManifestWriter::~ManifestWriter() = default;
+
+Status ManifestWriter::WriteAddedEntry(std::shared_ptr<DataFile> file,
+ std::optional<int64_t>
data_sequence_number) {
+ if (!file) [[unlikely]] {
+ return InvalidArgument("Data file cannot be null");
+ }
+
+ ManifestEntry added{
+ .status = ManifestStatus::kAdded,
+ .snapshot_id = adapter_->snapshot_id(),
+ .sequence_number = data_sequence_number,
+ .file_sequence_number = std::nullopt,
+ .data_file = std::move(file),
+ };
+
+ // Suppress first_row_id for added entries
+ if (added.data_file->first_row_id.has_value()) {
+ added.data_file = std::make_unique<DataFile>(*added.data_file);
+ added.data_file->first_row_id = std::nullopt;
+ }
+
+ return WriteEntry(added);
+}
+
+Status ManifestWriter::WriteAddedEntry(const ManifestEntry& entry) {
+ // Update the entry status to `Added`
+ auto added = entry.AsAdded();
+ added.snapshot_id = adapter_->snapshot_id();
+ // Set the sequence number to nullopt if it is invalid (smaller than 0)
+ if (added.sequence_number.has_value() &&
+ added.sequence_number.value() < TableMetadata::kInitialSequenceNumber) {
+ added.sequence_number = std::nullopt;
+ }
+ added.file_sequence_number = std::nullopt;
+
+ return WriteEntry(added);
+}
+
+Status ManifestWriter::WriteExistingEntry(std::shared_ptr<DataFile> file,
+ int64_t file_snapshot_id,
+ int64_t data_sequence_number,
+ std::optional<int64_t>
file_sequence_number) {
+ if (!file) [[unlikely]] {
+ return InvalidArgument("Data file cannot be null");
+ }
+
+ ManifestEntry existing;
+ existing.status = ManifestStatus::kExisting;
+ existing.snapshot_id = file_snapshot_id;
+ existing.data_file = std::move(file);
+ existing.sequence_number = data_sequence_number;
+ existing.file_sequence_number = file_sequence_number;
+
+ return WriteEntry(existing);
+}
+
+Status ManifestWriter::WriteExistingEntry(const ManifestEntry& entry) {
+ // Update the entry status to `Existing`
+ return WriteEntry(entry.AsExisting());
+}
+
+Status ManifestWriter::WriteDeletedEntry(std::shared_ptr<DataFile> file,
+ int64_t data_sequence_number,
+ std::optional<int64_t>
file_sequence_number) {
+ if (!file) [[unlikely]] {
+ return InvalidArgument("Data file cannot be null");
+ }
+
+ ManifestEntry deleted;
+ deleted.status = ManifestStatus::kDeleted;
+ deleted.snapshot_id = adapter_->snapshot_id();
+ deleted.data_file = std::move(file);
+ deleted.sequence_number = data_sequence_number;
+ deleted.file_sequence_number = file_sequence_number;
+
+ return WriteEntry(deleted);
+}
+
+Status ManifestWriter::WriteDeletedEntry(const ManifestEntry& entry) {
+ // Update the entry status to `Deleted`
+ auto deleted = entry.AsDeleted();
+ // Set the snapshot id to the current snapshot id
+ deleted.snapshot_id = adapter_->snapshot_id();
+
+ return WriteEntry(deleted);
+}
+
+Status ManifestWriter::WriteEntry(const ManifestEntry& entry) {
+ if (!entry.data_file) [[unlikely]] {
+ return InvalidArgument("Data file cannot be null");
+ }
+
+ ICEBERG_RETURN_UNEXPECTED(CheckDataFile(*entry.data_file));
if (adapter_->size() >= kBatchSize) {
ICEBERG_ASSIGN_OR_RAISE(auto array, adapter_->FinishAppending());
ICEBERG_RETURN_UNEXPECTED(writer_->Write(array));
ICEBERG_RETURN_UNEXPECTED(adapter_->StartAppending());
}
+
+
ICEBERG_RETURN_UNEXPECTED(partition_summary_->Update(entry.data_file->partition));
+
+ // update statistics
+ switch (entry.status) {
+ case ManifestStatus::kAdded:
+ add_files_count_++;
+ add_rows_count_ += entry.data_file->record_count;
+ break;
+ case ManifestStatus::kExisting:
+ existing_files_count_++;
+ existing_rows_count_ += entry.data_file->record_count;
+ break;
+ case ManifestStatus::kDeleted:
+ delete_files_count_++;
+ delete_rows_count_ += entry.data_file->record_count;
+ break;
+ default:
+ std::unreachable();
+ }
+
+ if (entry.IsAlive() && entry.sequence_number.has_value()) {
+ if (!min_sequence_number_.has_value() ||
+ entry.sequence_number.value() < min_sequence_number_.value()) {
+ min_sequence_number_ = entry.sequence_number.value();
+ }
+ }
+
return adapter_->Append(entry);
}
+Status ManifestWriter::CheckDataFile(const DataFile& file) const {
+ switch (adapter_->content()) {
+ case ManifestContent::kData:
+ if (file.content != DataFile::Content::kData) {
+ return InvalidArgument("Cannot write {} file to data manifest file",
+ ToString(file.content));
+ }
+ break;
+ case ManifestContent::kDeletes:
+ if (file.content != DataFile::Content::kPositionDeletes &&
+ file.content != DataFile::Content::kEqualityDeletes) {
+ return InvalidArgument("Cannot write {} file to delete manifest file",
+ ToString(file.content));
+ }
+ break;
+ default:
+ std::unreachable();
+ }
+ return {};
+}
+
Status ManifestWriter::AddAll(const std::vector<ManifestEntry>& entries) {
for (const auto& entry : entries) {
- ICEBERG_RETURN_UNEXPECTED(Add(entry));
+ ICEBERG_RETURN_UNEXPECTED(WriteEntry(entry));
}
return {};
}
@@ -50,11 +208,45 @@ Status ManifestWriter::Close() {
ICEBERG_ASSIGN_OR_RAISE(auto array, adapter_->FinishAppending());
ICEBERG_RETURN_UNEXPECTED(writer_->Write(array));
}
- return writer_->Close();
+ ICEBERG_RETURN_UNEXPECTED(writer_->Close());
+ closed_ = true;
+ return {};
}
ManifestContent ManifestWriter::content() const { return adapter_->content(); }
+Result<Metrics> ManifestWriter::metrics() const { return writer_->metrics(); }
+
+Result<ManifestFile> ManifestWriter::ToManifestFile() const {
+ if (!closed_) [[unlikely]] {
+ return Invalid("Cannot get ManifestFile before closing the writer.");
+ }
+
+ ICEBERG_ASSIGN_OR_RAISE(auto partitions, partition_summary_->Summaries());
+ ICEBERG_ASSIGN_OR_RAISE(auto manifest_length, writer_->length());
+
+ return ManifestFile{
+ .manifest_path = manifest_location_,
+ .manifest_length = manifest_length,
+ .partition_spec_id = adapter_->partition_spec()->spec_id(),
+ .content = adapter_->content(),
+ // sequence_number and min_sequence_number with kInvalidSequenceNumber
will be
+ // replaced with the real sequence number in `ManifestListWriter`.
+ .sequence_number = TableMetadata::kInvalidSequenceNumber,
+ .min_sequence_number =
+ min_sequence_number_.value_or(TableMetadata::kInvalidSequenceNumber),
+ .added_snapshot_id =
adapter_->snapshot_id().value_or(Snapshot::kInvalidSnapshotId),
+ .added_files_count = add_files_count_,
+ .existing_files_count = existing_files_count_,
+ .deleted_files_count = delete_files_count_,
+ .added_rows_count = add_rows_count_,
+ .existing_rows_count = existing_rows_count_,
+ .deleted_rows_count = delete_rows_count_,
+ .partitions = std::move(partitions),
+ .first_row_id = first_row_id_,
+ };
+}
+
Result<std::unique_ptr<Writer>> OpenFileWriter(
std::string_view location, std::shared_ptr<Schema> schema,
std::shared_ptr<FileIO> file_io,
@@ -102,7 +294,8 @@ Result<std::unique_ptr<ManifestWriter>>
ManifestWriter::MakeV1Writer(
auto writer,
OpenFileWriter(manifest_location, std::move(schema), std::move(file_io),
adapter->metadata(), "manifest_entry"));
- return std::make_unique<ManifestWriter>(std::move(writer),
std::move(adapter));
+ return std::make_unique<ManifestWriter>(std::move(writer),
std::move(adapter),
+ manifest_location, std::nullopt);
}
Result<std::unique_ptr<ManifestWriter>> ManifestWriter::MakeV2Writer(
@@ -131,7 +324,8 @@ Result<std::unique_ptr<ManifestWriter>>
ManifestWriter::MakeV2Writer(
auto writer,
OpenFileWriter(manifest_location, std::move(schema), std::move(file_io),
adapter->metadata(), "manifest_entry"));
- return std::make_unique<ManifestWriter>(std::move(writer),
std::move(adapter));
+ return std::make_unique<ManifestWriter>(std::move(writer),
std::move(adapter),
+ manifest_location, std::nullopt);
}
Result<std::unique_ptr<ManifestWriter>> ManifestWriter::MakeV3Writer(
@@ -162,7 +356,8 @@ Result<std::unique_ptr<ManifestWriter>>
ManifestWriter::MakeV3Writer(
auto writer,
OpenFileWriter(manifest_location, std::move(schema), std::move(file_io),
adapter->metadata(), "manifest_entry"));
- return std::make_unique<ManifestWriter>(std::move(writer),
std::move(adapter));
+ return std::make_unique<ManifestWriter>(std::move(writer),
std::move(adapter),
+ manifest_location, first_row_id);
}
Status ManifestListWriter::Add(const ManifestFile& file) {
diff --git a/src/iceberg/manifest_writer.h b/src/iceberg/manifest_writer.h
index 37a2c5a..8889ae2 100644
--- a/src/iceberg/manifest_writer.h
+++ b/src/iceberg/manifest_writer.h
@@ -23,11 +23,14 @@
/// Data writer interface for manifest files and manifest list files.
#include <memory>
+#include <string>
#include <vector>
#include "iceberg/file_writer.h"
#include "iceberg/iceberg_export.h"
#include "iceberg/manifest_adapter.h"
+#include "iceberg/metrics.h"
+#include "iceberg/result.h"
#include "iceberg/type_fwd.h"
namespace iceberg {
@@ -36,18 +39,67 @@ namespace iceberg {
class ICEBERG_EXPORT ManifestWriter {
public:
ManifestWriter(std::unique_ptr<Writer> writer,
- std::unique_ptr<ManifestEntryAdapter> adapter)
- : writer_(std::move(writer)), adapter_(std::move(adapter)) {}
+ std::unique_ptr<ManifestEntryAdapter> adapter,
+ std::string_view manifest_location, std::optional<int64_t>
first_row_id);
- ~ManifestWriter() = default;
+ ~ManifestWriter();
- /// \brief Write manifest entry to file.
+ /// \brief Write an entry whose fields have all been populated correctly.
/// \param entry Manifest entry to write.
/// \return Status::OK() if entry was written successfully
- Status Add(const ManifestEntry& entry);
+ /// \note All other write entry variants delegate to this method after
populating
+ /// the necessary fields.
+ Status WriteEntry(const ManifestEntry& entry);
+
+ /// \brief Add an added entry for a file with a specific sequence number.
+ ///
+ /// \param file an added data file
+ /// \param data_sequence_number a data sequence number for the file
+ /// \return Status::OK() if the entry was written successfully
+ /// \note The entry's snapshot ID will be this manifest's snapshot ID. The
entry's data
+ /// sequence number will be the provided data sequence number. The entry's
file sequence
+ /// number will be assigned at commit.
+ Status WriteAddedEntry(std::shared_ptr<DataFile> file,
+ std::optional<int64_t> data_sequence_number =
std::nullopt);
+ /// \brief Add a new entry to the manifest.
+ /// The method populates the snapshot id and status fields of the entry.
+ Status WriteAddedEntry(const ManifestEntry& entry);
+
+ /// \brief Add an existing entry for a file.
+ /// \param file an existing data file
+ /// \param file_snapshot_id snapshot ID when the data file was added to the
table
+ /// \param data_sequence_number a data sequence number of the file (assigned
when the
+ /// file was added)
+ /// \param file_sequence_number a file sequence number (assigned when the
file was
+ /// added)
+ /// \return Status::OK() if the entry was written successfully
+ /// \note The original data and file sequence numbers, snapshot ID, which
were assigned
+ /// at commit, must be preserved when adding an existing entry.
+ Status WriteExistingEntry(std::shared_ptr<DataFile> file, int64_t
file_snapshot_id,
+ int64_t data_sequence_number,
+ std::optional<int64_t> file_sequence_number =
std::nullopt);
+ /// \brief Add an existing entry to the manifest.
+ /// The method populates the status field of the entry.
+ Status WriteExistingEntry(const ManifestEntry& entry);
+
+ /// \brief Add a delete entry for a file.
+ /// \param file a deleted data file
+ /// \param data_sequence_number a data sequence number of the file (assigned
when the
+ /// file was added)
+ /// \param file_sequence_number a file sequence number (assigned when the
file was
+ /// added)
+ /// \return Status::OK() if the entry was written successfully
+ /// \note The entry's snapshot ID will be this manifest's snapshot ID.
However, the
+ /// original data and file sequence numbers of the file must be preserved
when the file
+ /// is marked as deleted.
+ Status WriteDeletedEntry(std::shared_ptr<DataFile> file, int64_t
data_sequence_number,
+ std::optional<int64_t> file_sequence_number =
std::nullopt);
+ /// \brief Add a deleted entry to the manifest.
+ /// The method populates the snapshot id and status fields of the entry.
+ Status WriteDeletedEntry(const ManifestEntry& entry);
/// \brief Write manifest entries to file.
- /// \param entries Manifest entries to write.
+ /// \param entries Already populated manifest entries to write.
/// \return Status::OK() if all entries were written successfully
Status AddAll(const std::vector<ManifestEntry>& entries);
@@ -57,6 +109,14 @@ class ICEBERG_EXPORT ManifestWriter {
/// \brief Get the content of the manifest.
ManifestContent content() const;
+ /// \brief Get the metrics of written manifest file.
+ /// \note Only valid after the file is closed.
+ Result<Metrics> metrics() const;
+
+ /// \brief Get the ManifestFile object.
+ /// \note Only valid after the file is closed.
+ Result<ManifestFile> ToManifestFile() const;
+
/// \brief Creates a writer for a manifest file.
/// \param snapshot_id ID of the snapshot.
/// \param manifest_location Path to the manifest file.
@@ -100,9 +160,23 @@ class ICEBERG_EXPORT ManifestWriter {
std::shared_ptr<Schema> current_schema, ManifestContent content);
private:
+ Status CheckDataFile(const DataFile& file) const;
+
static constexpr int64_t kBatchSize = 1024;
std::unique_ptr<Writer> writer_;
std::unique_ptr<ManifestEntryAdapter> adapter_;
+ bool closed_{false};
+ std::string manifest_location_;
+ std::optional<int64_t> first_row_id_;
+
+ int32_t add_files_count_{0};
+ int32_t existing_files_count_{0};
+ int32_t delete_files_count_{0};
+ int64_t add_rows_count_{0L};
+ int64_t existing_rows_count_{0L};
+ int64_t delete_rows_count_{0L};
+ std::optional<int64_t> min_sequence_number_{std::nullopt};
+ std::unique_ptr<PartitionSummary> partition_summary_;
};
/// \brief Write manifest files to a manifest list file.
diff --git a/src/iceberg/meson.build b/src/iceberg/meson.build
index 5669b22..2df94cf 100644
--- a/src/iceberg/meson.build
+++ b/src/iceberg/meson.build
@@ -64,6 +64,7 @@ iceberg_sources = files(
'name_mapping.cc',
'partition_field.cc',
'partition_spec.cc',
+ 'partition_summary.cc',
'row/arrow_array_wrapper.cc',
'row/manifest_wrapper.cc',
'row/struct_like.cc',
diff --git a/src/iceberg/metrics.h b/src/iceberg/metrics.h
index 1dfeb20..b476a47 100644
--- a/src/iceberg/metrics.h
+++ b/src/iceberg/metrics.h
@@ -22,6 +22,7 @@
/// \file iceberg/metrics.h
/// Iceberg file format metrics
+#include <optional>
#include <unordered_map>
#include "iceberg/expression/literal.h"
@@ -31,13 +32,13 @@ namespace iceberg {
/// \brief Iceberg file format metrics
struct ICEBERG_EXPORT Metrics {
- int64_t row_count = 0;
- std::unordered_map<int64_t, int64_t> column_sizes;
- std::unordered_map<int64_t, int64_t> value_counts;
- std::unordered_map<int64_t, int64_t> null_value_counts;
- std::unordered_map<int64_t, int64_t> nan_value_counts;
- std::unordered_map<int64_t, Literal> lower_bounds;
- std::unordered_map<int64_t, Literal> upper_bounds;
+ std::optional<int64_t> row_count;
+ std::unordered_map<int32_t, int64_t> column_sizes;
+ std::unordered_map<int32_t, int64_t> value_counts;
+ std::unordered_map<int32_t, int64_t> null_value_counts;
+ std::unordered_map<int32_t, int64_t> nan_value_counts;
+ std::unordered_map<int32_t, Literal> lower_bounds;
+ std::unordered_map<int32_t, Literal> upper_bounds;
};
} // namespace iceberg
diff --git a/src/iceberg/parquet/parquet_writer.cc
b/src/iceberg/parquet/parquet_writer.cc
index 9c8f081..44e6133 100644
--- a/src/iceberg/parquet/parquet_writer.cc
+++ b/src/iceberg/parquet/parquet_writer.cc
@@ -137,16 +137,16 @@ Status ParquetWriter::Write(ArrowArray* array) { return
impl_->Write(array); }
Status ParquetWriter::Close() { return impl_->Close(); }
-std::optional<Metrics> ParquetWriter::metrics() {
+Result<Metrics> ParquetWriter::metrics() {
if (!impl_->Closed()) {
- return std::nullopt;
+ return Invalid("ParquetWriter is not closed");
}
return {};
}
-std::optional<int64_t> ParquetWriter::length() {
+Result<int64_t> ParquetWriter::length() {
if (!impl_->Closed()) {
- return std::nullopt;
+ return Invalid("ParquetWriter is not closed");
}
return impl_->length();
}
diff --git a/src/iceberg/parquet/parquet_writer.h
b/src/iceberg/parquet/parquet_writer.h
index 9be0a43..5abe45d 100644
--- a/src/iceberg/parquet/parquet_writer.h
+++ b/src/iceberg/parquet/parquet_writer.h
@@ -37,9 +37,9 @@ class ICEBERG_BUNDLE_EXPORT ParquetWriter : public Writer {
Status Write(ArrowArray* array) final;
- std::optional<Metrics> metrics() final;
+ Result<Metrics> metrics() final;
- std::optional<int64_t> length() final;
+ Result<int64_t> length() final;
std::vector<int64_t> split_offsets() final;
diff --git a/src/iceberg/partition_summary.cc b/src/iceberg/partition_summary.cc
new file mode 100644
index 0000000..cf279fd
--- /dev/null
+++ b/src/iceberg/partition_summary.cc
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <memory>
+
+#include "iceberg/expression/literal.h"
+#include "iceberg/manifest_list.h"
+#include "iceberg/partition_summary_internal.h"
+#include "iceberg/result.h"
+#include "iceberg/util/checked_cast.h"
+#include "iceberg/util/formatter.h" // IWYU pragma: keep
+#include "iceberg/util/macros.h"
+
+namespace iceberg {
+
+// Folds one partition value into the running statistics of this field.
+// Returns InvalidArgument when the value's type id differs from the tracked type.
+Status PartitionFieldStats::Update(const Literal& value) {
+  const bool type_matches = type_->type_id() == value.type()->type_id();
+  if (!type_matches) [[unlikely]] {
+    return InvalidArgument("value type {} is not compatible with expected type {}.",
+                           *value.type(), *type_);
+  }
+
+  // Null and NaN values only raise their flag; they never take part in the bounds.
+  if (value.IsNull()) {
+    contains_null_ = true;
+    return {};
+  }
+  if (value.IsNaN()) {
+    contains_nan_ = true;
+    return {};
+  }
+
+  // Widen the tracked [lower, upper] range so that it covers the new value.
+  const bool is_new_minimum = !lower_bound_.has_value() || value < *lower_bound_;
+  if (is_new_minimum) {
+    lower_bound_ = value;
+  }
+  const bool is_new_maximum = !upper_bound_.has_value() || value > *upper_bound_;
+  if (is_new_maximum) {
+    upper_bound_ = value;
+  }
+  return {};
+}
+
+// Produces the summary for this field: the null/NaN flags plus the serialized
+// bounds. A bound that was never set is left untouched in the summary.
+Result<PartitionFieldSummary> PartitionFieldStats::Finish() const {
+  PartitionFieldSummary field_summary;
+  field_summary.contains_null = contains_null_;
+  field_summary.contains_nan = contains_nan_;
+
+  if (lower_bound_.has_value()) {
+    ICEBERG_ASSIGN_OR_RAISE(field_summary.lower_bound, lower_bound_->Serialize());
+  }
+  if (upper_bound_.has_value()) {
+    ICEBERG_ASSIGN_OR_RAISE(field_summary.upper_bound, upper_bound_->Serialize());
+  }
+  return field_summary;
+}
+
+// Creates one PartitionFieldStats per field of the partition type, in field order.
+PartitionSummary::PartitionSummary(const StructType& partition_type) {
+  const auto& partition_fields = partition_type.fields();
+  field_stats_.reserve(partition_fields.size());
+  for (const auto& partition_field : partition_fields) {
+    field_stats_.emplace_back(partition_field.type());
+  }
+}
+
+// Feeds one tuple of partition values into the per-field statistics.
+// The i-th value is cast to the type of the i-th field before it is recorded;
+// a size mismatch or a failed cast/update is returned to the caller.
+Status PartitionSummary::Update(const std::vector<Literal>& partition_values) {
+  const bool arity_matches = partition_values.size() == field_stats_.size();
+  if (!arity_matches) [[unlikely]] {
+    return InvalidArgument("partition values size {} does not match field stats size {}",
+                           partition_values.size(), field_stats_.size());
+  }
+
+  for (size_t pos = 0; pos < partition_values.size(); ++pos) {
+    auto& stats = field_stats_[pos];
+    ICEBERG_ASSIGN_OR_RAISE(
+        auto converted,
+        partition_values[pos].CastTo(
+            internal::checked_pointer_cast<PrimitiveType>(stats.type())));
+    ICEBERG_RETURN_UNEXPECTED(stats.Update(converted));
+  }
+  return {};
+}
+
+// Finishes every field's statistics and returns the summaries in field order.
+// The first failing Finish() aborts the collection and is returned as the error.
+Result<std::vector<PartitionFieldSummary>> PartitionSummary::Summaries() const {
+  std::vector<PartitionFieldSummary> collected;
+  collected.reserve(field_stats_.size());
+  for (const PartitionFieldStats& stats : field_stats_) {
+    ICEBERG_ASSIGN_OR_RAISE(auto field_summary, stats.Finish());
+    collected.push_back(std::move(field_summary));
+  }
+  return collected;
+}
+
+} // namespace iceberg
diff --git a/src/iceberg/partition_summary_internal.h
b/src/iceberg/partition_summary_internal.h
new file mode 100644
index 0000000..167d1f0
--- /dev/null
+++ b/src/iceberg/partition_summary_internal.h
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#include "iceberg/expression/literal.h"
+#include "iceberg/result.h"
+#include "iceberg/type_fwd.h"
+
+namespace iceberg {
+
+/// \brief Statistics for a partition field.
+///
+/// Accumulates, over every value passed to Update(), whether a null or a NaN was
+/// seen and the smallest and largest of the remaining values.
+class PartitionFieldStats {
+ public:
+  /// \brief Create stats for a partition field of the given type.
+  /// \param type Type of the partition field. The pointer is copied, so the stats
+  /// share ownership of the type and do not depend on the lifetime of the argument.
+  explicit PartitionFieldStats(const std::shared_ptr<Type>& type) : type_(type) {}
+
+  /// \brief Update the partition field stats with a new partition field value.
+  /// \param value The value to record; its type id must equal the field's type id.
+  /// \return An InvalidArgument error if the type ids differ.
+  Status Update(const Literal& value);
+
+  /// \brief Finish the partition field stats and produce the partition field summary.
+  /// \return The summary holding the null/NaN flags and the serialized bounds, or the
+  /// error raised while serializing a bound.
+  Result<PartitionFieldSummary> Finish() const;
+
+  /// \brief Get the type of the partition field described by these stats.
+  const std::shared_ptr<Type>& type() const { return type_; }
+
+ private:
+  // Stored by value on purpose: a reference member would alias the pointer handed to
+  // the constructor and dangle as soon as that pointer's owner is destroyed.
+  std::shared_ptr<Type> type_;
+  bool contains_null_{false};
+  bool contains_nan_{false};
+  std::optional<Literal> lower_bound_;
+  std::optional<Literal> upper_bound_;
+};
+
+/// \brief Collects statistics per partition field and turns them into the partition
+/// field summaries.
+class PartitionSummary {
+ public:
+  /// \brief Create a PartitionSummary with one stats entry per field of the given
+  /// partition type.
+  explicit PartitionSummary(const StructType& partition_type);
+
+  /// \brief Create a PartitionSummary that takes over already built field stats.
+  explicit PartitionSummary(std::vector<PartitionFieldStats> field_stats)
+      : field_stats_(std::move(field_stats)) {}
+
+  /// \brief Update the partition summary with one tuple of partition values.
+  /// \param partition_values Values matched to the field stats by position; the
+  /// number of values must equal the number of field stats.
+  Status Update(const std::vector<Literal>& partition_values);
+
+  /// \brief Get the partition field summaries, one per field stats entry.
+  Result<std::vector<PartitionFieldSummary>> Summaries() const;
+
+ private:
+  std::vector<PartitionFieldStats> field_stats_;
+};
+
+} // namespace iceberg
diff --git a/src/iceberg/test/CMakeLists.txt b/src/iceberg/test/CMakeLists.txt
index 87a11c3..0efc4e1 100644
--- a/src/iceberg/test/CMakeLists.txt
+++ b/src/iceberg/test/CMakeLists.txt
@@ -124,6 +124,7 @@ if(ICEBERG_BUILD_BUNDLE)
manifest_list_reader_writer_test.cc
manifest_list_versions_test.cc
manifest_reader_writer_test.cc
+ manifest_writer_versions_test.cc
test_common.cc)
add_iceberg_test(arrow_test
diff --git a/src/iceberg/test/manifest_list_reader_writer_test.cc
b/src/iceberg/test/manifest_list_reader_writer_test.cc
index 826786b..a6985f7 100644
--- a/src/iceberg/test/manifest_list_reader_writer_test.cc
+++ b/src/iceberg/test/manifest_list_reader_writer_test.cc
@@ -64,7 +64,7 @@ class ManifestListReaderWriterTestBase : public
TempFileTestBase {
for (const auto& manifest : manifest_files) {
ASSERT_EQ(manifest.partition_spec_id, 0);
ASSERT_TRUE(manifest.partitions.empty());
- ASSERT_EQ(manifest.content, ManifestFile::Content::kData);
+ ASSERT_EQ(manifest.content, ManifestContent::kData);
}
}
@@ -242,7 +242,7 @@ class ManifestListReaderWriterV2Test : public
ManifestListReaderWriterTestBase {
manifest_file.manifest_path = test_dir_prefix + paths[i];
manifest_file.manifest_length = file_size[i];
manifest_file.partition_spec_id = 0;
- manifest_file.content = ManifestFile::Content::kData;
+ manifest_file.content = ManifestContent::kData;
manifest_file.sequence_number = 4 - i;
manifest_file.min_sequence_number = 4 - i;
manifest_file.added_snapshot_id = snapshot_id[i];
@@ -282,7 +282,7 @@ class ManifestListReaderWriterV2Test : public
ManifestListReaderWriterTestBase {
manifest_file.manifest_path = test_dir_prefix + paths[i];
manifest_file.manifest_length = file_size[i];
manifest_file.partition_spec_id = 0;
- manifest_file.content = ManifestFile::Content::kData;
+ manifest_file.content = ManifestContent::kData;
manifest_file.sequence_number = 4 - i;
manifest_file.min_sequence_number = 4 - i;
manifest_file.added_snapshot_id = snapshot_id[i];
diff --git a/src/iceberg/test/manifest_list_versions_test.cc
b/src/iceberg/test/manifest_list_versions_test.cc
index 1f329ba..bf83e8c 100644
--- a/src/iceberg/test/manifest_list_versions_test.cc
+++ b/src/iceberg/test/manifest_list_versions_test.cc
@@ -59,7 +59,7 @@ const static auto kTestManifest = ManifestFile{
.manifest_path = kPath,
.manifest_length = kLength,
.partition_spec_id = kSpecId,
- .content = ManifestFile::Content::kData,
+ .content = ManifestContent::kData,
.sequence_number = kSeqNum,
.min_sequence_number = kMinSeqNum,
.added_snapshot_id = kSnapshotId,
@@ -78,7 +78,7 @@ const static auto kDeleteManifest = ManifestFile{
.manifest_path = kPath,
.manifest_length = kLength,
.partition_spec_id = kSpecId,
- .content = ManifestFile::Content::kDeletes,
+ .content = ManifestContent::kDeletes,
.sequence_number = kSeqNum,
.min_sequence_number = kMinSeqNum,
.added_snapshot_id = kSnapshotId,
@@ -227,7 +227,7 @@ TEST_F(TestManifestListVersions, TestV1Write) {
EXPECT_EQ(manifest.manifest_path, kPath);
EXPECT_EQ(manifest.manifest_length, kLength);
EXPECT_EQ(manifest.partition_spec_id, kSpecId);
- EXPECT_EQ(manifest.content, ManifestFile::Content::kData);
+ EXPECT_EQ(manifest.content, ManifestContent::kData);
EXPECT_EQ(manifest.added_snapshot_id, kSnapshotId);
EXPECT_EQ(manifest.added_files_count, kAddedFiles);
EXPECT_EQ(manifest.existing_files_count, kExistingFiles);
@@ -247,7 +247,7 @@ TEST_F(TestManifestListVersions, TestV2Write) {
EXPECT_EQ(manifest.manifest_path, kPath);
EXPECT_EQ(manifest.manifest_length, kLength);
EXPECT_EQ(manifest.partition_spec_id, kSpecId);
- EXPECT_EQ(manifest.content, ManifestFile::Content::kData);
+ EXPECT_EQ(manifest.content, ManifestContent::kData);
EXPECT_EQ(manifest.sequence_number, kSeqNum);
EXPECT_EQ(manifest.min_sequence_number, kMinSeqNum);
EXPECT_EQ(manifest.added_snapshot_id, kSnapshotId);
@@ -266,7 +266,7 @@ TEST_F(TestManifestListVersions, TestV3Write) {
EXPECT_EQ(manifest.manifest_path, kPath);
EXPECT_EQ(manifest.manifest_length, kLength);
EXPECT_EQ(manifest.partition_spec_id, kSpecId);
- EXPECT_EQ(manifest.content, ManifestFile::Content::kData);
+ EXPECT_EQ(manifest.content, ManifestContent::kData);
EXPECT_EQ(manifest.sequence_number, kSeqNum);
EXPECT_EQ(manifest.min_sequence_number, kMinSeqNum);
EXPECT_EQ(manifest.added_snapshot_id, kSnapshotId);
@@ -292,7 +292,7 @@ TEST_F(TestManifestListVersions,
TestV3WriteFirstRowIdAssignment) {
EXPECT_EQ(manifest.manifest_path, kPath);
EXPECT_EQ(manifest.manifest_length, kLength);
EXPECT_EQ(manifest.partition_spec_id, kSpecId);
- EXPECT_EQ(manifest.content, ManifestFile::Content::kData);
+ EXPECT_EQ(manifest.content, ManifestContent::kData);
EXPECT_EQ(manifest.sequence_number, kSeqNum);
EXPECT_EQ(manifest.min_sequence_number, kMinSeqNum);
EXPECT_EQ(manifest.added_snapshot_id, kSnapshotId);
@@ -323,7 +323,7 @@ TEST_F(TestManifestListVersions,
TestV3WriteMixedRowIdAssignment) {
EXPECT_EQ(manifest.manifest_path, kPath);
EXPECT_EQ(manifest.manifest_length, kLength);
EXPECT_EQ(manifest.partition_spec_id, kSpecId);
- EXPECT_EQ(manifest.content, ManifestFile::Content::kData);
+ EXPECT_EQ(manifest.content, ManifestContent::kData);
EXPECT_EQ(manifest.sequence_number, kSeqNum);
EXPECT_EQ(manifest.min_sequence_number, kMinSeqNum);
EXPECT_EQ(manifest.added_snapshot_id, kSnapshotId);
@@ -442,7 +442,7 @@ TEST_F(TestManifestListVersions,
TestManifestsPartitionSummary) {
.manifest_path = kPath,
.manifest_length = kLength,
.partition_spec_id = kSpecId,
- .content = ManifestFile::Content::kData,
+ .content = ManifestContent::kData,
.sequence_number = kSeqNum,
.min_sequence_number = kMinSeqNum,
.added_snapshot_id = kSnapshotId,
diff --git a/src/iceberg/test/manifest_reader_writer_test.cc
b/src/iceberg/test/manifest_reader_writer_test.cc
index 54dcc75..c05b64e 100644
--- a/src/iceberg/test/manifest_reader_writer_test.cc
+++ b/src/iceberg/test/manifest_reader_writer_test.cc
@@ -50,14 +50,16 @@ class ManifestReaderWriterTestBase : public
TempFileTestBase {
void TestManifestReading(const std::string& resource_name,
const std::vector<ManifestEntry>& expected_entries,
- std::shared_ptr<Schema> partition_schema = nullptr)
{
+ std::shared_ptr<Schema> partition_schema = nullptr,
+ std::optional<int64_t> snapshot_id = std::nullopt) {
std::string path = GetResourcePath(resource_name);
- TestManifestReadingByPath(path, expected_entries, partition_schema);
+ TestManifestReadingByPath(path, expected_entries, partition_schema,
snapshot_id);
}
void TestManifestReadingByPath(const std::string& path,
const std::vector<ManifestEntry>&
expected_entries,
- std::shared_ptr<Schema> partition_schema =
nullptr) {
+ std::shared_ptr<Schema> partition_schema =
nullptr,
+ std::optional<int64_t> snapshot_id =
std::nullopt) {
auto manifest_reader_result = ManifestReader::Make(path, file_io_,
partition_schema);
ASSERT_TRUE(manifest_reader_result.has_value())
<< manifest_reader_result.error().message;
@@ -152,12 +154,12 @@ class ManifestV1Test : public
ManifestReaderWriterTestBase {
return manifest_entries;
}
- void TestWriteManifest(const std::string& manifest_list_path,
+ void TestWriteManifest(int64_t snapshot_id, const std::string&
manifest_list_path,
std::shared_ptr<PartitionSpec> partition_spec,
const std::vector<ManifestEntry>& manifest_entries,
std::shared_ptr<Schema> table_schema) {
auto result =
- ManifestWriter::MakeV1Writer(1, manifest_list_path, file_io_,
+ ManifestWriter::MakeV1Writer(snapshot_id, manifest_list_path, file_io_,
std::move(partition_spec),
std::move(table_schema));
ASSERT_TRUE(result.has_value()) << result.error().message;
auto writer = std::move(result.value());
@@ -192,8 +194,9 @@ TEST_F(ManifestV1Test, WritePartitionedTest) {
auto expected_entries = PreparePartitionedTestData();
auto write_manifest_path = CreateNewTempFilePath();
- TestWriteManifest(write_manifest_path, partition_spec, expected_entries,
table_schema);
- TestManifestReadingByPath(write_manifest_path, expected_entries,
partition_schema);
+ TestWriteManifest(1, write_manifest_path, partition_spec, expected_entries,
+ table_schema);
+ TestManifestReadingByPath(write_manifest_path, expected_entries,
partition_schema, 1);
}
class ManifestV2Test : public ManifestReaderWriterTestBase {
@@ -289,7 +292,7 @@ TEST_F(ManifestV2Test, ReadMetadataInheritanceTest) {
.manifest_path = path,
.manifest_length = 100,
.partition_spec_id = 12,
- .content = ManifestFile::Content::kData,
+ .content = ManifestContent::kData,
.sequence_number = 15,
.added_snapshot_id = 679879563479918846LL,
};
@@ -320,7 +323,7 @@ TEST_F(ManifestV2Test, WriteInheritancePartitionedTest) {
.manifest_path = write_manifest_path,
.manifest_length = 100,
.partition_spec_id = 12,
- .content = ManifestFile::Content::kData,
+ .content = ManifestContent::kData,
.sequence_number = 15,
.added_snapshot_id = 679879563479918846LL,
};
diff --git a/src/iceberg/test/manifest_writer_versions_test.cc
b/src/iceberg/test/manifest_writer_versions_test.cc
new file mode 100644
index 0000000..b792c86
--- /dev/null
+++ b/src/iceberg/test/manifest_writer_versions_test.cc
@@ -0,0 +1,668 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <cstdint>
+#include <memory>
+#include <optional>
+#include <string>
+#include <unordered_map>
+
+#include <gtest/gtest.h>
+
+#include "iceberg/arrow/arrow_file_io.h"
+#include "iceberg/avro/avro_register.h"
+#include "iceberg/expression/literal.h"
+#include "iceberg/file_format.h"
+#include "iceberg/manifest_entry.h"
+#include "iceberg/manifest_list.h"
+#include "iceberg/manifest_reader.h"
+#include "iceberg/manifest_writer.h"
+#include "iceberg/metrics.h"
+#include "iceberg/partition_spec.h"
+#include "iceberg/schema.h"
+#include "iceberg/schema_field.h"
+#include "iceberg/table_metadata.h"
+#include "iceberg/test/matchers.h"
+#include "iceberg/transform.h"
+#include "iceberg/type.h"
+
+namespace iceberg {
+
+namespace {
+
+// Sequence number handed to the V2/V3 manifest list writers and used as the data
+// sequence number when writing deleted entries.
+constexpr int64_t kSequenceNumber = 34L;
+// Snapshot id handed to every manifest and manifest list writer in this file.
+constexpr int64_t kSnapshotId = 987134631982734L;
+// File path assigned to the data file and delete file fixtures.
+constexpr std::string_view kPath =
+
"s3://bucket/table/category=cheesy/timestamp_hour=10/id_bucket=3/file.avro";
+constexpr FileFormatType kFormat = FileFormatType::kAvro;
+constexpr int32_t kSortOrderId = 2;
+// First row id handed to the V3 writers and to the data file fixture.
+constexpr int64_t kFirstRowId = 100L;
+
+// Partition tuple of the fixture files; presumably one value per partition field of
+// the spec built in SetUp() (category, timestamp_hour, id_bucket).
+const std::vector<Literal> kPartition = {Literal::String("cheesy"),
Literal::Int(10),
+ Literal::Int(3)};
+const std::vector<int32_t> kEqualityIds = {1};
+
+// Column statistics copied into the data file and delete file fixtures.
+const auto kMetrics = Metrics{
+ .row_count = 1587L,
+ .column_sizes = {{1, 15L}, {2, 122L}, {3, 4021L}, {4, 9411L}, {5, 15L}},
+ .value_counts = {{1, 100L}, {2, 100L}, {3, 100L}, {4, 100L}, {5, 100L}},
+ .null_value_counts = {{1, 0L}, {2, 0L}, {3, 0L}, {4, 0L}, {5, 0L}},
+ .nan_value_counts = {{5, 10L}},
+ .lower_bounds = {{1, Literal::Int(1)}},
+ .upper_bounds = {{1, Literal::Int(1)}},
+};
+
+// Split offsets assigned to the data file fixture.
+const auto kOffsets = std::vector<int64_t>{4L};
+
+// Builds the data file fixture from the file-level constants and kMetrics.
+// `first_row_id` is stored on the file as given (std::nullopt leaves it unset).
+std::unique_ptr<DataFile> CreateDataFile(
+    std::optional<int64_t> first_row_id = std::nullopt) {
+  auto file = std::make_unique<DataFile>();
+
+  // Scalar attributes of the file.
+  file->file_path = std::string(kPath);
+  file->file_format = kFormat;
+  file->partition = kPartition;
+  file->file_size_in_bytes = 150972L;
+  file->record_count = kMetrics.row_count.value();
+  file->split_offsets = kOffsets;
+  file->sort_order_id = kSortOrderId;
+  file->first_row_id = first_row_id;
+
+  // Per-column counters are copied from the metrics maps.
+  const auto& sizes = kMetrics.column_sizes;
+  const auto& values = kMetrics.value_counts;
+  const auto& nulls = kMetrics.null_value_counts;
+  const auto& nans = kMetrics.nan_value_counts;
+  file->column_sizes = {sizes.begin(), sizes.end()};
+  file->value_counts = {values.begin(), values.end()};
+  file->null_value_counts = {nulls.begin(), nulls.end()};
+  file->nan_value_counts = {nans.begin(), nans.end()};
+
+  // Bounds are stored in their serialized form.
+  for (const auto& lower : kMetrics.lower_bounds) {
+    file->lower_bounds[lower.first] = lower.second.Serialize().value();
+  }
+  for (const auto& upper : kMetrics.upper_bounds) {
+    file->upper_bounds[upper.first] = upper.second.Serialize().value();
+  }
+
+  return file;
+}
+
+// Builds the equality-delete file fixture from the file-level constants and kMetrics.
+std::unique_ptr<DataFile> CreateDeleteFile() {
+  auto file = std::make_unique<DataFile>();
+
+  // Scalar attributes of the delete file.
+  file->content = DataFile::Content::kEqualityDeletes;
+  file->file_path = std::string(kPath);
+  file->file_format = kFormat;
+  file->partition = kPartition;
+  file->file_size_in_bytes = 22905L;
+  file->equality_ids = kEqualityIds;
+
+  // Per-column counters are copied from the metrics maps.
+  const auto& sizes = kMetrics.column_sizes;
+  const auto& values = kMetrics.value_counts;
+  const auto& nulls = kMetrics.null_value_counts;
+  const auto& nans = kMetrics.nan_value_counts;
+  file->column_sizes = {sizes.begin(), sizes.end()};
+  file->value_counts = {values.begin(), values.end()};
+  file->null_value_counts = {nulls.begin(), nulls.end()};
+  file->nan_value_counts = {nans.begin(), nans.end()};
+
+  // Bounds are stored in their serialized form.
+  for (const auto& lower : kMetrics.lower_bounds) {
+    file->lower_bounds[lower.first] = lower.second.Serialize().value();
+  }
+  for (const auto& upper : kMetrics.upper_bounds) {
+    file->upper_bounds[upper.first] = upper.second.Serialize().value();
+  }
+
+  return file;
+}
+
+} // namespace
+
+class ManifestWriterVersionsTest : public ::testing::Test {
+ protected:
+  // Registers the Avro support and builds the schema, partition spec, file
+  // fixtures and mock file IO shared by the tests.
+  void SetUp() override {
+    avro::RegisterAll();
+    file_io_ = iceberg::arrow::MakeMockFileIO();
+
+    std::vector<SchemaField> columns{
+        SchemaField::MakeRequired(1, "id", int64()),
+        SchemaField::MakeRequired(2, "timestamp", timestamp_tz()),
+        SchemaField::MakeRequired(3, "category", string()),
+        SchemaField::MakeRequired(4, "data", string()),
+        SchemaField::MakeRequired(5, "double", float64())};
+    schema_ = std::make_shared<Schema>(std::move(columns));
+
+    spec_ = PartitionSpec::Make(
+                0, {PartitionField(3, 1000, "category", Transform::Identity()),
+                    PartitionField(2, 1001, "timestamp_hour", Transform::Hour()),
+                    PartitionField(1, 1002, "id_bucket", Transform::Bucket(16))})
+                .value();
+
+    data_file_ = CreateDataFile(kFirstRowId);
+    data_file_without_first_row_id_ = CreateDataFile();
+    delete_file_ = CreateDeleteFile();
+  }
+
+  // Returns a manifest list file name that embeds the current system clock tick count.
+  static std::string CreateManifestListPath() {
+    const auto ticks = std::chrono::system_clock::now().time_since_epoch().count();
+    return std::format("manifest-list-{}.avro", ticks);
+  }
+
+  // Writes `manifests` into a new manifest list using the writer that belongs to
+  // `format_version` (1, 2 or 3) and returns the path of the list.
+  std::string WriteManifests(int format_version,
+                             const std::vector<ManifestFile>& manifests) const {
+    const std::string manifest_list_path = CreateManifestListPath();
+    constexpr int64_t kParentSnapshotId = kSnapshotId - 1;
+
+    // Stays a NotSupported error unless one of the known versions replaces it.
+    Result<std::unique_ptr<ManifestListWriter>> writer_result =
+        NotSupported("Format version: {}", format_version);
+
+    switch (format_version) {
+      case 1:
+        writer_result = ManifestListWriter::MakeV1Writer(kSnapshotId, kParentSnapshotId,
+                                                         manifest_list_path, file_io_);
+        break;
+      case 2:
+        writer_result = ManifestListWriter::MakeV2Writer(
+            kSnapshotId, kParentSnapshotId, kSequenceNumber, manifest_list_path, file_io_);
+        break;
+      case 3:
+        writer_result = ManifestListWriter::MakeV3Writer(kSnapshotId, kParentSnapshotId,
+                                                         kSequenceNumber, kFirstRowId,
+                                                         manifest_list_path, file_io_);
+        break;
+      default:
+        break;
+    }
+
+    EXPECT_THAT(writer_result, IsOk());
+    auto list_writer = std::move(writer_result.value());
+
+    EXPECT_THAT(list_writer->AddAll(manifests), IsOk());
+    EXPECT_THAT(list_writer->Close(), IsOk());
+
+    return manifest_list_path;
+  }
+
+  // Round trip: writes `manifests` to a manifest list and reads the list back.
+  std::vector<ManifestFile> WriteAndReadManifests(
+      const std::vector<ManifestFile>& manifests, int format_version) const {
+    const std::string list_path = WriteManifests(format_version, manifests);
+    return ReadManifests(list_path);
+  }
+
+  // Opens the manifest list at `manifest_list_path` and returns the files it lists.
+  std::vector<ManifestFile> ReadManifests(const std::string& manifest_list_path) const {
+    auto list_reader_result = ManifestListReader::Make(manifest_list_path, file_io_);
+    EXPECT_THAT(list_reader_result, IsOk());
+    auto list_reader = std::move(list_reader_result.value());
+
+    auto listed_files = list_reader->Files();
+    EXPECT_THAT(listed_files, IsOk());
+
+    std::vector<ManifestFile> manifests = listed_files.value();
+    return manifests;
+  }
+
+  // Returns a manifest file name that embeds the current system clock tick count.
+  static std::string CreateManifestPath() {
+    const auto ticks = std::chrono::system_clock::now().time_since_epoch().count();
+    return std::format("manifest-{}.avro", ticks);
+  }
+
+  // Writes every file in `data_files` as an added entry into a new data manifest of
+  // the given format version and returns the resulting ManifestFile.
+  ManifestFile WriteManifest(int format_version,
+                             std::vector<std::shared_ptr<DataFile>> data_files) {
+    const std::string manifest_path = CreateManifestPath();
+
+    // Stays a NotSupported error unless one of the known versions replaces it.
+    Result<std::unique_ptr<ManifestWriter>> writer_result =
+        NotSupported("Format version: {}", format_version);
+
+    switch (format_version) {
+      case 1:
+        writer_result = ManifestWriter::MakeV1Writer(kSnapshotId, manifest_path, file_io_,
+                                                     spec_, schema_);
+        break;
+      case 2:
+        writer_result = ManifestWriter::MakeV2Writer(
+            kSnapshotId, manifest_path, file_io_, spec_, schema_, ManifestContent::kData);
+        break;
+      case 3:
+        writer_result =
+            ManifestWriter::MakeV3Writer(kSnapshotId, kFirstRowId, manifest_path, file_io_,
+                                         spec_, schema_, ManifestContent::kData);
+        break;
+      default:
+        break;
+    }
+
+    EXPECT_THAT(writer_result, IsOk());
+    auto manifest_writer = std::move(writer_result.value());
+
+    for (const std::shared_ptr<DataFile>& file : data_files) {
+      EXPECT_THAT(manifest_writer->WriteAddedEntry(file), IsOk());
+    }
+
+    EXPECT_THAT(manifest_writer->Close(), IsOk());
+
+    // The ManifestFile is requested only after the writer has been closed.
+    auto manifest_result = manifest_writer->ToManifestFile();
+    EXPECT_THAT(manifest_result, IsOk());
+
+    ManifestFile manifest = std::move(manifest_result.value());
+    return manifest;
+  }
+
+  // Reads all entries of `manifest_file`, using the partition type that spec_
+  // derives from schema_.
+  std::vector<ManifestEntry> ReadManifest(const ManifestFile& manifest_file) {
+    auto partition_type_result = spec_->PartitionType(*schema_);
+    EXPECT_THAT(partition_type_result, IsOk());
+    std::shared_ptr<StructType> partition_type = std::move(partition_type_result.value());
+
+    auto manifest_reader_result =
+        ManifestReader::Make(manifest_file, file_io_, partition_type);
+    EXPECT_THAT(manifest_reader_result, IsOk());
+    auto manifest_reader = std::move(manifest_reader_result.value());
+
+    auto read_entries = manifest_reader->Entries();
+    EXPECT_THAT(read_entries, IsOk());
+
+    std::vector<ManifestEntry> entries = read_entries.value();
+    return entries;
+  }
+
+  // Writes `delete_file` as a deleted entry into a new delete manifest of the given
+  // format version (2 or 3) and returns the resulting ManifestFile.
+  ManifestFile WriteDeleteManifest(int format_version,
+                                   std::shared_ptr<DataFile> delete_file) {
+    const std::string manifest_path = CreateManifestPath();
+
+    // Stays a NotSupported error unless one of the known versions replaces it.
+    Result<std::unique_ptr<ManifestWriter>> writer_result =
+        NotSupported("Format version: {}", format_version);
+
+    switch (format_version) {
+      case 2:
+        writer_result =
+            ManifestWriter::MakeV2Writer(kSnapshotId, manifest_path, file_io_, spec_,
+                                         schema_, ManifestContent::kDeletes);
+        break;
+      case 3:
+        writer_result =
+            ManifestWriter::MakeV3Writer(kSnapshotId, kFirstRowId, manifest_path, file_io_,
+                                         spec_, schema_, ManifestContent::kDeletes);
+        break;
+      default:
+        break;
+    }
+
+    EXPECT_THAT(writer_result, IsOk());
+    auto manifest_writer = std::move(writer_result.value());
+
+    EXPECT_THAT(
+        manifest_writer->WriteDeletedEntry(delete_file, kSequenceNumber, std::nullopt),
+        IsOk());
+    EXPECT_THAT(manifest_writer->Close(), IsOk());
+
+    // The ManifestFile is requested only after the writer has been closed.
+    auto manifest_result = manifest_writer->ToManifestFile();
+    EXPECT_THAT(manifest_result, IsOk());
+
+    ManifestFile manifest = std::move(manifest_result.value());
+    return manifest;
+  }
+
+ ManifestFile RewriteManifest(const ManifestFile& old_manifest, int
format_version) {
+ auto entries = ReadManifest(old_manifest);
+
+ const std::string manifest_path = CreateManifestPath();
+
+ Result<std::unique_ptr<ManifestWriter>> writer_result =
+ NotSupported("Format version: {}", format_version);
+
+ if (format_version == 1) {
+ writer_result = ManifestWriter::MakeV1Writer(kSnapshotId, manifest_path,
file_io_,
+ spec_, schema_);
+ } else if (format_version == 2) {
+ writer_result = ManifestWriter::MakeV2Writer(kSnapshotId, manifest_path,
file_io_,
+ spec_, schema_,
old_manifest.content);
+ } else if (format_version == 3) {
+ writer_result =
+ ManifestWriter::MakeV3Writer(kSnapshotId, kFirstRowId,
manifest_path, file_io_,
+ spec_, schema_, old_manifest.content);
+ }
+
+ EXPECT_THAT(writer_result, IsOk());
+ auto writer = std::move(writer_result.value());
+
+ for (auto& entry : entries) {
+ EXPECT_THAT(
+ writer->WriteExistingEntry(
+ entry.data_file, entry.snapshot_id.value_or(kSnapshotId),
+
entry.sequence_number.value_or(TableMetadata::kInitialSequenceNumber),
+ entry.file_sequence_number),
+ IsOk());
+ }
+ EXPECT_THAT(writer->Close(), IsOk());
+
+ auto manifest_result = writer->ToManifestFile();
+ EXPECT_THAT(manifest_result, IsOk());
+
+ return std::move(manifest_result.value());
+ }
+
+ void CheckManifest(const ManifestFile& manifest, int64_t
expected_sequence_number,
+ int64_t expected_min_sequence_number) {
+ ASSERT_EQ(manifest.added_snapshot_id, kSnapshotId);
+ ASSERT_EQ(manifest.sequence_number, expected_sequence_number);
+ ASSERT_EQ(manifest.min_sequence_number, expected_min_sequence_number);
+ switch (manifest.content) {
+ case ManifestContent::kData:
+ ASSERT_EQ(manifest.added_files_count, 1L);
+ ASSERT_EQ(manifest.deleted_files_count, 0L);
+ ASSERT_EQ(manifest.added_rows_count, kMetrics.row_count);
+ break;
+ case ManifestContent::kDeletes:
+ ASSERT_EQ(manifest.added_files_count, 0L);
+ ASSERT_EQ(manifest.deleted_files_count, 1L);
+ ASSERT_EQ(manifest.added_rows_count, 0L);
+ break;
+ default:
+ std::unreachable();
+ }
+
+ ASSERT_EQ(manifest.existing_files_count, 0L);
+ ASSERT_EQ(manifest.existing_rows_count, 0L);
+ ASSERT_EQ(manifest.deleted_rows_count, 0L);
+ }
+
+ void CheckDataFile(const DataFile& data_file, DataFile::Content
expected_content,
+ std::optional<int64_t> expected_first_row_id =
std::nullopt) {
+ ASSERT_EQ(data_file.content, expected_content);
+ ASSERT_EQ(data_file.file_path, kPath);
+ ASSERT_EQ(data_file.file_format, kFormat);
+ ASSERT_EQ(data_file.partition, kPartition);
+ ASSERT_EQ(data_file.record_count, kMetrics.row_count);
+ ASSERT_EQ(data_file.sort_order_id, kSortOrderId);
+ switch (data_file.content) {
+ case DataFile::Content::kData:
+ ASSERT_EQ(data_file.first_row_id, expected_first_row_id);
+ ASSERT_TRUE(data_file.equality_ids.empty());
+ break;
+ case DataFile::Content::kEqualityDeletes:
+ ASSERT_EQ(data_file.first_row_id, std::nullopt);
+ ASSERT_EQ(data_file.equality_ids, kEqualityIds);
+ break;
+ case DataFile::Content::kPositionDeletes:
+ ASSERT_EQ(data_file.first_row_id, std::nullopt);
+ ASSERT_TRUE(data_file.equality_ids.empty());
+ break;
+ default:
+ std::unreachable();
+ }
+
+ // Metrics
+ ASSERT_EQ(data_file.column_sizes,
+ (std::map<int32_t, int64_t>(kMetrics.column_sizes.begin(),
+ kMetrics.column_sizes.end())));
+ ASSERT_EQ(data_file.value_counts,
+ (std::map<int32_t, int64_t>(kMetrics.value_counts.begin(),
+ kMetrics.value_counts.end())));
+ ASSERT_EQ(data_file.null_value_counts,
+ (std::map<int32_t, int64_t>(kMetrics.null_value_counts.begin(),
+ kMetrics.null_value_counts.end())));
+ ASSERT_EQ(data_file.nan_value_counts,
+ (std::map<int32_t, int64_t>(kMetrics.nan_value_counts.begin(),
+ kMetrics.nan_value_counts.end())));
+ ASSERT_EQ(data_file.lower_bounds.size(), kMetrics.lower_bounds.size());
+ for (const auto& [col_id, bound] : kMetrics.lower_bounds) {
+ auto it = data_file.lower_bounds.find(col_id);
+ ASSERT_NE(it, data_file.lower_bounds.end());
+ std::vector<uint8_t> serialized_bound = bound.Serialize().value();
+ ASSERT_EQ(it->second, serialized_bound);
+ }
+
+ ASSERT_EQ(data_file.upper_bounds.size(), kMetrics.upper_bounds.size());
+ for (const auto& [col_id, bound] : kMetrics.upper_bounds) {
+ auto it = data_file.upper_bounds.find(col_id);
+ ASSERT_NE(it, data_file.upper_bounds.end());
+ std::vector<uint8_t> serialized_bound = bound.Serialize().value();
+ ASSERT_EQ(it->second, serialized_bound);
+ }
+ }
+
+ void CheckEntry(const ManifestEntry& entry,
+ std::optional<int64_t> expected_data_sequence_number,
+ std::optional<int64_t> expected_file_sequence_number,
+ DataFile::Content expected_content,
+ ManifestStatus expected_status = ManifestStatus::kAdded,
+ std::optional<int64_t> expected_first_row_id = std::nullopt)
{
+ ASSERT_EQ(entry.status, expected_status);
+ ASSERT_EQ(entry.snapshot_id, kSnapshotId);
+ ASSERT_EQ(entry.sequence_number, expected_data_sequence_number);
+ ASSERT_EQ(entry.file_sequence_number, expected_file_sequence_number);
+ if (entry.status == ManifestStatus::kAdded) {
+ CheckDataFile(*entry.data_file, expected_content, expected_first_row_id);
+ }
+ }
+
+ void CheckRewrittenManifest(const ManifestFile& manifest,
+ int64_t expected_sequence_number,
+ int64_t expected_min_sequence_number) {
+ ASSERT_EQ(manifest.added_snapshot_id, kSnapshotId);
+ ASSERT_EQ(manifest.sequence_number, expected_sequence_number);
+ // TODO(zhjwpku): figure out why min_sequence_number check fails
+ // ASSERT_EQ(manifest.min_sequence_number, expected_min_sequence_number);
+ ASSERT_EQ(manifest.added_files_count, 0L);
+ ASSERT_EQ(manifest.existing_files_count, 1L);
+ ASSERT_EQ(manifest.deleted_files_count, 0L);
+ ASSERT_EQ(manifest.added_rows_count, 0L);
+ ASSERT_EQ(manifest.existing_rows_count, kMetrics.row_count);
+ ASSERT_EQ(manifest.deleted_rows_count, 0L);
+ }
+
+ void CheckRewrittenEntry(const ManifestEntry& entry, int64_t
expected_sequence_number,
+ DataFile::Content expected_content,
+ std::optional<int64_t> expected_first_row_id =
std::nullopt) {
+ ASSERT_EQ(entry.status, ManifestStatus::kExisting);
+ ASSERT_EQ(entry.snapshot_id, kSnapshotId);
+ // TODO(zhjwpku): Check the sequence_number and file_sequence_number of V1 manifest
+ if (entry.sequence_number.has_value()) {
+ ASSERT_EQ(entry.sequence_number.value(), expected_sequence_number);
+ }
+ if (entry.file_sequence_number.has_value()) {
+ ASSERT_EQ(entry.file_sequence_number.value(), expected_sequence_number);
+ }
+ CheckDataFile(*entry.data_file, expected_content, expected_first_row_id);
+ }
+
+ std::shared_ptr<Schema> schema_{nullptr};
+ std::shared_ptr<PartitionSpec> spec_{nullptr};
+ std::shared_ptr<DataFile> data_file_{nullptr};
+ std::shared_ptr<DataFile> delete_file_{nullptr};
+ std::shared_ptr<DataFile> data_file_without_first_row_id_{nullptr};
+
+ std::shared_ptr<FileIO> file_io_{nullptr};
+};
+
+TEST_F(ManifestWriterVersionsTest, TestV1Write) {
+ auto manifest = WriteManifest(/*format_version=*/1, {data_file_});
+ CheckManifest(manifest, TableMetadata::kInvalidSequenceNumber,
+ TableMetadata::kInvalidSequenceNumber);
+ auto entries = ReadManifest(manifest);
+ ASSERT_EQ(entries.size(), 1);
+ CheckEntry(entries[0], TableMetadata::kInvalidSequenceNumber,
+ TableMetadata::kInvalidSequenceNumber, DataFile::Content::kData);
+}
+
+TEST_F(ManifestWriterVersionsTest, TestV1WriteDelete) {
+ const std::string manifest_path = CreateManifestPath();
+ auto writer_result =
+ ManifestWriter::MakeV1Writer(kSnapshotId, manifest_path, file_io_,
spec_, schema_);
+
+ EXPECT_THAT(writer_result, IsOk());
+ auto writer = std::move(writer_result.value());
+
+ ManifestEntry entry;
+ entry.snapshot_id = kSnapshotId;
+ entry.data_file = std::move(delete_file_);
+
+ auto status = writer->WriteDeletedEntry(entry);
+ EXPECT_THAT(status, IsError(ErrorKind::kInvalidArgument));
+ EXPECT_THAT(status, HasErrorMessage(
+ "Cannot write equality_deletes file to data manifest
file"));
+}
+
+TEST_F(ManifestWriterVersionsTest, TestV1WriteWithInheritance) {
+ auto manifests =
+ WriteAndReadManifests({WriteManifest(/*format_version=*/1,
{data_file_})}, 1);
+ ASSERT_EQ(manifests.size(), 1);
+ CheckManifest(manifests[0], TableMetadata::kInitialSequenceNumber,
+ TableMetadata::kInitialSequenceNumber);
+ auto entries = ReadManifest(manifests[0]);
+ ASSERT_EQ(entries.size(), 1);
+ CheckEntry(entries[0], TableMetadata::kInitialSequenceNumber,
+ TableMetadata::kInitialSequenceNumber, DataFile::Content::kData);
+}
+
+TEST_F(ManifestWriterVersionsTest, TestV2Write) {
+ auto manifest = WriteManifest(/*format_version=*/2, {data_file_});
+ CheckManifest(manifest, TableMetadata::kInvalidSequenceNumber,
+ TableMetadata::kInvalidSequenceNumber);
+ auto entries = ReadManifest(manifest);
+ ASSERT_EQ(entries.size(), 1);
+ ASSERT_EQ(manifest.content, ManifestContent::kData);
+ CheckEntry(entries[0], TableMetadata::kInvalidSequenceNumber,
+ TableMetadata::kInvalidSequenceNumber, DataFile::Content::kData);
+}
+
+TEST_F(ManifestWriterVersionsTest, TestV2WriteWithInheritance) {
+ auto manifests =
+ WriteAndReadManifests({WriteManifest(/*format_version=*/2,
{data_file_})}, 2);
+ CheckManifest(manifests[0], kSequenceNumber, kSequenceNumber);
+ auto entries = ReadManifest(manifests[0]);
+ ASSERT_EQ(entries.size(), 1);
+ ASSERT_EQ(manifests[0].content, ManifestContent::kData);
+ CheckEntry(entries[0], kSequenceNumber, kSequenceNumber,
DataFile::Content::kData);
+}
+
+TEST_F(ManifestWriterVersionsTest, TestV2PlusWriteDeleteV2) {
+ auto manifest = WriteDeleteManifest(/*format_version=*/2, delete_file_);
+ CheckManifest(manifest, TableMetadata::kInvalidSequenceNumber,
+ TableMetadata::kInvalidSequenceNumber);
+ auto entries = ReadManifest(manifest);
+ ASSERT_EQ(entries.size(), 1);
+ ASSERT_EQ(manifest.content, ManifestContent::kDeletes);
+ CheckEntry(entries[0], kSequenceNumber, std::nullopt,
+ DataFile::Content::kEqualityDeletes, ManifestStatus::kDeleted);
+}
+
+TEST_F(ManifestWriterVersionsTest, TestV2ManifestListRewriteWithInheritance) {
+ // write with v1
+ auto manifests =
+ WriteAndReadManifests({WriteManifest(/*format_version=*/1,
{data_file_})}, 1);
+ CheckManifest(manifests[0], TableMetadata::kInitialSequenceNumber,
+ TableMetadata::kInitialSequenceNumber);
+
+ // rewrite existing metadata with v2 manifest list
+ auto manifests2 = WriteAndReadManifests(manifests, 2);
+ // the ManifestFile did not change and should still have its original sequence number, 0
+ CheckManifest(manifests2[0], TableMetadata::kInitialSequenceNumber,
+ TableMetadata::kInitialSequenceNumber);
+
+ // should not inherit the v2 sequence number because it was a rewrite
+ auto entries = ReadManifest(manifests2[0]);
+ CheckEntry(entries[0], TableMetadata::kInitialSequenceNumber,
+ TableMetadata::kInitialSequenceNumber, DataFile::Content::kData);
+}
+
+TEST_F(ManifestWriterVersionsTest, TestV2ManifestRewriteWithInheritance) {
+ // write with v1
+ auto manifests =
+ WriteAndReadManifests({WriteManifest(/*format_version=*/1,
{data_file_})}, 1);
+ CheckManifest(manifests[0], TableMetadata::kInitialSequenceNumber,
+ TableMetadata::kInitialSequenceNumber);
+
+ // rewrite the manifest file using a v2 manifest
+ auto rewritten_manifest = RewriteManifest(manifests[0], 2);
+ CheckRewrittenManifest(rewritten_manifest,
TableMetadata::kInvalidSequenceNumber,
+ TableMetadata::kInitialSequenceNumber);
+
+ // add the v2 manifest to a v2 manifest list, with a sequence number
+ auto manifests2 = WriteAndReadManifests({rewritten_manifest}, 2);
+ // the ManifestFile is new so it has a sequence number, but the min sequence number 0 is
+ // from the entry
+ CheckRewrittenManifest(manifests2[0], kSequenceNumber,
+ TableMetadata::kInitialSequenceNumber);
+
+ // should not inherit the v2 sequence number because it was written into the v2 manifest
+ auto entries = ReadManifest(manifests2[0]);
+ CheckRewrittenEntry(entries[0], TableMetadata::kInitialSequenceNumber,
+ DataFile::Content::kData);
+}
+
+TEST_F(ManifestWriterVersionsTest, TestV3Write) {
+ auto manifest = WriteManifest(/*format_version=*/3, {data_file_});
+ CheckManifest(manifest, TableMetadata::kInvalidSequenceNumber,
+ TableMetadata::kInvalidSequenceNumber);
+ auto entries = ReadManifest(manifest);
+ ASSERT_EQ(entries.size(), 1);
+ ASSERT_EQ(manifest.content, ManifestContent::kData);
+ CheckEntry(entries[0], TableMetadata::kInvalidSequenceNumber,
+ TableMetadata::kInvalidSequenceNumber, DataFile::Content::kData,
+ ManifestStatus::kAdded, kFirstRowId);
+}
+
+TEST_F(ManifestWriterVersionsTest, TestV3WriteWithInheritance) {
+ auto manifests = WriteAndReadManifests(
+ {WriteManifest(/*format_version=*/3,
{data_file_without_first_row_id_})}, 3);
+ CheckManifest(manifests[0], kSequenceNumber, kSequenceNumber);
+ ASSERT_EQ(manifests[0].content, ManifestContent::kData);
+
+ // v2+ should use the correct sequence number by inheriting it
+ // v3 should use the correct first-row-id by inheriting it
+ auto entries = ReadManifest(manifests[0]);
+ ASSERT_EQ(entries.size(), 1);
+ // first_row_id should be inherited
+ CheckEntry(entries[0], kSequenceNumber, kSequenceNumber,
DataFile::Content::kData,
+ ManifestStatus::kAdded, kFirstRowId);
+}
+
+TEST_F(ManifestWriterVersionsTest, TestV3WriteFirstRowIdAssignment) {
+ auto manifests = WriteAndReadManifests(
+ {WriteManifest(/*format_version=*/3,
+ {data_file_without_first_row_id_,
data_file_without_first_row_id_})},
+ 3);
+ ASSERT_EQ(manifests[0].content, ManifestContent::kData);
+
+ // v2+ should use the correct sequence number by inheriting it
+ // v3 should use the correct first-row-id by inheriting it
+ auto entries = ReadManifest(manifests[0]);
+ ASSERT_EQ(entries.size(), 2);
+ int64_t expected_first_row_id = kFirstRowId;
+ for (const auto& entry : entries) {
+ CheckEntry(entry, kSequenceNumber, kSequenceNumber,
DataFile::Content::kData,
+ ManifestStatus::kAdded, expected_first_row_id);
+ expected_first_row_id += kMetrics.row_count.value();
+ }
+}
+
+TEST_F(ManifestWriterVersionsTest, TestV3ManifestListRewriteWithInheritance) {
+ // write with v1
+ auto manifests =
+ WriteAndReadManifests({WriteManifest(/*format_version=*/1,
{data_file_})}, 1);
+ CheckManifest(manifests[0], TableMetadata::kInitialSequenceNumber,
+ TableMetadata::kInitialSequenceNumber);
+
+ // rewrite existing metadata with a manifest list
+ auto manifests3 = WriteAndReadManifests(manifests, 3);
+ // the ManifestFile did not change and should still have its original sequence number, 0
+ CheckManifest(manifests3[0], TableMetadata::kInitialSequenceNumber,
+ TableMetadata::kInitialSequenceNumber);
+
+ // should not inherit the sequence number because it was a rewrite
+ auto entries = ReadManifest(manifests3[0]);
+ CheckEntry(entries[0], TableMetadata::kInitialSequenceNumber,
+ TableMetadata::kInitialSequenceNumber, DataFile::Content::kData,
+ ManifestStatus::kAdded, kFirstRowId);
+}
+
+TEST_F(ManifestWriterVersionsTest, TestV3ManifestRewriteWithInheritance) {
+ // write with v1
+ auto manifests =
+ WriteAndReadManifests({WriteManifest(/*format_version=*/1,
{data_file_})}, 1);
+ CheckManifest(manifests[0], TableMetadata::kInitialSequenceNumber,
+ TableMetadata::kInitialSequenceNumber);
+
+ // rewrite the manifest file using a v3 manifest
+ auto rewritten_manifest = RewriteManifest(manifests[0], 3);
+ CheckRewrittenManifest(rewritten_manifest,
TableMetadata::kInvalidSequenceNumber,
+ TableMetadata::kInitialSequenceNumber);
+
+ // add the v3 manifest to a v3 manifest list, with a sequence number
+ auto manifests3 = WriteAndReadManifests({rewritten_manifest}, 3);
+ // the ManifestFile is new so it has a sequence number, but the min sequence number 0 is
+ // from the entry
+ CheckRewrittenManifest(manifests3[0], kSequenceNumber,
+ TableMetadata::kInitialSequenceNumber);
+
+ // should not inherit the v3 sequence number because it was written into the v3 manifest
+ auto entries = ReadManifest(manifests3[0]);
+ CheckRewrittenEntry(entries[0], TableMetadata::kInitialSequenceNumber,
+ DataFile::Content::kData, kFirstRowId);
+}
+
+} // namespace iceberg
diff --git a/src/iceberg/test/parquet_test.cc b/src/iceberg/test/parquet_test.cc
index a9be5fb..19f8bd8 100644
--- a/src/iceberg/test/parquet_test.cc
+++ b/src/iceberg/test/parquet_test.cc
@@ -102,13 +102,12 @@ void DoRoundtrip(std::shared_ptr<::arrow::Array> data, std::shared_ptr<Schema> s
ASSERT_THAT(WriteArray(data, *writer), IsOk());
std::unordered_map<std::string, std::string> read_metadata;
- ASSERT_THAT(ReadArray(out,
- {.path = basePath,
- .length = writer->length(),
- .io = file_io,
- .projection = schema},
- &read_metadata),
- IsOk());
+ ICEBERG_UNWRAP_OR_FAIL(auto length, writer->length());
+ ASSERT_THAT(
+ ReadArray(out,
+ {.path = basePath, .length = length, .io = file_io,
.projection = schema},
+ &read_metadata),
+ IsOk());
ASSERT_EQ(read_metadata, metadata);
ASSERT_TRUE(out != nullptr) << "Reader.Next() returned no data";
diff --git a/src/iceberg/test/partition_field_test.cc b/src/iceberg/test/partition_field_test.cc
index 04ee9b2..8bc2f52 100644
--- a/src/iceberg/test/partition_field_test.cc
+++ b/src/iceberg/test/partition_field_test.cc
@@ -24,7 +24,6 @@
#include <gtest/gtest.h>
#include "iceberg/transform.h"
-#include "iceberg/type.h"
#include "iceberg/util/formatter.h" // IWYU pragma: keep
namespace iceberg {
diff --git a/src/iceberg/test/partition_spec_test.cc b/src/iceberg/test/partition_spec_test.cc
index 3ae264f..b7a9fed 100644
--- a/src/iceberg/test/partition_spec_test.cc
+++ b/src/iceberg/test/partition_spec_test.cc
@@ -44,20 +44,18 @@ TEST(PartitionSpecTest, Basics) {
auto identity_transform = Transform::Identity();
PartitionField pt_field1(5, 1000, "day", identity_transform);
PartitionField pt_field2(5, 1001, "hour", identity_transform);
- auto spec_result = PartitionSpec::Make(100, {pt_field1, pt_field2});
- ASSERT_TRUE(spec_result.has_value());
- auto& spec = *spec_result.value();
- ASSERT_EQ(spec, spec);
- ASSERT_EQ(100, spec.spec_id());
- std::span<const PartitionField> fields = spec.fields();
+ ICEBERG_UNWRAP_OR_FAIL(auto spec, PartitionSpec::Make(100, {pt_field1,
pt_field2}));
+ ASSERT_EQ(*spec, *spec);
+ ASSERT_EQ(100, spec->spec_id());
+ std::span<const PartitionField> fields = spec->fields();
ASSERT_EQ(2, fields.size());
ASSERT_EQ(pt_field1, fields[0]);
ASSERT_EQ(pt_field2, fields[1]);
auto spec_str =
"partition_spec[spec_id<100>,\n day (1000 identity(5))\n hour (1001 "
"identity(5))\n]";
- EXPECT_EQ(spec_str, spec.ToString());
- EXPECT_EQ(spec_str, std::format("{}", spec));
+ EXPECT_EQ(spec_str, spec->ToString());
+ EXPECT_EQ(spec_str, std::format("{}", *spec));
}
}
@@ -68,24 +66,24 @@ TEST(PartitionSpecTest, Equality) {
PartitionField pt_field1(5, 1000, "day", identity_transform);
PartitionField pt_field2(7, 1001, "hour", identity_transform);
PartitionField pt_field3(7, 1001, "hour", identity_transform);
- auto schema1 = PartitionSpec::Make(100, {pt_field1, pt_field2}).value();
- auto schema2 = PartitionSpec::Make(101, {pt_field1, pt_field2}).value();
- auto schema3 = PartitionSpec::Make(101, {pt_field1}).value();
- auto schema4 = PartitionSpec::Make(101, {pt_field3, pt_field1}).value();
- auto schema5 = PartitionSpec::Make(100, {pt_field1, pt_field2}).value();
- auto schema6 = PartitionSpec::Make(100, {pt_field2, pt_field1}).value();
-
- ASSERT_EQ(*schema1, *schema1);
- ASSERT_NE(*schema1, *schema2);
- ASSERT_NE(*schema2, *schema1);
- ASSERT_NE(*schema1, *schema3);
- ASSERT_NE(*schema3, *schema1);
- ASSERT_NE(*schema1, *schema4);
- ASSERT_NE(*schema4, *schema1);
- ASSERT_EQ(*schema1, *schema5);
- ASSERT_EQ(*schema5, *schema1);
- ASSERT_NE(*schema1, *schema6);
- ASSERT_NE(*schema6, *schema1);
+ ICEBERG_UNWRAP_OR_FAIL(auto spec1, PartitionSpec::Make(100, {pt_field1,
pt_field2}));
+ ICEBERG_UNWRAP_OR_FAIL(auto spec2, PartitionSpec::Make(101, {pt_field1,
pt_field2}));
+ ICEBERG_UNWRAP_OR_FAIL(auto spec3, PartitionSpec::Make(101, {pt_field1}));
+ ICEBERG_UNWRAP_OR_FAIL(auto spec4, PartitionSpec::Make(101, {pt_field3,
pt_field1}));
+ ICEBERG_UNWRAP_OR_FAIL(auto spec5, PartitionSpec::Make(100, {pt_field1,
pt_field2}));
+ ICEBERG_UNWRAP_OR_FAIL(auto spec6, PartitionSpec::Make(100, {pt_field2,
pt_field1}));
+
+ ASSERT_EQ(*spec1, *spec1);
+ ASSERT_NE(*spec1, *spec2);
+ ASSERT_NE(*spec2, *spec1);
+ ASSERT_NE(*spec1, *spec3);
+ ASSERT_NE(*spec3, *spec1);
+ ASSERT_NE(*spec1, *spec4);
+ ASSERT_NE(*spec4, *spec1);
+ ASSERT_EQ(*spec1, *spec5);
+ ASSERT_EQ(*spec5, *spec1);
+ ASSERT_NE(*spec1, *spec6);
+ ASSERT_NE(*spec6, *spec1);
}
TEST(PartitionSpecTest, PartitionSchemaTest) {
@@ -95,15 +93,13 @@ TEST(PartitionSpecTest, PartitionSchemaTest) {
auto identity_transform = Transform::Identity();
PartitionField pt_field1(5, 1000, "day", identity_transform);
PartitionField pt_field2(7, 1001, "hour", identity_transform);
- auto spec = PartitionSpec::Make(100, {pt_field1, pt_field2}).value();
-
- auto partition_type = spec->PartitionType(schema);
- ASSERT_TRUE(partition_type.has_value());
- ASSERT_EQ(2, partition_type.value()->fields().size());
- EXPECT_EQ(pt_field1.name(), partition_type.value()->fields()[0].name());
- EXPECT_EQ(pt_field1.field_id(),
partition_type.value()->fields()[0].field_id());
- EXPECT_EQ(pt_field2.name(), partition_type.value()->fields()[1].name());
- EXPECT_EQ(pt_field2.field_id(),
partition_type.value()->fields()[1].field_id());
+ ICEBERG_UNWRAP_OR_FAIL(auto spec, PartitionSpec::Make(100, {pt_field1,
pt_field2}));
+ ICEBERG_UNWRAP_OR_FAIL(auto partition_type, spec->PartitionType(schema));
+ ASSERT_EQ(2, partition_type->fields().size());
+ EXPECT_EQ(pt_field1.name(), partition_type->fields()[0].name());
+ EXPECT_EQ(pt_field1.field_id(), partition_type->fields()[0].field_id());
+ EXPECT_EQ(pt_field2.name(), partition_type->fields()[1].name());
+ EXPECT_EQ(pt_field2.field_id(), partition_type->fields()[1].field_id());
}
TEST(PartitionSpecTest, PartitionTypeTest) {
@@ -138,21 +134,18 @@ TEST(PartitionSpecTest, PartitionTypeTest) {
std::vector<SchemaField>{field1, field2, field3, field4, field5, field6},
Schema::kInitialSchemaId);
- auto parsed_spec_result = PartitionSpecFromJson(schema, json, 1);
- ASSERT_TRUE(parsed_spec_result.has_value()) <<
parsed_spec_result.error().message;
-
- auto partition_type = parsed_spec_result.value()->PartitionType(*schema);
+ ICEBERG_UNWRAP_OR_FAIL(auto parsed_spec, PartitionSpecFromJson(schema, json,
1));
+ ICEBERG_UNWRAP_OR_FAIL(auto partition_type,
parsed_spec->PartitionType(*schema));
SchemaField pt_field1(1000, "ts_day", date(), true);
SchemaField pt_field2(1001, "id_bucket", int32(), true);
SchemaField pt_field3(1002, "id_truncate", string(), true);
- ASSERT_TRUE(partition_type.has_value());
- ASSERT_EQ(3, partition_type.value()->fields().size());
+ ASSERT_EQ(3, partition_type->fields().size());
- EXPECT_EQ(pt_field1, partition_type.value()->fields()[0]);
- EXPECT_EQ(pt_field2, partition_type.value()->fields()[1]);
- EXPECT_EQ(pt_field3, partition_type.value()->fields()[2]);
+ EXPECT_EQ(pt_field1, partition_type->fields()[0]);
+ EXPECT_EQ(pt_field2, partition_type->fields()[1]);
+ EXPECT_EQ(pt_field3, partition_type->fields()[2]);
}
TEST(PartitionSpecTest, InvalidTransformForType) {
@@ -219,8 +212,7 @@ TEST(PartitionSpecTest, PartitionFieldInStruct) {
Schema schema({outer_struct}, Schema::kInitialSchemaId);
PartitionField pt_field(1, 1000, "id_partition", Transform::Identity());
- auto result = PartitionSpec::Make(schema, 1, {pt_field}, false);
- EXPECT_THAT(result, IsOk());
+ EXPECT_THAT(PartitionSpec::Make(schema, 1, {pt_field}, false), IsOk());
}
TEST(PartitionSpecTest, PartitionFieldInStructInStruct) {
@@ -235,8 +227,7 @@ TEST(PartitionSpecTest, PartitionFieldInStructInStruct) {
Schema schema({outer_field}, Schema::kInitialSchemaId);
PartitionField pt_field(1, 1000, "id_partition", Transform::Identity());
- auto result = PartitionSpec::Make(schema, 1, {pt_field}, false);
- EXPECT_THAT(result, IsOk());
+ EXPECT_THAT(PartitionSpec::Make(schema, 1, {pt_field}, false), IsOk());
}
TEST(PartitionSpecTest, PartitionFieldInList) {
diff --git a/src/iceberg/test/struct_like_test.cc b/src/iceberg/test/struct_like_test.cc
index 3683ed2..d64e219 100644
--- a/src/iceberg/test/struct_like_test.cc
+++ b/src/iceberg/test/struct_like_test.cc
@@ -64,7 +64,7 @@ TEST(ManifestFileStructLike, BasicFields) {
.manifest_path = "/path/to/manifest.avro",
.manifest_length = 12345,
.partition_spec_id = 1,
- .content = ManifestFile::Content::kData,
+ .content = ManifestContent::kData,
.sequence_number = 100,
.min_sequence_number = 90,
.added_snapshot_id = 1001,
@@ -89,7 +89,7 @@ TEST(ManifestFileStructLike, BasicFields) {
struct_like.GetField(static_cast<size_t>(ManifestFileField::kPartitionSpecId)),
int32_t, 1);
EXPECT_SCALAR_EQ(struct_like.GetField(static_cast<size_t>(ManifestFileField::kContent)),
- int32_t,
static_cast<int32_t>(ManifestFile::Content::kData));
+ int32_t, static_cast<int32_t>(ManifestContent::kData));
EXPECT_SCALAR_EQ(
struct_like.GetField(static_cast<size_t>(ManifestFileField::kSequenceNumber)),
int64_t, 100);
@@ -103,7 +103,7 @@ TEST(ManifestFileStructLike, OptionalFields) {
ManifestFile manifest_file{.manifest_path = "/path/to/manifest2.avro",
.manifest_length = 54321,
.partition_spec_id = 2,
- .content = ManifestFile::Content::kDeletes,
+ .content = ManifestContent::kDeletes,
.sequence_number = 200,
.min_sequence_number = 180,
.added_snapshot_id = 2001,
@@ -127,7 +127,7 @@ TEST(ManifestFileStructLike, OptionalFields) {
struct_like.GetField(static_cast<size_t>(ManifestFileField::kFirstRowId)),
int64_t,
12345);
EXPECT_SCALAR_EQ(struct_like.GetField(static_cast<size_t>(ManifestFileField::kContent)),
- int32_t,
static_cast<int32_t>(ManifestFile::Content::kDeletes));
+ int32_t, static_cast<int32_t>(ManifestContent::kDeletes));
}
TEST(ManifestFileStructLike, WithPartitions) {
@@ -135,7 +135,7 @@ TEST(ManifestFileStructLike, WithPartitions) {
.manifest_path = "/path/to/manifest3.avro",
.manifest_length = 98765,
.partition_spec_id = 3,
- .content = ManifestFile::Content::kData,
+ .content = ManifestContent::kData,
.sequence_number = 300,
.min_sequence_number = 290,
.added_snapshot_id = 3001,
diff --git a/src/iceberg/type_fwd.h b/src/iceberg/type_fwd.h
index 79b43f5..8ae213a 100644
--- a/src/iceberg/type_fwd.h
+++ b/src/iceberg/type_fwd.h
@@ -134,6 +134,8 @@ struct ManifestFile;
struct ManifestList;
struct PartitionFieldSummary;
+class PartitionSummary;
+
class ManifestListReader;
class ManifestListWriter;
class ManifestReader;
diff --git a/src/iceberg/v1_metadata.cc b/src/iceberg/v1_metadata.cc
index 2da81ea..101ef47 100644
--- a/src/iceberg/v1_metadata.cc
+++ b/src/iceberg/v1_metadata.cc
@@ -34,9 +34,8 @@ namespace iceberg {
ManifestEntryAdapterV1::ManifestEntryAdapterV1(
std::optional<int64_t> snapshot_id, std::shared_ptr<PartitionSpec>
partition_spec,
std::shared_ptr<Schema> current_schema)
- : ManifestEntryAdapter(std::move(partition_spec),
std::move(current_schema),
- ManifestContent::kData),
- snapshot_id_(snapshot_id) {}
+ : ManifestEntryAdapter(snapshot_id, std::move(partition_spec),
+ std::move(current_schema), ManifestContent::kData)
{}
std::shared_ptr<Schema> ManifestEntryAdapterV1::EntrySchema(
std::shared_ptr<StructType> partition_type) {
@@ -82,10 +81,9 @@ Status ManifestEntryAdapterV1::Init() {
metadata_["partition-spec-id"] = std::to_string(partition_spec_->spec_id());
metadata_["format-version"] = "1";
- ICEBERG_ASSIGN_OR_RAISE(auto partition_type,
+ ICEBERG_ASSIGN_OR_RAISE(partition_type_,
partition_spec_->PartitionType(*current_schema_));
-
- manifest_schema_ = EntrySchema(std::move(partition_type));
+ manifest_schema_ = EntrySchema(partition_type_);
return ToArrowSchema(*manifest_schema_, &schema_);
}
@@ -121,7 +119,7 @@ Status ManifestFileAdapterV1::Init() {
}
Status ManifestFileAdapterV1::Append(const ManifestFile& file) {
- if (file.content != ManifestFile::Content::kData) {
+ if (file.content != ManifestContent::kData) {
return InvalidManifestList("Cannot store delete manifests in a v1 table");
}
return AppendInternal(file);
diff --git a/src/iceberg/v1_metadata.h b/src/iceberg/v1_metadata.h
index 4279230..a6d7ab4 100644
--- a/src/iceberg/v1_metadata.h
+++ b/src/iceberg/v1_metadata.h
@@ -39,9 +39,6 @@ class ManifestEntryAdapterV1 : public ManifestEntryAdapter {
static std::shared_ptr<Schema> WrapFileSchema(std::shared_ptr<StructType>
file_schema);
static std::shared_ptr<StructType> DataFileSchema(
std::shared_ptr<StructType> partition_type);
-
- private:
- std::optional<int64_t> snapshot_id_;
};
/// \brief Adapter to convert V1 ManifestFile to `ArrowArray`.
diff --git a/src/iceberg/v2_metadata.cc b/src/iceberg/v2_metadata.cc
index e5f2226..486acb6 100644
--- a/src/iceberg/v2_metadata.cc
+++ b/src/iceberg/v2_metadata.cc
@@ -31,9 +31,8 @@ namespace iceberg {
ManifestEntryAdapterV2::ManifestEntryAdapterV2(
std::optional<int64_t> snapshot_id, std::shared_ptr<PartitionSpec>
partition_spec,
std::shared_ptr<Schema> current_schema, ManifestContent content)
- : ManifestEntryAdapter(std::move(partition_spec),
std::move(current_schema),
- std::move(content)),
- snapshot_id_(snapshot_id) {}
+ : ManifestEntryAdapter(snapshot_id, std::move(partition_spec),
+ std::move(current_schema), std::move(content)) {}
std::shared_ptr<Schema> ManifestEntryAdapterV2::EntrySchema(
std::shared_ptr<StructType> partition_type) {
@@ -81,9 +80,9 @@ Status ManifestEntryAdapterV2::Init() {
metadata_["format-version"] = "2";
metadata_["content"] = content_ == ManifestContent::kData ? "data" :
"delete";
- ICEBERG_ASSIGN_OR_RAISE(auto partition_type,
+ ICEBERG_ASSIGN_OR_RAISE(partition_type_,
partition_spec_->PartitionType(*current_schema_));
- manifest_schema_ = EntrySchema(std::move(partition_type));
+ manifest_schema_ = EntrySchema(partition_type_);
return ToArrowSchema(*manifest_schema_, &schema_);
}
diff --git a/src/iceberg/v2_metadata.h b/src/iceberg/v2_metadata.h
index 4459d9b..3097085 100644
--- a/src/iceberg/v2_metadata.h
+++ b/src/iceberg/v2_metadata.h
@@ -45,9 +45,6 @@ class ManifestEntryAdapterV2 : public ManifestEntryAdapter {
const ManifestEntry& entry) const override;
Result<std::optional<std::string>> GetReferenceDataFile(
const DataFile& file) const override;
-
- private:
- std::optional<int64_t> snapshot_id_;
};
/// \brief Adapter to convert V2 ManifestFile to `ArrowArray`.
diff --git a/src/iceberg/v3_metadata.cc b/src/iceberg/v3_metadata.cc
index 64d674a..ae309ed 100644
--- a/src/iceberg/v3_metadata.cc
+++ b/src/iceberg/v3_metadata.cc
@@ -20,10 +20,12 @@
#include "iceberg/v3_metadata.h"
#include <memory>
+#include <optional>
#include "iceberg/json_internal.h"
#include "iceberg/manifest_entry.h"
#include "iceberg/manifest_list.h"
+#include "iceberg/result.h"
#include "iceberg/schema.h"
#include "iceberg/schema_internal.h"
#include "iceberg/util/macros.h"
@@ -34,9 +36,8 @@ ManifestEntryAdapterV3::ManifestEntryAdapterV3(
std::optional<int64_t> snapshot_id, std::optional<int64_t> first_row_id,
std::shared_ptr<PartitionSpec> partition_spec, std::shared_ptr<Schema>
current_schema,
ManifestContent content)
- : ManifestEntryAdapter(std::move(partition_spec),
std::move(current_schema),
- std::move(content)),
- snapshot_id_(snapshot_id),
+ : ManifestEntryAdapter(snapshot_id, std::move(partition_spec),
+ std::move(current_schema), std::move(content)),
first_row_id_(first_row_id) {}
std::shared_ptr<Schema> ManifestEntryAdapterV3::EntrySchema(
@@ -90,9 +91,9 @@ Status ManifestEntryAdapterV3::Init() {
metadata_["format-version"] = "3";
metadata_["content"] = content_ == ManifestContent::kData ? "data" : "delete";
- ICEBERG_ASSIGN_OR_RAISE(auto partition_type,
+ ICEBERG_ASSIGN_OR_RAISE(partition_type_,
partition_spec_->PartitionType(*current_schema_));
- manifest_schema_ = EntrySchema(std::move(partition_type));
+ manifest_schema_ = EntrySchema(partition_type_);
return ToArrowSchema(*manifest_schema_, &schema_);
}
@@ -134,10 +135,13 @@ Result<std::optional<std::string>> ManifestEntryAdapterV3::GetReferenceDataFile(
Result<std::optional<int64_t>> ManifestEntryAdapterV3::GetFirstRowId(
const DataFile& file) const {
- if (file.content == DataFile::Content::kData) {
+ if (file.content != DataFile::Content::kData) {
+ return std::nullopt;
+ }
+ if (file.first_row_id.has_value()) {
return file.first_row_id;
}
- return std::nullopt;
+ return first_row_id_;
}
Result<std::optional<int64_t>> ManifestEntryAdapterV3::GetContentOffset(
@@ -191,7 +195,10 @@ Status ManifestFileAdapterV3::Init() {
Status ManifestFileAdapterV3::Append(const ManifestFile& file) {
ICEBERG_RETURN_UNEXPECTED(AppendInternal(file));
- if (WrapFirstRowId(file) && next_row_id_.has_value()) {
+ if (WrapFirstRowId(file)) {
+ if (!next_row_id_.has_value()) {
+ return InvalidManifestList("Missing next-row-id for file: {}", file.manifest_path);
+ }
next_row_id_ = next_row_id_.value() + file.existing_rows_count.value_or(0) +
file.added_rows_count.value_or(0);
}
@@ -234,13 +241,13 @@ Result<std::optional<int64_t>> ManifestFileAdapterV3::GetFirstRowId(
const ManifestFile& file) const {
if (WrapFirstRowId(file)) {
// if first-row-id is assigned, ensure that it is valid
- if (!next_row_id_.has_value()) {
+ if (file.first_row_id.has_value()) {
// TODO(gangwu): add ToString for ManifestFile
return InvalidManifestList("Found invalid first-row-id assignment: {}",
file.manifest_path);
}
return next_row_id_;
- } else if (file.content != ManifestFile::Content::kData) {
+ } else if (file.content != ManifestContent::kData) {
return std::nullopt;
} else {
if (!file.first_row_id.has_value()) {
@@ -252,7 +259,7 @@ Result<std::optional<int64_t>> ManifestFileAdapterV3::GetFirstRowId(
}
bool ManifestFileAdapterV3::WrapFirstRowId(const ManifestFile& file) const {
- return file.content == ManifestFile::Content::kData && !file.first_row_id.has_value();
+ return file.content == ManifestContent::kData && !file.first_row_id.has_value();
}
} // namespace iceberg
diff --git a/src/iceberg/v3_metadata.h b/src/iceberg/v3_metadata.h
index a8262bc..163f6ed 100644
--- a/src/iceberg/v3_metadata.h
+++ b/src/iceberg/v3_metadata.h
@@ -22,6 +22,7 @@
/// \file iceberg/v3_metadata.h
#include "iceberg/manifest_adapter.h"
+#include "iceberg/result.h"
namespace iceberg {
@@ -52,7 +53,6 @@ class ManifestEntryAdapterV3 : public ManifestEntryAdapter {
const DataFile& file) const override;
private:
- std::optional<int64_t> snapshot_id_;
std::optional<int64_t> first_row_id_;
};
@@ -60,7 +60,7 @@ class ManifestEntryAdapterV3 : public ManifestEntryAdapter {
class ManifestFileAdapterV3 : public ManifestFileAdapter {
public:
ManifestFileAdapterV3(int64_t snapshot_id, std::optional<int64_t> parent_snapshot_id,
- int64_t sequence_number, int64_t first_row_id)
+ int64_t sequence_number, std::optional<int64_t> first_row_id)
: snapshot_id_(snapshot_id),
parent_snapshot_id_(parent_snapshot_id),
sequence_number_(sequence_number),