This is an automated email from the ASF dual-hosted git repository.
gangwu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-cpp.git
The following commit(s) were added to refs/heads/main by this push:
new 18a85e28 feat: add FileWriter base interface for data file writers (#446)
18a85e28 is described below
commit 18a85e28dea05892d6b606f9013d93e49d2d5563
Author: Xinli Shang <[email protected]>
AuthorDate: Tue Dec 30 00:08:54 2025 -0800
feat: add FileWriter base interface for data file writers (#446)
Add iceberg/data subdirectory with FileWriter base interface that
defines common operations for writing Iceberg data files, including data
files, equality delete files, and position delete files.
---------
Co-authored-by: Gang Wu <[email protected]>
---
src/iceberg/CMakeLists.txt | 2 +
src/iceberg/data/CMakeLists.txt | 18 +++
src/iceberg/data/writer.cc | 26 +++++
src/iceberg/data/writer.h | 83 +++++++++++++
src/iceberg/test/CMakeLists.txt | 2 +
src/iceberg/test/data_writer_test.cc | 218 +++++++++++++++++++++++++++++++++++
6 files changed, 349 insertions(+)
diff --git a/src/iceberg/CMakeLists.txt b/src/iceberg/CMakeLists.txt
index 36c3a483..ca025345 100644
--- a/src/iceberg/CMakeLists.txt
+++ b/src/iceberg/CMakeLists.txt
@@ -20,6 +20,7 @@ set(ICEBERG_INCLUDES
"$<BUILD_INTERFACE:${PROJECT_BINARY_DIR}/src>"
set(ICEBERG_SOURCES
arrow_c_data_guard_internal.cc
catalog/memory/in_memory_catalog.cc
+ data/writer.cc
delete_file_index.cc
expression/aggregate.cc
expression/binder.cc
@@ -147,6 +148,7 @@ add_iceberg_lib(iceberg
iceberg_install_all_headers(iceberg)
add_subdirectory(catalog)
+add_subdirectory(data)
add_subdirectory(expression)
add_subdirectory(manifest)
add_subdirectory(row)
diff --git a/src/iceberg/data/CMakeLists.txt b/src/iceberg/data/CMakeLists.txt
new file mode 100644
index 00000000..e50b8b98
--- /dev/null
+++ b/src/iceberg/data/CMakeLists.txt
@@ -0,0 +1,18 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+iceberg_install_all_headers(iceberg/data)
diff --git a/src/iceberg/data/writer.cc b/src/iceberg/data/writer.cc
new file mode 100644
index 00000000..65b17247
--- /dev/null
+++ b/src/iceberg/data/writer.cc
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "iceberg/data/writer.h"
+
+namespace iceberg {
+
+FileWriter::~FileWriter() = default;
+
+} // namespace iceberg
diff --git a/src/iceberg/data/writer.h b/src/iceberg/data/writer.h
new file mode 100644
index 00000000..6c840091
--- /dev/null
+++ b/src/iceberg/data/writer.h
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+/// \file iceberg/data/writer.h
+/// Base interface for Iceberg data file writers.
+
+#include <cstdint>
+#include <memory>
+#include <vector>
+
+#include "iceberg/arrow_c_data.h"
+#include "iceberg/iceberg_export.h"
+#include "iceberg/result.h"
+#include "iceberg/type_fwd.h"
+
+namespace iceberg {
+
+/// \brief Base interface for data file writers.
+///
+/// This interface defines the common operations for writing Iceberg data files,
+/// including data files, equality delete files, and position delete files.
+///
+/// Typical usage:
+/// 1. Create a writer instance (via concrete implementation)
+/// 2. Call Write() one or more times to write data
+/// 3. Call Close() to finalize the file
+/// 4. Call Metadata() to get file metadata (only valid after Close())
+class ICEBERG_EXPORT FileWriter {
+ public:
+ virtual ~FileWriter();
+
+ /// \brief Write a batch of records.
+ ///
+ /// \param data Arrow array containing the records to write.
+ /// \return Status indicating success or failure.
+ virtual Status Write(ArrowArray* data) = 0;
+
+ /// \brief Get the current number of bytes written.
+ ///
+ /// \return Result containing the number of bytes written or an error.
+ virtual Result<int64_t> Length() const = 0;
+
+ /// \brief Close the writer and finalize the file.
+ ///
+ /// \return Status indicating success or failure.
+ virtual Status Close() = 0;
+
+ /// \brief File metadata for all files produced by the writer.
+ struct ICEBERG_EXPORT WriteResult {
+ /// Usually a writer produces a single data or delete file.
+ /// Position delete writer may produce multiple file-scoped delete files.
+ /// In the future, multiple files can be produced if file rolling is supported.
+ std::vector<std::shared_ptr<DataFile>> data_files;
+ };
+
+ /// \brief Get file metadata for all files produced by this writer.
+ ///
+ /// This method should be called after Close() to retrieve the metadata
+ /// for all files written by this writer.
+ ///
+ /// \return Result containing the write result or an error.
+ virtual Result<WriteResult> Metadata() = 0;
+};
+
+} // namespace iceberg
diff --git a/src/iceberg/test/CMakeLists.txt b/src/iceberg/test/CMakeLists.txt
index 30a473fd..731fe0af 100644
--- a/src/iceberg/test/CMakeLists.txt
+++ b/src/iceberg/test/CMakeLists.txt
@@ -166,6 +166,8 @@ if(ICEBERG_BUILD_BUNDLE)
update_properties_test.cc
update_sort_order_test.cc)
+ add_iceberg_test(data_writer_test USE_BUNDLE SOURCES data_writer_test.cc)
+
endif()
if(ICEBERG_BUILD_REST)
diff --git a/src/iceberg/test/data_writer_test.cc b/src/iceberg/test/data_writer_test.cc
new file mode 100644
index 00000000..df7ea9d8
--- /dev/null
+++ b/src/iceberg/test/data_writer_test.cc
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <memory>
+#include <vector>
+
+#include <gmock/gmock.h>
+#include <gtest/gtest.h>
+
+#include "iceberg/arrow_c_data.h"
+#include "iceberg/data/writer.h"
+#include "iceberg/manifest/manifest_entry.h"
+#include "iceberg/result.h"
+#include "iceberg/test/matchers.h"
+
+namespace iceberg {
+
+// Mock implementation of FileWriter for testing
+class MockFileWriter : public FileWriter {
+ public:
+ MockFileWriter() = default;
+
+ Status Write(ArrowArray* data) override {
+ if (is_closed_) {
+ return Invalid("Writer is closed");
+ }
+ if (data == nullptr) {
+ return Invalid("Null data provided");
+ }
+ write_count_++;
+ // Simulate writing some bytes
+ bytes_written_ += 1024;
+ return {};
+ }
+
+ Result<int64_t> Length() const override { return bytes_written_; }
+
+ Status Close() override {
+ if (is_closed_) {
+ return Invalid("Writer already closed");
+ }
+ is_closed_ = true;
+ return {};
+ }
+
+ Result<WriteResult> Metadata() override {
+ if (!is_closed_) {
+ return Invalid("Writer must be closed before getting metadata");
+ }
+
+ WriteResult result;
+ auto data_file = std::make_shared<DataFile>();
+ data_file->file_path = "/test/data/file.parquet";
+ data_file->file_format = FileFormatType::kParquet;
+ data_file->record_count = write_count_ * 100;
+ data_file->file_size_in_bytes = bytes_written_;
+ result.data_files.push_back(data_file);
+
+ return result;
+ }
+
+ bool is_closed() const { return is_closed_; }
+ int32_t write_count() const { return write_count_; }
+
+ private:
+ int64_t bytes_written_ = 0;
+ bool is_closed_ = false;
+ int32_t write_count_ = 0;
+};
+
+TEST(FileWriterTest, BasicWriteOperation) {
+ MockFileWriter writer;
+
+ // Create a dummy ArrowArray (normally this would contain actual data)
+ ArrowArray dummy_array = {};
+
+ ASSERT_THAT(writer.Write(&dummy_array), IsOk());
+ ASSERT_EQ(writer.write_count(), 1);
+
+ auto length_result = writer.Length();
+ ASSERT_THAT(length_result, IsOk());
+ ASSERT_EQ(*length_result, 1024);
+}
+
+TEST(FileWriterTest, MultipleWrites) {
+ MockFileWriter writer;
+ ArrowArray dummy_array = {};
+
+ // Write multiple times
+ for (int i = 0; i < 5; i++) {
+ ASSERT_THAT(writer.Write(&dummy_array), IsOk());
+ }
+
+ ASSERT_EQ(writer.write_count(), 5);
+
+ auto length_result = writer.Length();
+ ASSERT_THAT(length_result, IsOk());
+ ASSERT_EQ(*length_result, 5120); // 5 * 1024
+}
+
+TEST(FileWriterTest, WriteNullData) {
+ MockFileWriter writer;
+
+ auto status = writer.Write(nullptr);
+ ASSERT_THAT(status, HasErrorMessage("Null data provided"));
+}
+
+TEST(FileWriterTest, CloseWriter) {
+ MockFileWriter writer;
+ ArrowArray dummy_array = {};
+
+ ASSERT_THAT(writer.Write(&dummy_array), IsOk());
+ ASSERT_FALSE(writer.is_closed());
+
+ ASSERT_THAT(writer.Close(), IsOk());
+ ASSERT_TRUE(writer.is_closed());
+}
+
+TEST(FileWriterTest, DoubleClose) {
+ MockFileWriter writer;
+
+ ASSERT_THAT(writer.Close(), IsOk());
+ auto status = writer.Close();
+ ASSERT_THAT(status, HasErrorMessage("Writer already closed"));
+}
+
+TEST(FileWriterTest, WriteAfterClose) {
+ MockFileWriter writer;
+ ArrowArray dummy_array = {};
+
+ ASSERT_THAT(writer.Close(), IsOk());
+
+ auto status = writer.Write(&dummy_array);
+ ASSERT_THAT(status, HasErrorMessage("Writer is closed"));
+}
+
+TEST(FileWriterTest, MetadataBeforeClose) {
+ MockFileWriter writer;
+ ArrowArray dummy_array = {};
+
+ ASSERT_THAT(writer.Write(&dummy_array), IsOk());
+
+ auto metadata_result = writer.Metadata();
+ ASSERT_THAT(metadata_result,
+ HasErrorMessage("Writer must be closed before getting metadata"));
+}
+
+TEST(FileWriterTest, MetadataAfterClose) {
+ MockFileWriter writer;
+ ArrowArray dummy_array = {};
+
+ // Write some data
+ ASSERT_THAT(writer.Write(&dummy_array), IsOk());
+ ASSERT_THAT(writer.Write(&dummy_array), IsOk());
+ ASSERT_THAT(writer.Write(&dummy_array), IsOk());
+
+ // Close the writer
+ ASSERT_THAT(writer.Close(), IsOk());
+
+ // Get metadata
+ auto metadata_result = writer.Metadata();
+ ASSERT_THAT(metadata_result, IsOk());
+
+ const auto& result = *metadata_result;
+ ASSERT_EQ(result.data_files.size(), 1);
+
+ const auto& data_file = result.data_files[0];
+ ASSERT_EQ(data_file->file_path, "/test/data/file.parquet");
+ ASSERT_EQ(data_file->file_format, FileFormatType::kParquet);
+ ASSERT_EQ(data_file->record_count, 300); // 3 writes * 100 records
+ ASSERT_EQ(data_file->file_size_in_bytes, 3072); // 3 * 1024
+}
+
+TEST(FileWriterTest, WriteResultStructure) {
+ FileWriter::WriteResult result;
+
+ // Test that WriteResult can hold multiple data files
+ auto data_file1 = std::make_shared<DataFile>();
+ data_file1->file_path = "/test/data/file1.parquet";
+ data_file1->record_count = 100;
+
+ auto data_file2 = std::make_shared<DataFile>();
+ data_file2->file_path = "/test/data/file2.parquet";
+ data_file2->record_count = 200;
+
+ result.data_files.push_back(data_file1);
+ result.data_files.push_back(data_file2);
+
+ ASSERT_EQ(result.data_files.size(), 2);
+ ASSERT_EQ(result.data_files[0]->file_path, "/test/data/file1.parquet");
+ ASSERT_EQ(result.data_files[0]->record_count, 100);
+ ASSERT_EQ(result.data_files[1]->file_path, "/test/data/file2.parquet");
+ ASSERT_EQ(result.data_files[1]->record_count, 200);
+}
+
+TEST(FileWriterTest, EmptyWriteResult) {
+ FileWriter::WriteResult result;
+ ASSERT_EQ(result.data_files.size(), 0);
+ ASSERT_TRUE(result.data_files.empty());
+}
+
+} // namespace iceberg