This is an automated email from the ASF dual-hosted git repository.
gangwu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-cpp.git
The following commit(s) were added to refs/heads/main by this push:
new bbdb227e feat: add rolling manifest writer (#443)
bbdb227e is described below
commit bbdb227eeb9daf9a0d7f9ee0f5d70001ed8164e5
Author: Junwang Zhao <[email protected]>
AuthorDate: Tue Dec 30 13:44:57 2025 +0800
feat: add rolling manifest writer (#443)
---
src/iceberg/CMakeLists.txt | 1 +
src/iceberg/file_writer.h | 2 +-
src/iceberg/manifest/manifest_entry.h | 4 +-
src/iceberg/manifest/manifest_writer.cc | 2 +
src/iceberg/manifest/manifest_writer.h | 18 +-
src/iceberg/manifest/meson.build | 1 +
src/iceberg/manifest/rolling_manifest_writer.cc | 119 +++++++++
src/iceberg/manifest/rolling_manifest_writer.h | 128 +++++++++
src/iceberg/meson.build | 1 +
src/iceberg/test/CMakeLists.txt | 3 +-
src/iceberg/test/rolling_manifest_writer_test.cc | 322 +++++++++++++++++++++++
src/iceberg/transaction.cc | 2 +-
src/iceberg/type.cc | 3 +-
13 files changed, 593 insertions(+), 13 deletions(-)
diff --git a/src/iceberg/CMakeLists.txt b/src/iceberg/CMakeLists.txt
index ce339cfe..36c3a483 100644
--- a/src/iceberg/CMakeLists.txt
+++ b/src/iceberg/CMakeLists.txt
@@ -44,6 +44,7 @@ set(ICEBERG_SOURCES
manifest/manifest_list.cc
manifest/manifest_reader.cc
manifest/manifest_writer.cc
+ manifest/rolling_manifest_writer.cc
manifest/v1_metadata.cc
manifest/v2_metadata.cc
manifest/v3_metadata.cc
diff --git a/src/iceberg/file_writer.h b/src/iceberg/file_writer.h
index f3540dd7..72ab0da1 100644
--- a/src/iceberg/file_writer.h
+++ b/src/iceberg/file_writer.h
@@ -103,7 +103,7 @@ class ICEBERG_EXPORT Writer {
virtual Result<Metrics> metrics() = 0;
/// \brief Get the file length.
- /// Only valid after the file is closed.
+ /// This can be called while the writer is still open or after the file is
closed.
virtual Result<int64_t> length() = 0;
/// \brief Returns a list of recommended split locations, if applicable,
empty
diff --git a/src/iceberg/manifest/manifest_entry.h
b/src/iceberg/manifest/manifest_entry.h
index f82b7a2b..5d35530b 100644
--- a/src/iceberg/manifest/manifest_entry.h
+++ b/src/iceberg/manifest/manifest_entry.h
@@ -46,7 +46,7 @@ enum class ManifestStatus {
/// \brief Get the relative manifest status type from int
ICEBERG_EXPORT constexpr Result<ManifestStatus> ManifestStatusFromInt(
- int status) noexcept {
+ int32_t status) noexcept {
switch (status) {
case 0:
return ManifestStatus::kExisting;
@@ -387,7 +387,7 @@ ICEBERG_EXPORT constexpr std::string_view
ToString(DataFile::Content type) noexc
/// \brief Get the relative data file content type from int
ICEBERG_EXPORT constexpr Result<DataFile::Content> DataFileContentFromInt(
- int content) noexcept {
+ int32_t content) noexcept {
switch (content) {
case 0:
return DataFile::Content::kData;
diff --git a/src/iceberg/manifest/manifest_writer.cc
b/src/iceberg/manifest/manifest_writer.cc
index 8cd940d5..e3d2564a 100644
--- a/src/iceberg/manifest/manifest_writer.cc
+++ b/src/iceberg/manifest/manifest_writer.cc
@@ -217,6 +217,8 @@ ManifestContent ManifestWriter::content() const { return
adapter_->content(); }
Result<Metrics> ManifestWriter::metrics() const { return writer_->metrics(); }
+Result<int64_t> ManifestWriter::length() const { return writer_->length(); }
+
Result<ManifestFile> ManifestWriter::ToManifestFile() const {
if (!closed_) [[unlikely]] {
return Invalid("Cannot get ManifestFile before closing the writer.");
diff --git a/src/iceberg/manifest/manifest_writer.h
b/src/iceberg/manifest/manifest_writer.h
index a708e6e3..6f468240 100644
--- a/src/iceberg/manifest/manifest_writer.h
+++ b/src/iceberg/manifest/manifest_writer.h
@@ -41,7 +41,7 @@ class ICEBERG_EXPORT ManifestWriter {
/// \brief Write the entry that all its fields are populated correctly.
/// \param entry Manifest entry to write.
- /// \return Status::OK() if entry was written successfully
+ /// \return Status indicating success or failure
/// \note All other write entry variants delegate to this method after
populating
/// the necessary fields.
Status WriteEntry(const ManifestEntry& entry);
@@ -50,7 +50,7 @@ class ICEBERG_EXPORT ManifestWriter {
///
/// \param file an added data file
/// \param data_sequence_number a data sequence number for the file
- /// \return Status::OK() if the entry was written successfully
+ /// \return Status indicating success or failure
/// \note The entry's snapshot ID will be this manifest's snapshot ID. The
entry's data
/// sequence number will be the provided data sequence number. The entry's
file sequence
/// number will be assigned at commit.
@@ -67,7 +67,7 @@ class ICEBERG_EXPORT ManifestWriter {
/// file was added)
/// \param file_sequence_number a file sequence number (assigned when the
file was
/// added)
- /// \return Status::OK() if the entry was written successfully
+ /// \return Status indicating success or failure
/// \note The original data and file sequence numbers, snapshot ID, which
were assigned
/// at commit, must be preserved when adding an existing entry.
Status WriteExistingEntry(std::shared_ptr<DataFile> file, int64_t
file_snapshot_id,
@@ -83,7 +83,7 @@ class ICEBERG_EXPORT ManifestWriter {
/// file was added)
/// \param file_sequence_number a file sequence number (assigned when the
file was
/// added)
- /// \return Status::OK() if the entry was written successfully
+ /// \return Status indicating success or failure
/// \note The entry's snapshot ID will be this manifest's snapshot ID.
However, the
/// original data and file sequence numbers of the file must be preserved
when the file
/// is marked as deleted.
@@ -95,7 +95,7 @@ class ICEBERG_EXPORT ManifestWriter {
/// \brief Write manifest entries to file.
/// \param entries Already populated manifest entries to write.
- /// \return Status::OK() if all entries were written successfully
+ /// \return Status indicating success or failure
Status AddAll(const std::vector<ManifestEntry>& entries);
/// \brief Close writer and flush to storage.
@@ -108,6 +108,10 @@ class ICEBERG_EXPORT ManifestWriter {
/// \note Only valid after the file is closed.
Result<Metrics> metrics() const;
+ /// \brief Get the current length of the manifest file in bytes.
+ /// \return The current length of the file, or an error if the operation
fails.
+ Result<int64_t> length() const;
+
/// \brief Get the ManifestFile object.
/// \note Only valid after the file is closed.
Result<ManifestFile> ToManifestFile() const;
@@ -187,12 +191,12 @@ class ICEBERG_EXPORT ManifestListWriter {
/// \brief Write manifest file to manifest list file.
/// \param file Manifest file to write.
- /// \return Status::OK() if file was written successfully
+ /// \return Status indicating success or failure
Status Add(const ManifestFile& file);
/// \brief Write manifest file list to manifest list file.
/// \param files Manifest file list to write.
- /// \return Status::OK() if all files were written successfully
+ /// \return Status indicating success or failure
Status AddAll(const std::vector<ManifestFile>& files);
/// \brief Close writer and flush to storage.
diff --git a/src/iceberg/manifest/meson.build b/src/iceberg/manifest/meson.build
index f49c5a5f..00f93c59 100644
--- a/src/iceberg/manifest/meson.build
+++ b/src/iceberg/manifest/meson.build
@@ -21,6 +21,7 @@ install_headers(
'manifest_list.h',
'manifest_reader.h',
'manifest_writer.h',
+ 'rolling_manifest_writer.h',
],
subdir: 'iceberg/manifest',
)
diff --git a/src/iceberg/manifest/rolling_manifest_writer.cc
b/src/iceberg/manifest/rolling_manifest_writer.cc
new file mode 100644
index 00000000..1648ca86
--- /dev/null
+++ b/src/iceberg/manifest/rolling_manifest_writer.cc
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "iceberg/manifest/rolling_manifest_writer.h"
+
+#include "iceberg/manifest/manifest_entry.h"
+#include "iceberg/result.h"
+#include "iceberg/util/macros.h"
+
+namespace iceberg {
+
+RollingManifestWriter::RollingManifestWriter(
+ ManifestWriterFactory manifest_writer_factory, int64_t
target_file_size_in_bytes)
+ : manifest_writer_factory_(std::move(manifest_writer_factory)),
+ target_file_size_in_bytes_(target_file_size_in_bytes) {}
+
+RollingManifestWriter::~RollingManifestWriter() {
+ // Ensure we close the current writer if not already closed
+ std::ignore = Close();
+}
+
+Status RollingManifestWriter::WriteAddedEntry(
+ std::shared_ptr<DataFile> file, std::optional<int64_t>
data_sequence_number) {
+ ICEBERG_ASSIGN_OR_RAISE(auto* writer, CurrentWriter());
+ ICEBERG_RETURN_UNEXPECTED(
+ writer->WriteAddedEntry(std::move(file), data_sequence_number));
+ current_file_rows_++;
+ return {};
+}
+
+Status RollingManifestWriter::WriteExistingEntry(
+ std::shared_ptr<DataFile> file, int64_t file_snapshot_id,
+ int64_t data_sequence_number, std::optional<int64_t> file_sequence_number)
{
+ ICEBERG_ASSIGN_OR_RAISE(auto* writer, CurrentWriter());
+ ICEBERG_RETURN_UNEXPECTED(writer->WriteExistingEntry(
+ std::move(file), file_snapshot_id, data_sequence_number,
file_sequence_number));
+ current_file_rows_++;
+ return {};
+}
+
+Status RollingManifestWriter::WriteDeletedEntry(
+ std::shared_ptr<DataFile> file, int64_t data_sequence_number,
+ std::optional<int64_t> file_sequence_number) {
+ ICEBERG_ASSIGN_OR_RAISE(auto* writer, CurrentWriter());
+ ICEBERG_RETURN_UNEXPECTED(writer->WriteDeletedEntry(
+ std::move(file), data_sequence_number, file_sequence_number));
+ current_file_rows_++;
+ return {};
+}
+
+Status RollingManifestWriter::Close() {
+ if (!closed_) {
+ ICEBERG_RETURN_UNEXPECTED(CloseCurrentWriter());
+ closed_ = true;
+ }
+ return {};
+}
+
+Result<std::vector<ManifestFile>> RollingManifestWriter::ToManifestFiles()
const {
+ if (!closed_) {
+ return Invalid("Cannot get ManifestFile list from unclosed writer");
+ }
+ return manifest_files_;
+}
+
+Result<ManifestWriter*> RollingManifestWriter::CurrentWriter() {
+ if (current_writer_ == nullptr) {
+ ICEBERG_ASSIGN_OR_RAISE(current_writer_, manifest_writer_factory_());
+ } else if (ShouldRollToNewFile()) {
+ ICEBERG_RETURN_UNEXPECTED(CloseCurrentWriter());
+ ICEBERG_ASSIGN_OR_RAISE(current_writer_, manifest_writer_factory_());
+ }
+
+ return current_writer_.get();
+}
+
+bool RollingManifestWriter::ShouldRollToNewFile() const {
+ if (current_writer_ == nullptr) {
+ return false;
+ }
+ // Roll when row count is a multiple of the divisor and file size >= target
+ if (current_file_rows_ % kRowsDivisor == 0) {
+ auto length_result = current_writer_->length();
+ if (length_result.has_value()) {
+ return length_result.value() >= target_file_size_in_bytes_;
+ }
+ // TODO(anyone): If we can't get the length, don't roll for now, revisit
this later.
+ }
+ return false;
+}
+
+Status RollingManifestWriter::CloseCurrentWriter() {
+ if (current_writer_ != nullptr) {
+ ICEBERG_RETURN_UNEXPECTED(current_writer_->Close());
+ ICEBERG_ASSIGN_OR_RAISE(auto manifest_file,
current_writer_->ToManifestFile());
+ manifest_files_.push_back(std::move(manifest_file));
+ current_writer_.reset();
+ current_file_rows_ = 0;
+ }
+ return {};
+}
+
+} // namespace iceberg
diff --git a/src/iceberg/manifest/rolling_manifest_writer.h
b/src/iceberg/manifest/rolling_manifest_writer.h
new file mode 100644
index 00000000..6e0211f5
--- /dev/null
+++ b/src/iceberg/manifest/rolling_manifest_writer.h
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+/// \file iceberg/manifest/rolling_manifest_writer.h
+/// Rolling manifest writer that can produce multiple manifest files.
+
+#include <functional>
+#include <memory>
+#include <vector>
+
+#include "iceberg/iceberg_export.h"
+#include "iceberg/manifest/manifest_entry.h"
+#include "iceberg/manifest/manifest_list.h"
+#include "iceberg/manifest/manifest_writer.h"
+#include "iceberg/result.h"
+
+namespace iceberg {
+
+/// \brief A rolling manifest writer that can produce multiple manifest files.
+class ICEBERG_EXPORT RollingManifestWriter {
+ public:
+ /// \brief Factory function type for creating ManifestWriter instances.
+ using ManifestWriterFactory =
std::function<Result<std::unique_ptr<ManifestWriter>>()>;
+
+ /// \brief Construct a rolling manifest writer.
+ /// \param manifest_writer_factory Factory function to create new
ManifestWriter
+ /// instances.
+ /// \param target_file_size_in_bytes Target file size in bytes. When the
current
+ /// file reaches this size (and row count is a multiple of 250), a new file
+ /// will be created.
+ RollingManifestWriter(ManifestWriterFactory manifest_writer_factory,
+ int64_t target_file_size_in_bytes);
+
+ ~RollingManifestWriter();
+
+ /// \brief Add an added entry for a file.
+ ///
+ /// \param file a data file
+ /// \return Status indicating success or failure
+ /// \note The entry's snapshot ID will be this manifest's snapshot ID. The
+ /// entry's data sequence number will be the provided data sequence number.
+ /// The entry's file sequence number will be assigned at commit.
+ Status WriteAddedEntry(std::shared_ptr<DataFile> file,
+ std::optional<int64_t> data_sequence_number =
std::nullopt);
+
+ /// \brief Add an existing entry for a file.
+ ///
+ /// \param file an existing data file
+ /// \param file_snapshot_id snapshot ID when the data file was added to the
table
+ /// \param data_sequence_number a data sequence number of the file (assigned
when
+ /// the file was added)
+ /// \param file_sequence_number a file sequence number (assigned when the
file
+ /// was added)
+ /// \return Status indicating success or failure
+ /// \note The original data and file sequence numbers, snapshot ID, which
were
+ /// assigned at commit, must be preserved when adding an existing entry.
+ Status WriteExistingEntry(std::shared_ptr<DataFile> file, int64_t
file_snapshot_id,
+ int64_t data_sequence_number,
+ std::optional<int64_t> file_sequence_number =
std::nullopt);
+
+ /// \brief Add a delete entry for a file.
+ ///
+ /// \param file a deleted data file
+ /// \param data_sequence_number a data sequence number of the file (assigned
when
+ /// the file was added)
+ /// \param file_sequence_number a file sequence number (assigned when the
file
+ /// was added)
+ /// \return Status indicating success or failure
+ /// \note The entry's snapshot ID will be this manifest's snapshot ID.
However,
+ /// the original data and file sequence numbers of the file must be preserved
+ /// when the file is marked as deleted.
+ Status WriteDeletedEntry(std::shared_ptr<DataFile> file, int64_t
data_sequence_number,
+ std::optional<int64_t> file_sequence_number =
std::nullopt);
+
+ /// \brief Close the rolling manifest writer.
+ Status Close();
+
+ /// \brief Get the list of manifest files produced by this writer.
+ /// \return A vector of ManifestFile objects
+ /// \note Only valid after the writer is closed.
+ Result<std::vector<ManifestFile>> ToManifestFiles() const;
+
+ private:
+ /// \brief Get or create the current writer, rolling to a new file if needed.
+ /// \return The current ManifestWriter, or an error if creation fails
+ Result<ManifestWriter*> CurrentWriter();
+
+ /// \brief Check if we should roll to a new file.
+ ///
+ /// Returns true only if the current file's row count is a multiple of
+ /// kRowsDivisor and its length has reached the target size; does not roll.
+ bool ShouldRollToNewFile() const;
+
+ /// \brief Close the current writer and add its ManifestFile to the list.
+ Status CloseCurrentWriter();
+
+ /// \brief The number of rows after which to consider rolling to a new file.
+ /// \note This is aligned with Iceberg's Java impl.
+ static constexpr int64_t kRowsDivisor = 250;
+
+ ManifestWriterFactory manifest_writer_factory_;
+ int64_t target_file_size_in_bytes_;
+ std::vector<ManifestFile> manifest_files_;
+
+ int64_t current_file_rows_{0};
+ std::unique_ptr<ManifestWriter> current_writer_{nullptr};
+ bool closed_{false};
+};
+
+} // namespace iceberg
diff --git a/src/iceberg/meson.build b/src/iceberg/meson.build
index 77d72638..3929e180 100644
--- a/src/iceberg/meson.build
+++ b/src/iceberg/meson.build
@@ -66,6 +66,7 @@ iceberg_sources = files(
'manifest/manifest_list.cc',
'manifest/manifest_reader.cc',
'manifest/manifest_writer.cc',
+ 'manifest/rolling_manifest_writer.cc',
'manifest/v1_metadata.cc',
'manifest/v2_metadata.cc',
'manifest/v3_metadata.cc',
diff --git a/src/iceberg/test/CMakeLists.txt b/src/iceberg/test/CMakeLists.txt
index b7f23c53..30a473fd 100644
--- a/src/iceberg/test/CMakeLists.txt
+++ b/src/iceberg/test/CMakeLists.txt
@@ -146,7 +146,8 @@ if(ICEBERG_BUILD_BUNDLE)
manifest_list_versions_test.cc
manifest_reader_stats_test.cc
manifest_reader_test.cc
- manifest_writer_versions_test.cc)
+ manifest_writer_versions_test.cc
+ rolling_manifest_writer_test.cc)
add_iceberg_test(parquet_test
USE_BUNDLE
diff --git a/src/iceberg/test/rolling_manifest_writer_test.cc
b/src/iceberg/test/rolling_manifest_writer_test.cc
new file mode 100644
index 00000000..8ea13869
--- /dev/null
+++ b/src/iceberg/test/rolling_manifest_writer_test.cc
@@ -0,0 +1,322 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "iceberg/manifest/rolling_manifest_writer.h"
+
+#include <chrono>
+#include <cstdint>
+#include <memory>
+#include <optional>
+#include <string>
+#include <vector>
+
+#include <gtest/gtest.h>
+
+#include "iceberg/arrow/arrow_file_io.h"
+#include "iceberg/avro/avro_register.h"
+#include "iceberg/file_format.h"
+#include "iceberg/manifest/manifest_entry.h"
+#include "iceberg/manifest/manifest_list.h"
+#include "iceberg/manifest/manifest_writer.h"
+#include "iceberg/partition_spec.h"
+#include "iceberg/row/partition_values.h"
+#include "iceberg/schema.h"
+#include "iceberg/schema_field.h"
+#include "iceberg/test/matchers.h"
+#include "iceberg/transform.h"
+#include "iceberg/type.h"
+
+namespace iceberg {
+
+namespace {
+
+constexpr int64_t kSequenceNumber = 34L;
+constexpr int64_t kSnapshotId = 987134631982734L;
+constexpr std::string_view kPath =
+
"s3://bucket/table/category=cheesy/timestamp_hour=10/id_bucket=3/file.avro";
+constexpr FileFormatType kFormat = FileFormatType::kAvro;
+constexpr int64_t kFirstRowId = 100L;
+constexpr int64_t kFileSizeCheckRowsDivisor = 250;
+constexpr int64_t kSmallFileSize = 10L;
+
+const PartitionValues kPartition =
+ PartitionValues({Literal::String("cheesy"), Literal::Int(10),
Literal::Int(3)});
+
+std::shared_ptr<DataFile> CreateDataFile(int64_t record_count) {
+ auto data_file = std::make_shared<DataFile>();
+ data_file->file_path = std::format("data_bucket=0/file-{}.parquet",
record_count);
+ data_file->file_format = FileFormatType::kParquet;
+ data_file->partition = kPartition;
+ data_file->file_size_in_bytes = 1024;
+ data_file->record_count = record_count;
+ return data_file;
+}
+
+std::shared_ptr<DataFile> CreateDeleteFile(int64_t record_count) {
+ auto delete_file = std::make_shared<DataFile>();
+ delete_file->content = DataFile::Content::kPositionDeletes;
+ delete_file->file_path = std::format("/path/to/delete-{}.parquet",
record_count);
+ delete_file->file_format = FileFormatType::kParquet;
+ delete_file->partition = kPartition;
+ delete_file->file_size_in_bytes = 10;
+ delete_file->record_count = record_count;
+ return delete_file;
+}
+
+static std::string CreateManifestPath() {
+ return std::format("manifest-{}.avro",
+
std::chrono::system_clock::now().time_since_epoch().count());
+}
+
+} // namespace
+
+class RollingManifestWriterTest : public ::testing::TestWithParam<int32_t> {
+ protected:
+ void SetUp() override {
+ avro::RegisterAll();
+ schema_ = std::make_shared<Schema>(std::vector<SchemaField>{
+ SchemaField::MakeRequired(1, "id", int64()),
+ SchemaField::MakeRequired(2, "timestamp", timestamp_tz()),
+ SchemaField::MakeRequired(3, "category", string()),
+ SchemaField::MakeRequired(4, "data", string()),
+ SchemaField::MakeRequired(5, "double", float64())});
+ spec_ = PartitionSpec::Make(
+ 0, {PartitionField(3, 1000, "category", Transform::Identity()),
+ PartitionField(2, 1001, "timestamp_hour",
Transform::Hour()),
+ PartitionField(1, 1002, "id_bucket",
Transform::Bucket(16))})
+ .value();
+
+ file_io_ = iceberg::arrow::MakeMockFileIO();
+ }
+
+ RollingManifestWriter::ManifestWriterFactory NewRollingWriteManifestFactory(
+ int32_t format_version) {
+ return [this, format_version]() -> Result<std::unique_ptr<ManifestWriter>>
{
+ const std::string manifest_path = CreateManifestPath();
+ Result<std::unique_ptr<ManifestWriter>> writer_result =
+ NotSupported("Format version: {}", format_version);
+
+ if (format_version == 1) {
+ writer_result = ManifestWriter::MakeV1Writer(kSnapshotId,
manifest_path, file_io_,
+ spec_, schema_);
+ } else if (format_version == 2) {
+ writer_result = ManifestWriter::MakeV2Writer(
+ kSnapshotId, manifest_path, file_io_, spec_, schema_,
ManifestContent::kData);
+ } else if (format_version == 3) {
+ writer_result = ManifestWriter::MakeV3Writer(kSnapshotId, kFirstRowId,
+ manifest_path, file_io_,
spec_,
+ schema_,
ManifestContent::kData);
+ }
+
+ return writer_result;
+ };
+ }
+
+ RollingManifestWriter::ManifestWriterFactory
NewRollingWriteDeleteManifestFactory(
+ int32_t format_version) {
+ return [this, format_version]() -> Result<std::unique_ptr<ManifestWriter>>
{
+ const std::string manifest_path = CreateManifestPath();
+ Result<std::unique_ptr<ManifestWriter>> writer_result =
+ NotSupported("Format version: {}", format_version);
+
+ if (format_version == 2) {
+ writer_result =
+ ManifestWriter::MakeV2Writer(kSnapshotId, manifest_path, file_io_,
spec_,
+ schema_, ManifestContent::kDeletes);
+ } else if (format_version == 3) {
+ writer_result = ManifestWriter::MakeV3Writer(kSnapshotId, kFirstRowId,
+ manifest_path, file_io_,
spec_,
+ schema_,
ManifestContent::kDeletes);
+ }
+
+ return writer_result;
+ };
+ }
+
+ void CheckManifests(const std::vector<ManifestFile>& manifests,
+ const std::vector<int32_t>& added_file_counts,
+ const std::vector<int32_t>& existing_file_counts,
+ const std::vector<int32_t>& deleted_file_counts,
+ const std::vector<int64_t>& added_row_counts,
+ const std::vector<int64_t>& existing_row_counts,
+ const std::vector<int64_t>& deleted_row_counts) {
+ ASSERT_EQ(manifests.size(), added_file_counts.size());
+ for (size_t i = 0; i < manifests.size(); i++) {
+ const ManifestFile& manifest = manifests[i];
+ EXPECT_TRUE(manifest.has_added_files());
+ EXPECT_EQ(manifest.added_files_count.value_or(0), added_file_counts[i]);
+ EXPECT_EQ(manifest.added_rows_count.value_or(0), added_row_counts[i]);
+
+ EXPECT_TRUE(manifest.has_existing_files());
+ EXPECT_EQ(manifest.existing_files_count.value_or(0),
existing_file_counts[i]);
+ EXPECT_EQ(manifest.existing_rows_count.value_or(0),
existing_row_counts[i]);
+
+ EXPECT_TRUE(manifest.has_deleted_files());
+ EXPECT_EQ(manifest.deleted_files_count.value_or(0),
deleted_file_counts[i]);
+ EXPECT_EQ(manifest.deleted_rows_count.value_or(0),
deleted_row_counts[i]);
+ }
+ }
+
+ std::shared_ptr<Schema> schema_;
+ std::shared_ptr<PartitionSpec> spec_;
+ std::shared_ptr<FileIO> file_io_;
+};
+
+TEST_P(RollingManifestWriterTest, TestRollingManifestWriterNoRecords) {
+ int32_t format_version = GetParam();
+ RollingManifestWriter writer(NewRollingWriteManifestFactory(format_version),
+ kSmallFileSize);
+
+ EXPECT_THAT(writer.Close(), IsOk());
+ ICEBERG_UNWRAP_OR_FAIL(auto manifest_files, writer.ToManifestFiles());
+ EXPECT_TRUE(manifest_files.empty());
+
+ // Test that calling close again doesn't change the result
+ EXPECT_THAT(writer.Close(), IsOk());
+ ICEBERG_UNWRAP_OR_FAIL(manifest_files, writer.ToManifestFiles());
+ EXPECT_TRUE(manifest_files.empty());
+}
+
+TEST_P(RollingManifestWriterTest, TestRollingDeleteManifestWriterNoRecords) {
+ int32_t format_version = GetParam();
+ if (format_version < 2) {
+ GTEST_SKIP() << "Delete manifests only supported in V2+";
+ }
+ RollingManifestWriter
writer(NewRollingWriteDeleteManifestFactory(format_version),
+ kSmallFileSize);
+
+ EXPECT_THAT(writer.Close(), IsOk());
+ ICEBERG_UNWRAP_OR_FAIL(auto manifest_files, writer.ToManifestFiles());
+ EXPECT_TRUE(manifest_files.empty());
+
+ // Test that calling close again doesn't change the result
+ EXPECT_THAT(writer.Close(), IsOk());
+ ICEBERG_UNWRAP_OR_FAIL(manifest_files, writer.ToManifestFiles());
+ EXPECT_TRUE(manifest_files.empty());
+}
+
+TEST_P(RollingManifestWriterTest, TestRollingManifestWriterSplitFiles) {
+ int32_t format_version = GetParam();
+ RollingManifestWriter writer(NewRollingWriteManifestFactory(format_version),
+ kSmallFileSize);
+
+ std::vector<int32_t> added_file_counts(3, 0);
+ std::vector<int32_t> existing_file_counts(3, 0);
+ std::vector<int32_t> deleted_file_counts(3, 0);
+ std::vector<int64_t> added_row_counts(3, 0);
+ std::vector<int64_t> existing_row_counts(3, 0);
+ std::vector<int64_t> deleted_row_counts(3, 0);
+
+ // Write 750 entries (3 * 250) to trigger 3 file splits
+ for (int32_t i = 0; i < kFileSizeCheckRowsDivisor * 3; i++) {
+ int32_t type = i % 3;
+ int32_t file_index = i / kFileSizeCheckRowsDivisor;
+ auto data_file = CreateDataFile(i);
+
+ if (type == 0) {
+ EXPECT_THAT(writer.WriteAddedEntry(data_file), IsOk());
+ added_file_counts[file_index] += 1;
+ added_row_counts[file_index] += i;
+ } else if (type == 1) {
+ EXPECT_THAT(writer.WriteExistingEntry(data_file, 1, 1, std::nullopt),
IsOk());
+ existing_file_counts[file_index] += 1;
+ existing_row_counts[file_index] += i;
+ } else {
+ EXPECT_THAT(writer.WriteDeletedEntry(data_file, 1, std::nullopt),
IsOk());
+ deleted_file_counts[file_index] += 1;
+ deleted_row_counts[file_index] += i;
+ }
+ }
+
+ EXPECT_THAT(writer.Close(), IsOk());
+ ICEBERG_UNWRAP_OR_FAIL(auto manifest_files, writer.ToManifestFiles());
+ EXPECT_EQ(manifest_files.size(), 3);
+
+ CheckManifests(manifest_files, added_file_counts, existing_file_counts,
+ deleted_file_counts, added_row_counts, existing_row_counts,
+ deleted_row_counts);
+
+ // Test that calling close again doesn't change the result
+ EXPECT_THAT(writer.Close(), IsOk());
+ ICEBERG_UNWRAP_OR_FAIL(manifest_files, writer.ToManifestFiles());
+ EXPECT_EQ(manifest_files.size(), 3);
+
+ CheckManifests(manifest_files, added_file_counts, existing_file_counts,
+ deleted_file_counts, added_row_counts, existing_row_counts,
+ deleted_row_counts);
+}
+
+TEST_P(RollingManifestWriterTest, TestRollingDeleteManifestWriterSplitFiles) {
+ int32_t format_version = GetParam();
+ if (format_version < 2) {
+ GTEST_SKIP() << "Delete manifests only supported in V2+";
+ }
+ RollingManifestWriter
writer(NewRollingWriteDeleteManifestFactory(format_version),
+ kSmallFileSize);
+
+ std::vector<int32_t> added_file_counts(3, 0);
+ std::vector<int32_t> existing_file_counts(3, 0);
+ std::vector<int32_t> deleted_file_counts(3, 0);
+ std::vector<int64_t> added_row_counts(3, 0);
+ std::vector<int64_t> existing_row_counts(3, 0);
+ std::vector<int64_t> deleted_row_counts(3, 0);
+
+ // Write 750 entries (3 * 250) to trigger 3 file splits
+ for (int32_t i = 0; i < 3 * kFileSizeCheckRowsDivisor; i++) {
+ int32_t type = i % 3;
+ int32_t file_index = i / kFileSizeCheckRowsDivisor;
+ auto delete_file = CreateDeleteFile(i);
+
+ if (type == 0) {
+ EXPECT_THAT(writer.WriteAddedEntry(delete_file), IsOk());
+ added_file_counts[file_index] += 1;
+ added_row_counts[file_index] += i;
+ } else if (type == 1) {
+ EXPECT_THAT(writer.WriteExistingEntry(delete_file, 1, 1, std::nullopt),
IsOk());
+ existing_file_counts[file_index] += 1;
+ existing_row_counts[file_index] += i;
+ } else {
+ EXPECT_THAT(writer.WriteDeletedEntry(delete_file, 1, std::nullopt),
IsOk());
+ deleted_file_counts[file_index] += 1;
+ deleted_row_counts[file_index] += i;
+ }
+ }
+
+ EXPECT_THAT(writer.Close(), IsOk());
+ ICEBERG_UNWRAP_OR_FAIL(auto manifest_files, writer.ToManifestFiles());
+ EXPECT_EQ(manifest_files.size(), 3);
+
+ CheckManifests(manifest_files, added_file_counts, existing_file_counts,
+ deleted_file_counts, added_row_counts, existing_row_counts,
+ deleted_row_counts);
+
+ // Test that calling close again doesn't change the result
+ EXPECT_THAT(writer.Close(), IsOk());
+ ICEBERG_UNWRAP_OR_FAIL(manifest_files, writer.ToManifestFiles());
+ EXPECT_EQ(manifest_files.size(), 3);
+
+ CheckManifests(manifest_files, added_file_counts, existing_file_counts,
+ deleted_file_counts, added_row_counts, existing_row_counts,
+ deleted_row_counts);
+}
+
+INSTANTIATE_TEST_SUITE_P(TestRollingManifestWriter, RollingManifestWriterTest,
+ ::testing::Values(1, 2, 3));
+
+} // namespace iceberg
diff --git a/src/iceberg/transaction.cc b/src/iceberg/transaction.cc
index 33864ca9..c8446e8b 100644
--- a/src/iceberg/transaction.cc
+++ b/src/iceberg/transaction.cc
@@ -107,7 +107,7 @@ Status Transaction::Apply(PendingUpdate& update) {
} break;
default:
return NotSupported("Unsupported pending update: {}",
- static_cast<int>(update.kind()));
+ static_cast<int32_t>(update.kind()));
}
last_update_committed_ = true;
diff --git a/src/iceberg/type.cc b/src/iceberg/type.cc
index 99b3433b..ed10e127 100644
--- a/src/iceberg/type.cc
+++ b/src/iceberg/type.cc
@@ -19,6 +19,7 @@
#include "iceberg/type.h"
+#include <cstdint>
#include <format>
#include <iterator>
#include <memory>
@@ -162,7 +163,7 @@ Result<std::optional<NestedType::SchemaFieldConstRef>>
ListType::GetFieldById(
}
Result<std::optional<NestedType::SchemaFieldConstRef>>
ListType::GetFieldByIndex(
- int index) const {
+ int32_t index) const {
if (index == 0) {
return std::cref(element_);
}