This is an automated email from the ASF dual-hosted git repository.

gangwu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-cpp.git


The following commit(s) were added to refs/heads/main by this push:
     new bbdb227e feat: add rolling manifest writer (#443)
bbdb227e is described below

commit bbdb227eeb9daf9a0d7f9ee0f5d70001ed8164e5
Author: Junwang Zhao <[email protected]>
AuthorDate: Tue Dec 30 13:44:57 2025 +0800

    feat: add rolling manifest writer (#443)
---
 src/iceberg/CMakeLists.txt                       |   1 +
 src/iceberg/file_writer.h                        |   2 +-
 src/iceberg/manifest/manifest_entry.h            |   4 +-
 src/iceberg/manifest/manifest_writer.cc          |   2 +
 src/iceberg/manifest/manifest_writer.h           |  18 +-
 src/iceberg/manifest/meson.build                 |   1 +
 src/iceberg/manifest/rolling_manifest_writer.cc  | 119 +++++++++
 src/iceberg/manifest/rolling_manifest_writer.h   | 128 +++++++++
 src/iceberg/meson.build                          |   1 +
 src/iceberg/test/CMakeLists.txt                  |   3 +-
 src/iceberg/test/rolling_manifest_writer_test.cc | 322 +++++++++++++++++++++++
 src/iceberg/transaction.cc                       |   2 +-
 src/iceberg/type.cc                              |   3 +-
 13 files changed, 593 insertions(+), 13 deletions(-)

diff --git a/src/iceberg/CMakeLists.txt b/src/iceberg/CMakeLists.txt
index ce339cfe..36c3a483 100644
--- a/src/iceberg/CMakeLists.txt
+++ b/src/iceberg/CMakeLists.txt
@@ -44,6 +44,7 @@ set(ICEBERG_SOURCES
     manifest/manifest_list.cc
     manifest/manifest_reader.cc
     manifest/manifest_writer.cc
+    manifest/rolling_manifest_writer.cc
     manifest/v1_metadata.cc
     manifest/v2_metadata.cc
     manifest/v3_metadata.cc
diff --git a/src/iceberg/file_writer.h b/src/iceberg/file_writer.h
index f3540dd7..72ab0da1 100644
--- a/src/iceberg/file_writer.h
+++ b/src/iceberg/file_writer.h
@@ -103,7 +103,7 @@ class ICEBERG_EXPORT Writer {
   virtual Result<Metrics> metrics() = 0;
 
   /// \brief Get the file length.
-  /// Only valid after the file is closed.
+  /// This can be called while the writer is still open or after the file is 
closed.
   virtual Result<int64_t> length() = 0;
 
   /// \brief Returns a list of recommended split locations, if applicable, 
empty
diff --git a/src/iceberg/manifest/manifest_entry.h 
b/src/iceberg/manifest/manifest_entry.h
index f82b7a2b..5d35530b 100644
--- a/src/iceberg/manifest/manifest_entry.h
+++ b/src/iceberg/manifest/manifest_entry.h
@@ -46,7 +46,7 @@ enum class ManifestStatus {
 
 /// \brief Get the relative manifest status type from int
 ICEBERG_EXPORT constexpr Result<ManifestStatus> ManifestStatusFromInt(
-    int status) noexcept {
+    int32_t status) noexcept {
   switch (status) {
     case 0:
       return ManifestStatus::kExisting;
@@ -387,7 +387,7 @@ ICEBERG_EXPORT constexpr std::string_view 
ToString(DataFile::Content type) noexc
 
 /// \brief Get the relative data file content type from int
 ICEBERG_EXPORT constexpr Result<DataFile::Content> DataFileContentFromInt(
-    int content) noexcept {
+    int32_t content) noexcept {
   switch (content) {
     case 0:
       return DataFile::Content::kData;
diff --git a/src/iceberg/manifest/manifest_writer.cc 
b/src/iceberg/manifest/manifest_writer.cc
index 8cd940d5..e3d2564a 100644
--- a/src/iceberg/manifest/manifest_writer.cc
+++ b/src/iceberg/manifest/manifest_writer.cc
@@ -217,6 +217,8 @@ ManifestContent ManifestWriter::content() const { return 
adapter_->content(); }
 
 Result<Metrics> ManifestWriter::metrics() const { return writer_->metrics(); }
 
+/// Forwards to the underlying file writer and returns whatever length it reports.
+Result<int64_t> ManifestWriter::length() const {
+  return writer_->length();
+}
+
 Result<ManifestFile> ManifestWriter::ToManifestFile() const {
   if (!closed_) [[unlikely]] {
     return Invalid("Cannot get ManifestFile before closing the writer.");
diff --git a/src/iceberg/manifest/manifest_writer.h 
b/src/iceberg/manifest/manifest_writer.h
index a708e6e3..6f468240 100644
--- a/src/iceberg/manifest/manifest_writer.h
+++ b/src/iceberg/manifest/manifest_writer.h
@@ -41,7 +41,7 @@ class ICEBERG_EXPORT ManifestWriter {
 
   /// \brief Write the entry that all its fields are populated correctly.
   /// \param entry Manifest entry to write.
-  /// \return Status::OK() if entry was written successfully
+  /// \return Status indicating success or failure
   /// \note All other write entry variants delegate to this method after 
populating
   /// the necessary fields.
   Status WriteEntry(const ManifestEntry& entry);
@@ -50,7 +50,7 @@ class ICEBERG_EXPORT ManifestWriter {
   ///
   /// \param file an added data file
   /// \param data_sequence_number a data sequence number for the file
-  /// \return Status::OK() if the entry was written successfully
+  /// \return Status indicating success or failure
   /// \note The entry's snapshot ID will be this manifest's snapshot ID. The 
entry's data
   /// sequence number will be the provided data sequence number. The entry's 
file sequence
   /// number will be assigned at commit.
@@ -67,7 +67,7 @@ class ICEBERG_EXPORT ManifestWriter {
   /// file was added)
   /// \param file_sequence_number a file sequence number (assigned when the 
file was
   /// added)
-  /// \return Status::OK() if the entry was written successfully
+  /// \return Status indicating success or failure
   /// \note The original data and file sequence numbers, snapshot ID, which 
were assigned
   /// at commit, must be preserved when adding an existing entry.
   Status WriteExistingEntry(std::shared_ptr<DataFile> file, int64_t 
file_snapshot_id,
@@ -83,7 +83,7 @@ class ICEBERG_EXPORT ManifestWriter {
   /// file was added)
   /// \param file_sequence_number a file sequence number (assigned when the 
file was
   /// added)
-  /// \return Status::OK() if the entry was written successfully
+  /// \return Status indicating success or failure
   /// \note The entry's snapshot ID will be this manifest's snapshot ID. 
However, the
   /// original data and file sequence numbers of the file must be preserved 
when the file
   /// is marked as deleted.
@@ -95,7 +95,7 @@ class ICEBERG_EXPORT ManifestWriter {
 
   /// \brief Write manifest entries to file.
   /// \param entries Already populated manifest entries to write.
-  /// \return Status::OK() if all entries were written successfully
+  /// \return Status indicating success or failure
   Status AddAll(const std::vector<ManifestEntry>& entries);
 
   /// \brief Close writer and flush to storage.
@@ -108,6 +108,10 @@ class ICEBERG_EXPORT ManifestWriter {
   /// \note Only valid after the file is closed.
   Result<Metrics> metrics() const;
 
+  /// \brief Get the current length of the manifest file in bytes.
+  /// \return The current length of the file, or an error if the operation 
fails.
+  Result<int64_t> length() const;
+
   /// \brief Get the ManifestFile object.
   /// \note Only valid after the file is closed.
   Result<ManifestFile> ToManifestFile() const;
@@ -187,12 +191,12 @@ class ICEBERG_EXPORT ManifestListWriter {
 
   /// \brief Write manifest file to manifest list file.
   /// \param file Manifest file to write.
-  /// \return Status::OK() if file was written successfully
+  /// \return Status indicating success or failure
   Status Add(const ManifestFile& file);
 
   /// \brief Write manifest file list to manifest list file.
   /// \param files Manifest file list to write.
-  /// \return Status::OK() if all files were written successfully
+  /// \return Status indicating success or failure
   Status AddAll(const std::vector<ManifestFile>& files);
 
   /// \brief Close writer and flush to storage.
diff --git a/src/iceberg/manifest/meson.build b/src/iceberg/manifest/meson.build
index f49c5a5f..00f93c59 100644
--- a/src/iceberg/manifest/meson.build
+++ b/src/iceberg/manifest/meson.build
@@ -21,6 +21,7 @@ install_headers(
         'manifest_list.h',
         'manifest_reader.h',
         'manifest_writer.h',
+        'rolling_manifest_writer.h',
     ],
     subdir: 'iceberg/manifest',
 )
diff --git a/src/iceberg/manifest/rolling_manifest_writer.cc 
b/src/iceberg/manifest/rolling_manifest_writer.cc
new file mode 100644
index 00000000..1648ca86
--- /dev/null
+++ b/src/iceberg/manifest/rolling_manifest_writer.cc
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "iceberg/manifest/rolling_manifest_writer.h"
+
+#include "iceberg/manifest/manifest_entry.h"
+#include "iceberg/result.h"
+#include "iceberg/util/macros.h"
+
+namespace iceberg {
+
+RollingManifestWriter::RollingManifestWriter(
+    ManifestWriterFactory manifest_writer_factory, int64_t 
target_file_size_in_bytes)
+    : manifest_writer_factory_(std::move(manifest_writer_factory)),
+      target_file_size_in_bytes_(target_file_size_in_bytes) {}
+
+RollingManifestWriter::~RollingManifestWriter() {
+  // Best-effort close of a writer that is still open. A destructor has no way to
+  // report failure, so the resulting status is intentionally discarded.
+  [[maybe_unused]] const Status close_status = Close();
+}
+
+Status RollingManifestWriter::WriteAddedEntry(
+    std::shared_ptr<DataFile> file, std::optional<int64_t> data_sequence_number) {
+  // Resolve the target writer first; this may finish the current file and open a new
+  // one.
+  ICEBERG_ASSIGN_OR_RAISE(auto* active_writer, CurrentWriter());
+  ICEBERG_RETURN_UNEXPECTED(
+      active_writer->WriteAddedEntry(std::move(file), data_sequence_number));
+  // Count the row only after the underlying write succeeded.
+  ++current_file_rows_;
+  return {};
+}
+
+Status RollingManifestWriter::WriteExistingEntry(
+    std::shared_ptr<DataFile> file, int64_t file_snapshot_id,
+    int64_t data_sequence_number, std::optional<int64_t> file_sequence_number) {
+  // Resolve the target writer first; this may finish the current file and open a new
+  // one.
+  ICEBERG_ASSIGN_OR_RAISE(auto* active_writer, CurrentWriter());
+  ICEBERG_RETURN_UNEXPECTED(active_writer->WriteExistingEntry(
+      std::move(file), file_snapshot_id, data_sequence_number, file_sequence_number));
+  // Count the row only after the underlying write succeeded.
+  ++current_file_rows_;
+  return {};
+}
+
+Status RollingManifestWriter::WriteDeletedEntry(
+    std::shared_ptr<DataFile> file, int64_t data_sequence_number,
+    std::optional<int64_t> file_sequence_number) {
+  // Resolve the target writer first; this may finish the current file and open a new
+  // one.
+  ICEBERG_ASSIGN_OR_RAISE(auto* active_writer, CurrentWriter());
+  ICEBERG_RETURN_UNEXPECTED(active_writer->WriteDeletedEntry(
+      std::move(file), data_sequence_number, file_sequence_number));
+  // Count the row only after the underlying write succeeded.
+  ++current_file_rows_;
+  return {};
+}
+
+Status RollingManifestWriter::Close() {
+  // Repeated calls are no-ops once a close has succeeded.
+  if (closed_) {
+    return {};
+  }
+  // closed_ is only set after the current writer was finished without error, so a
+  // failed close can be retried.
+  ICEBERG_RETURN_UNEXPECTED(CloseCurrentWriter());
+  closed_ = true;
+  return {};
+}
+
+Result<std::vector<ManifestFile>> RollingManifestWriter::ToManifestFiles() const {
+  // The list is only handed out (as a copy) once the writer has been closed.
+  if (closed_) {
+    return manifest_files_;
+  }
+  return Invalid("Cannot get ManifestFile list from unclosed writer");
+}
+
+// Returns the writer that should receive the next entry.
+//
+// Fails if this rolling writer has already been closed: a writer created after Close()
+// would never be closed or reported by ToManifestFiles(), so its entries would be
+// silently dropped. Otherwise the current file is finished first when
+// ShouldRollToNewFile() says so, and a new writer is requested from the factory
+// whenever none is open.
+Result<ManifestWriter*> RollingManifestWriter::CurrentWriter() {
+  if (closed_) [[unlikely]] {
+    return Invalid("Cannot write to a closed rolling manifest writer");
+  }
+
+  if (current_writer_ != nullptr && ShouldRollToNewFile()) {
+    // On success this records the finished manifest and resets current_writer_.
+    ICEBERG_RETURN_UNEXPECTED(CloseCurrentWriter());
+  }
+
+  if (current_writer_ == nullptr) {
+    ICEBERG_ASSIGN_OR_RAISE(current_writer_, manifest_writer_factory_());
+    // Callers dereference the returned pointer, so reject a factory that reports
+    // success but hands back no writer.
+    if (current_writer_ == nullptr) [[unlikely]] {
+      return Invalid("Manifest writer factory returned a null writer");
+    }
+  }
+
+  return current_writer_.get();
+}
+
+bool RollingManifestWriter::ShouldRollToNewFile() const {
+  // The file size is only consulted when a writer is open and the row count of the
+  // current file is a multiple of kRowsDivisor.
+  const bool at_checkpoint =
+      current_writer_ != nullptr && current_file_rows_ % kRowsDivisor == 0;
+  if (!at_checkpoint) {
+    return false;
+  }
+
+  const auto current_length = current_writer_->length();
+  // TODO(anyone): If we can't get the length, don't roll for now, revisit this later.
+  return current_length.has_value() &&
+         current_length.value() >= target_file_size_in_bytes_;
+}
+
+Status RollingManifestWriter::CloseCurrentWriter() {
+  if (current_writer_ == nullptr) {
+    return {};  // Nothing is open, so there is nothing to finish.
+  }
+
+  // Close first: ToManifestFile() is only valid on a closed writer.
+  ICEBERG_RETURN_UNEXPECTED(current_writer_->Close());
+  ICEBERG_ASSIGN_OR_RAISE(auto finished_manifest, current_writer_->ToManifestFile());
+  manifest_files_.emplace_back(std::move(finished_manifest));
+
+  // Reset the per-file state so the next entry opens a new file.
+  current_file_rows_ = 0;
+  current_writer_.reset();
+  return {};
+}
+
+}  // namespace iceberg
diff --git a/src/iceberg/manifest/rolling_manifest_writer.h 
b/src/iceberg/manifest/rolling_manifest_writer.h
new file mode 100644
index 00000000..6e0211f5
--- /dev/null
+++ b/src/iceberg/manifest/rolling_manifest_writer.h
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+/// \file iceberg/manifest/rolling_manifest_writer.h
+/// Rolling manifest writer that can produce multiple manifest files.
+
+#include <functional>
+#include <memory>
+#include <vector>
+
+#include "iceberg/iceberg_export.h"
+#include "iceberg/manifest/manifest_entry.h"
+#include "iceberg/manifest/manifest_list.h"
+#include "iceberg/manifest/manifest_writer.h"
+#include "iceberg/result.h"
+
+namespace iceberg {
+
+/// \brief A rolling manifest writer that can produce multiple manifest files.
+///
+/// Entries are forwarded to a ManifestWriter obtained from the factory. The current
+/// file is finished and a new one is started once the size check described on the
+/// constructor passes.
+class ICEBERG_EXPORT RollingManifestWriter {
+ public:
+  /// \brief Factory function type for creating ManifestWriter instances.
+  using ManifestWriterFactory = std::function<Result<std::unique_ptr<ManifestWriter>>()>;
+
+  /// \brief Construct a rolling manifest writer.
+  /// \param manifest_writer_factory Factory function to create new ManifestWriter
+  /// instances.
+  /// \param target_file_size_in_bytes Target file size in bytes. When the current
+  /// file reaches this size (and row count is a multiple of 250), a new file
+  /// will be created.
+  RollingManifestWriter(ManifestWriterFactory manifest_writer_factory,
+                        int64_t target_file_size_in_bytes);
+
+  /// \brief Close the writer if it is still open; a failure to close is ignored.
+  ~RollingManifestWriter();
+
+  // Not copyable: the instance exclusively owns the writer of the file in progress.
+  RollingManifestWriter(const RollingManifestWriter&) = delete;
+  RollingManifestWriter& operator=(const RollingManifestWriter&) = delete;
+
+  /// \brief Add an added entry for a file.
+  ///
+  /// \param file a data file
+  /// \param data_sequence_number an optional data sequence number for the file
+  /// \return Status indicating success or failure
+  /// \note The entry's snapshot ID will be this manifest's snapshot ID. The
+  /// entry's data sequence number will be the provided data sequence number.
+  /// The entry's file sequence number will be assigned at commit.
+  Status WriteAddedEntry(std::shared_ptr<DataFile> file,
+                         std::optional<int64_t> data_sequence_number = std::nullopt);
+
+  /// \brief Add an existing entry for a file.
+  ///
+  /// \param file an existing data file
+  /// \param file_snapshot_id snapshot ID when the data file was added to the table
+  /// \param data_sequence_number a data sequence number of the file (assigned when
+  /// the file was added)
+  /// \param file_sequence_number a file sequence number (assigned when the file
+  /// was added)
+  /// \return Status indicating success or failure
+  /// \note The original data and file sequence numbers, snapshot ID, which were
+  /// assigned at commit, must be preserved when adding an existing entry.
+  Status WriteExistingEntry(std::shared_ptr<DataFile> file, int64_t file_snapshot_id,
+                            int64_t data_sequence_number,
+                            std::optional<int64_t> file_sequence_number = std::nullopt);
+
+  /// \brief Add a delete entry for a file.
+  ///
+  /// \param file a deleted data file
+  /// \param data_sequence_number a data sequence number of the file (assigned when
+  /// the file was added)
+  /// \param file_sequence_number a file sequence number (assigned when the file
+  /// was added)
+  /// \return Status indicating success or failure
+  /// \note The entry's snapshot ID will be this manifest's snapshot ID. However,
+  /// the original data and file sequence numbers of the file must be preserved
+  /// when the file is marked as deleted.
+  Status WriteDeletedEntry(std::shared_ptr<DataFile> file, int64_t data_sequence_number,
+                           std::optional<int64_t> file_sequence_number = std::nullopt);
+
+  /// \brief Close the rolling manifest writer.
+  /// \return Status indicating success or failure
+  /// \note Calling this again after a successful close is a no-op.
+  Status Close();
+
+  /// \brief Get the list of manifest files produced by this writer.
+  /// \return A vector of ManifestFile objects
+  /// \note Only valid after the writer is closed.
+  Result<std::vector<ManifestFile>> ToManifestFiles() const;
+
+ private:
+  /// \brief Get or create the current writer, rolling to a new file if needed.
+  /// \return The current ManifestWriter, or an error if creation fails
+  Result<ManifestWriter*> CurrentWriter();
+
+  /// \brief Check if we should roll to a new file.
+  ///
+  /// Returns true only when a writer is open, the number of rows written to the
+  /// current file is a multiple of kRowsDivisor, and the current file length is at
+  /// least the target size. This method only reports the decision; it does not roll.
+  /// If the length cannot be obtained, false is returned.
+  bool ShouldRollToNewFile() const;
+
+  /// \brief Close the current writer and add its ManifestFile to the list.
+  Status CloseCurrentWriter();
+
+  /// \brief The number of rows after which to consider rolling to a new file.
+  /// \note This is aligned with Iceberg's Java impl.
+  static constexpr int64_t kRowsDivisor = 250;
+
+  ManifestWriterFactory manifest_writer_factory_;
+  int64_t target_file_size_in_bytes_;
+  /// Manifests of the files that have already been finished.
+  std::vector<ManifestFile> manifest_files_;
+
+  /// Entries written to the file in progress; reset when that file is finished.
+  int64_t current_file_rows_{0};
+  /// Writer of the file in progress, or null when no file is open.
+  std::unique_ptr<ManifestWriter> current_writer_{nullptr};
+  bool closed_{false};
+};
+
+}  // namespace iceberg
diff --git a/src/iceberg/meson.build b/src/iceberg/meson.build
index 77d72638..3929e180 100644
--- a/src/iceberg/meson.build
+++ b/src/iceberg/meson.build
@@ -66,6 +66,7 @@ iceberg_sources = files(
     'manifest/manifest_list.cc',
     'manifest/manifest_reader.cc',
     'manifest/manifest_writer.cc',
+    'manifest/rolling_manifest_writer.cc',
     'manifest/v1_metadata.cc',
     'manifest/v2_metadata.cc',
     'manifest/v3_metadata.cc',
diff --git a/src/iceberg/test/CMakeLists.txt b/src/iceberg/test/CMakeLists.txt
index b7f23c53..30a473fd 100644
--- a/src/iceberg/test/CMakeLists.txt
+++ b/src/iceberg/test/CMakeLists.txt
@@ -146,7 +146,8 @@ if(ICEBERG_BUILD_BUNDLE)
                    manifest_list_versions_test.cc
                    manifest_reader_stats_test.cc
                    manifest_reader_test.cc
-                   manifest_writer_versions_test.cc)
+                   manifest_writer_versions_test.cc
+                   rolling_manifest_writer_test.cc)
 
   add_iceberg_test(parquet_test
                    USE_BUNDLE
diff --git a/src/iceberg/test/rolling_manifest_writer_test.cc 
b/src/iceberg/test/rolling_manifest_writer_test.cc
new file mode 100644
index 00000000..8ea13869
--- /dev/null
+++ b/src/iceberg/test/rolling_manifest_writer_test.cc
@@ -0,0 +1,322 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "iceberg/manifest/rolling_manifest_writer.h"
+
+#include <chrono>
+#include <cstdint>
+#include <memory>
+#include <optional>
+#include <string>
+#include <vector>
+
+#include <gtest/gtest.h>
+
+#include "iceberg/arrow/arrow_file_io.h"
+#include "iceberg/avro/avro_register.h"
+#include "iceberg/file_format.h"
+#include "iceberg/manifest/manifest_entry.h"
+#include "iceberg/manifest/manifest_list.h"
+#include "iceberg/manifest/manifest_writer.h"
+#include "iceberg/partition_spec.h"
+#include "iceberg/row/partition_values.h"
+#include "iceberg/schema.h"
+#include "iceberg/schema_field.h"
+#include "iceberg/test/matchers.h"
+#include "iceberg/transform.h"
+#include "iceberg/type.h"
+
+namespace iceberg {
+
+namespace {
+
+// Snapshot ID handed to every ManifestWriter created by the test factories.
+constexpr int64_t kSnapshotId = 987134631982734L;
+// First row ID handed to the V3 manifest writers.
+constexpr int64_t kFirstRowId = 100L;
+// Same value as RollingManifestWriter's row divisor (250); the tests write a multiple
+// of it and expect one manifest per 250 entries.
+constexpr int64_t kFileSizeCheckRowsDivisor = 250;
+// Target manifest size in bytes passed to every rolling writer under test.
+constexpr int64_t kSmallFileSize = 10L;
+
+// Partition tuple shared by all generated files; the three values line up with the
+// three partition fields (category, timestamp_hour, id_bucket) of the test spec.
+const PartitionValues kPartition =
+    PartitionValues({Literal::String("cheesy"), Literal::Int(10), Literal::Int(3)});
+
+// Builds a Parquet data file in kPartition whose path and record count are both
+// derived from `record_count`.
+std::shared_ptr<DataFile> CreateDataFile(int64_t record_count) {
+  auto result = std::make_shared<DataFile>();
+  DataFile& file = *result;
+  file.record_count = record_count;
+  file.file_size_in_bytes = 1024;
+  file.partition = kPartition;
+  file.file_format = FileFormatType::kParquet;
+  file.file_path = std::format("data_bucket=0/file-{}.parquet", record_count);
+  return result;
+}
+
+// Builds a Parquet position-delete file in kPartition whose path and record count are
+// both derived from `record_count`.
+std::shared_ptr<DataFile> CreateDeleteFile(int64_t record_count) {
+  auto result = std::make_shared<DataFile>();
+  DataFile& file = *result;
+  file.record_count = record_count;
+  file.file_size_in_bytes = 10;
+  file.partition = kPartition;
+  file.file_format = FileFormatType::kParquet;
+  file.file_path = std::format("/path/to/delete-{}.parquet", record_count);
+  file.content = DataFile::Content::kPositionDeletes;
+  return result;
+}
+
+// Produces a manifest file name from the current system-clock tick count.
+static std::string CreateManifestPath() {
+  const auto now_ticks = std::chrono::system_clock::now().time_since_epoch().count();
+  return std::format("manifest-{}.avro", now_ticks);
+}
+
+}  // namespace
+
+// Fixture for the rolling manifest writer tests. The test parameter is the table
+// format version used to pick the ManifestWriter flavour.
+class RollingManifestWriterTest : public ::testing::TestWithParam<int32_t> {
+ protected:
+  // Registers the Avro support and builds the schema, partition spec and mock file IO
+  // shared by every test.
+  void SetUp() override {
+    avro::RegisterAll();
+    schema_ = std::make_shared<Schema>(std::vector<SchemaField>{
+        SchemaField::MakeRequired(1, "id", int64()),
+        SchemaField::MakeRequired(2, "timestamp", timestamp_tz()),
+        SchemaField::MakeRequired(3, "category", string()),
+        SchemaField::MakeRequired(4, "data", string()),
+        SchemaField::MakeRequired(5, "double", float64())});
+    spec_ = PartitionSpec::Make(
+                0, {PartitionField(3, 1000, "category", Transform::Identity()),
+                    PartitionField(2, 1001, "timestamp_hour", Transform::Hour()),
+                    PartitionField(1, 1002, "id_bucket", Transform::Bucket(16))})
+                .value();
+
+    file_io_ = iceberg::arrow::MakeMockFileIO();
+  }
+
+  // Returns a factory producing data-manifest writers for `format_version` (1, 2 or
+  // 3). For any other version the factory yields a NotSupported error. Each call of
+  // the factory uses a newly generated manifest path.
+  RollingManifestWriter::ManifestWriterFactory NewRollingWriteManifestFactory(
+      int32_t format_version) {
+    return [this, format_version]() -> Result<std::unique_ptr<ManifestWriter>> {
+      const std::string manifest_path = CreateManifestPath();
+      Result<std::unique_ptr<ManifestWriter>> writer_result =
+          NotSupported("Format version: {}", format_version);
+
+      if (format_version == 1) {
+        writer_result = ManifestWriter::MakeV1Writer(kSnapshotId, manifest_path, file_io_,
+                                                     spec_, schema_);
+      } else if (format_version == 2) {
+        writer_result = ManifestWriter::MakeV2Writer(
+            kSnapshotId, manifest_path, file_io_, spec_, schema_, ManifestContent::kData);
+      } else if (format_version == 3) {
+        writer_result = ManifestWriter::MakeV3Writer(kSnapshotId, kFirstRowId,
+                                                     manifest_path, file_io_, spec_,
+                                                     schema_, ManifestContent::kData);
+      }
+
+      return writer_result;
+    };
+  }
+
+  // Returns a factory producing delete-manifest writers for `format_version` (2 or
+  // 3). For any other version the factory yields a NotSupported error.
+  RollingManifestWriter::ManifestWriterFactory NewRollingWriteDeleteManifestFactory(
+      int32_t format_version) {
+    return [this, format_version]() -> Result<std::unique_ptr<ManifestWriter>> {
+      const std::string manifest_path = CreateManifestPath();
+      Result<std::unique_ptr<ManifestWriter>> writer_result =
+          NotSupported("Format version: {}", format_version);
+
+      if (format_version == 2) {
+        writer_result =
+            ManifestWriter::MakeV2Writer(kSnapshotId, manifest_path, file_io_, spec_,
+                                         schema_, ManifestContent::kDeletes);
+      } else if (format_version == 3) {
+        writer_result = ManifestWriter::MakeV3Writer(kSnapshotId, kFirstRowId,
+                                                     manifest_path, file_io_, spec_,
+                                                     schema_, ManifestContent::kDeletes);
+      }
+
+      return writer_result;
+    };
+  }
+
+  // Compares the per-manifest file and row counts against the expected values, where
+  // element i of every vector describes manifests[i].
+  void CheckManifests(const std::vector<ManifestFile>& manifests,
+                      const std::vector<int32_t>& added_file_counts,
+                      const std::vector<int32_t>& existing_file_counts,
+                      const std::vector<int32_t>& deleted_file_counts,
+                      const std::vector<int64_t>& added_row_counts,
+                      const std::vector<int64_t>& existing_row_counts,
+                      const std::vector<int64_t>& deleted_row_counts) {
+    // Every expectation vector is indexed by manifest position below, so all of them
+    // must be as long as the manifest list before any element is read.
+    const size_t manifest_count = manifests.size();
+    ASSERT_EQ(manifest_count, added_file_counts.size());
+    ASSERT_EQ(manifest_count, existing_file_counts.size());
+    ASSERT_EQ(manifest_count, deleted_file_counts.size());
+    ASSERT_EQ(manifest_count, added_row_counts.size());
+    ASSERT_EQ(manifest_count, existing_row_counts.size());
+    ASSERT_EQ(manifest_count, deleted_row_counts.size());
+
+    for (size_t i = 0; i < manifest_count; i++) {
+      const ManifestFile& manifest = manifests[i];
+      EXPECT_TRUE(manifest.has_added_files());
+      EXPECT_EQ(manifest.added_files_count.value_or(0), added_file_counts[i]);
+      EXPECT_EQ(manifest.added_rows_count.value_or(0), added_row_counts[i]);
+
+      EXPECT_TRUE(manifest.has_existing_files());
+      EXPECT_EQ(manifest.existing_files_count.value_or(0), existing_file_counts[i]);
+      EXPECT_EQ(manifest.existing_rows_count.value_or(0), existing_row_counts[i]);
+
+      EXPECT_TRUE(manifest.has_deleted_files());
+      EXPECT_EQ(manifest.deleted_files_count.value_or(0), deleted_file_counts[i]);
+      EXPECT_EQ(manifest.deleted_rows_count.value_or(0), deleted_row_counts[i]);
+    }
+  }
+
+  std::shared_ptr<Schema> schema_;
+  std::shared_ptr<PartitionSpec> spec_;
+  std::shared_ptr<FileIO> file_io_;
+};
+
+// A writer that never received an entry must close cleanly and report no manifests.
+TEST_P(RollingManifestWriterTest, TestRollingManifestWriterNoRecords) {
+  const int32_t format_version = GetParam();
+  RollingManifestWriter writer(NewRollingWriteManifestFactory(format_version),
+                               kSmallFileSize);
+
+  // The second round checks that calling close again doesn't change the result.
+  for (int32_t round = 0; round < 2; ++round) {
+    EXPECT_THAT(writer.Close(), IsOk());
+    ICEBERG_UNWRAP_OR_FAIL(auto manifest_files, writer.ToManifestFiles());
+    EXPECT_TRUE(manifest_files.empty());
+  }
+}
+
+// Delete-manifest variant: a writer that never received an entry must close cleanly
+// and report no manifests.
+TEST_P(RollingManifestWriterTest, TestRollingDeleteManifestWriterNoRecords) {
+  const int32_t format_version = GetParam();
+  if (format_version < 2) {
+    GTEST_SKIP() << "Delete manifests only supported in V2+";
+  }
+  RollingManifestWriter writer(NewRollingWriteDeleteManifestFactory(format_version),
+                               kSmallFileSize);
+
+  // The second round checks that calling close again doesn't change the result.
+  for (int32_t round = 0; round < 2; ++round) {
+    EXPECT_THAT(writer.Close(), IsOk());
+    ICEBERG_UNWRAP_OR_FAIL(auto manifest_files, writer.ToManifestFiles());
+    EXPECT_TRUE(manifest_files.empty());
+  }
+}
+
+// Writes 750 data-file entries against a tiny target size and expects them to be
+// spread over 3 manifests of 250 entries each.
+TEST_P(RollingManifestWriterTest, TestRollingManifestWriterSplitFiles) {
+  const int32_t format_version = GetParam();
+  RollingManifestWriter writer(NewRollingWriteManifestFactory(format_version),
+                               kSmallFileSize);
+
+  // Expected totals, indexed by the manifest an entry is expected to land in.
+  std::vector<int32_t> added_file_counts(3, 0);
+  std::vector<int32_t> existing_file_counts(3, 0);
+  std::vector<int32_t> deleted_file_counts(3, 0);
+  std::vector<int64_t> added_row_counts(3, 0);
+  std::vector<int64_t> existing_row_counts(3, 0);
+  std::vector<int64_t> deleted_row_counts(3, 0);
+
+  // Write 750 entries (3 * 250), cycling through added / existing / deleted.
+  const int32_t total_entries = static_cast<int32_t>(kFileSizeCheckRowsDivisor * 3);
+  for (int32_t i = 0; i < total_entries; ++i) {
+    const int32_t file_index = static_cast<int32_t>(i / kFileSizeCheckRowsDivisor);
+    auto data_file = CreateDataFile(i);
+
+    switch (i % 3) {
+      case 0:
+        EXPECT_THAT(writer.WriteAddedEntry(data_file), IsOk());
+        ++added_file_counts[file_index];
+        added_row_counts[file_index] += i;
+        break;
+      case 1:
+        EXPECT_THAT(writer.WriteExistingEntry(data_file, 1, 1, std::nullopt), IsOk());
+        ++existing_file_counts[file_index];
+        existing_row_counts[file_index] += i;
+        break;
+      default:
+        EXPECT_THAT(writer.WriteDeletedEntry(data_file, 1, std::nullopt), IsOk());
+        ++deleted_file_counts[file_index];
+        deleted_row_counts[file_index] += i;
+        break;
+    }
+  }
+
+  // The second round checks that calling close again doesn't change the result.
+  for (int32_t round = 0; round < 2; ++round) {
+    EXPECT_THAT(writer.Close(), IsOk());
+    ICEBERG_UNWRAP_OR_FAIL(auto manifest_files, writer.ToManifestFiles());
+    EXPECT_EQ(manifest_files.size(), 3);
+
+    CheckManifests(manifest_files, added_file_counts, existing_file_counts,
+                   deleted_file_counts, added_row_counts, existing_row_counts,
+                   deleted_row_counts);
+  }
+}
+
+// Writes 750 delete-file entries against a tiny target size and expects them to be
+// spread over 3 manifests of 250 entries each.
+TEST_P(RollingManifestWriterTest, TestRollingDeleteManifestWriterSplitFiles) {
+  const int32_t format_version = GetParam();
+  if (format_version < 2) {
+    GTEST_SKIP() << "Delete manifests only supported in V2+";
+  }
+  RollingManifestWriter writer(NewRollingWriteDeleteManifestFactory(format_version),
+                               kSmallFileSize);
+
+  // Expected totals, indexed by the manifest an entry is expected to land in.
+  std::vector<int32_t> added_file_counts(3, 0);
+  std::vector<int32_t> existing_file_counts(3, 0);
+  std::vector<int32_t> deleted_file_counts(3, 0);
+  std::vector<int64_t> added_row_counts(3, 0);
+  std::vector<int64_t> existing_row_counts(3, 0);
+  std::vector<int64_t> deleted_row_counts(3, 0);
+
+  // Write 750 entries (3 * 250), cycling through added / existing / deleted.
+  const int32_t total_entries = static_cast<int32_t>(3 * kFileSizeCheckRowsDivisor);
+  for (int32_t i = 0; i < total_entries; ++i) {
+    const int32_t file_index = static_cast<int32_t>(i / kFileSizeCheckRowsDivisor);
+    auto delete_file = CreateDeleteFile(i);
+
+    switch (i % 3) {
+      case 0:
+        EXPECT_THAT(writer.WriteAddedEntry(delete_file), IsOk());
+        ++added_file_counts[file_index];
+        added_row_counts[file_index] += i;
+        break;
+      case 1:
+        EXPECT_THAT(writer.WriteExistingEntry(delete_file, 1, 1, std::nullopt), IsOk());
+        ++existing_file_counts[file_index];
+        existing_row_counts[file_index] += i;
+        break;
+      default:
+        EXPECT_THAT(writer.WriteDeletedEntry(delete_file, 1, std::nullopt), IsOk());
+        ++deleted_file_counts[file_index];
+        deleted_row_counts[file_index] += i;
+        break;
+    }
+  }
+
+  // The second round checks that calling close again doesn't change the result.
+  for (int32_t round = 0; round < 2; ++round) {
+    EXPECT_THAT(writer.Close(), IsOk());
+    ICEBERG_UNWRAP_OR_FAIL(auto manifest_files, writer.ToManifestFiles());
+    EXPECT_EQ(manifest_files.size(), 3);
+
+    CheckManifests(manifest_files, added_file_counts, existing_file_counts,
+                   deleted_file_counts, added_row_counts, existing_row_counts,
+                   deleted_row_counts);
+  }
+}
+
+INSTANTIATE_TEST_SUITE_P(TestRollingManifestWriter, RollingManifestWriterTest,
+                         ::testing::Values(1, 2, 3));
+
+}  // namespace iceberg
diff --git a/src/iceberg/transaction.cc b/src/iceberg/transaction.cc
index 33864ca9..c8446e8b 100644
--- a/src/iceberg/transaction.cc
+++ b/src/iceberg/transaction.cc
@@ -107,7 +107,7 @@ Status Transaction::Apply(PendingUpdate& update) {
     } break;
     default:
       return NotSupported("Unsupported pending update: {}",
-                          static_cast<int>(update.kind()));
+                          static_cast<int32_t>(update.kind()));
   }
 
   last_update_committed_ = true;
diff --git a/src/iceberg/type.cc b/src/iceberg/type.cc
index 99b3433b..ed10e127 100644
--- a/src/iceberg/type.cc
+++ b/src/iceberg/type.cc
@@ -19,6 +19,7 @@
 
 #include "iceberg/type.h"
 
+#include <cstdint>
 #include <format>
 #include <iterator>
 #include <memory>
@@ -162,7 +163,7 @@ Result<std::optional<NestedType::SchemaFieldConstRef>> 
ListType::GetFieldById(
 }
 
 Result<std::optional<NestedType::SchemaFieldConstRef>> 
ListType::GetFieldByIndex(
-    int index) const {
+    int32_t index) const {
   if (index == 0) {
     return std::cref(element_);
   }

Reply via email to