This is an automated email from the ASF dual-hosted git repository.
gangwu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-cpp.git
The following commit(s) were added to refs/heads/main by this push:
new 1afe65cd feat: Implement EqualityDeleteWriter for equality delete
files (#583)
1afe65cd is described below
commit 1afe65cdce995f31795ed4ebf4c54d4b5ef74f6c
Author: Xinli Shang <[email protected]>
AuthorDate: Sat Mar 14 20:38:16 2026 -0700
feat: Implement EqualityDeleteWriter for equality delete files (#583)
Implement the EqualityDeleteWriter following the same PIMPL pattern as
DataWriter. The writer accepts Arrow data matching the equality delete
schema (columns for the equality field values) and produces metadata
with content=kEqualityDeletes, equality_ids set from options, and
sort_order_id propagated from options.
---
src/iceberg/data/data_writer.cc | 14 ++-
src/iceberg/data/equality_delete_writer.cc | 113 +++++++++++++++++++++--
src/iceberg/data/equality_delete_writer.h | 7 ++
src/iceberg/data/position_delete_writer.cc | 3 +
src/iceberg/test/data_writer_test.cc | 139 +++++++++++++++++++++++++++++
src/iceberg/type_fwd.h | 12 +--
6 files changed, 269 insertions(+), 19 deletions(-)
diff --git a/src/iceberg/data/data_writer.cc b/src/iceberg/data/data_writer.cc
index b00465bb..42047128 100644
--- a/src/iceberg/data/data_writer.cc
+++ b/src/iceberg/data/data_writer.cc
@@ -23,6 +23,7 @@
#include "iceberg/file_writer.h"
#include "iceberg/manifest/manifest_entry.h"
+#include "iceberg/partition_spec.h"
#include "iceberg/util/macros.h"
namespace iceberg {
@@ -43,18 +44,11 @@ class DataWriter::Impl {
return std::unique_ptr<Impl>(new Impl(std::move(options),
std::move(writer)));
}
- Status Write(ArrowArray* data) {
- ICEBERG_DCHECK(writer_, "Writer not initialized");
- return writer_->Write(data);
- }
+ Status Write(ArrowArray* data) { return writer_->Write(data); }
- Result<int64_t> Length() const {
- ICEBERG_DCHECK(writer_, "Writer not initialized");
- return writer_->length();
- }
+ Result<int64_t> Length() const { return writer_->length(); }
Status Close() {
- ICEBERG_DCHECK(writer_, "Writer not initialized");
if (closed_) {
// Idempotent: no-op if already closed
return {};
@@ -100,6 +94,8 @@ class DataWriter::Impl {
.upper_bounds = std::move(upper_bounds_map),
.split_offsets = std::move(split_offsets),
.sort_order_id = options_.sort_order_id,
+ .partition_spec_id =
+ options_.spec ? std::make_optional(options_.spec->spec_id()) :
std::nullopt,
});
FileWriter::WriteResult result;
diff --git a/src/iceberg/data/equality_delete_writer.cc
b/src/iceberg/data/equality_delete_writer.cc
index 3edb942c..6604c03e 100644
--- a/src/iceberg/data/equality_delete_writer.cc
+++ b/src/iceberg/data/equality_delete_writer.cc
@@ -19,24 +19,127 @@
#include "iceberg/data/equality_delete_writer.h"
+#include <map>
+
+#include "iceberg/file_writer.h"
+#include "iceberg/manifest/manifest_entry.h"
+#include "iceberg/partition_spec.h"
+#include "iceberg/util/macros.h"
+
namespace iceberg {
class EqualityDeleteWriter::Impl {
public:
+ static Result<std::unique_ptr<Impl>> Make(EqualityDeleteWriterOptions
options) {
+ WriterOptions writer_options{
+ .path = options.path,
+ .schema = options.schema,
+ .io = options.io,
+ .properties = WriterProperties::FromMap(options.properties),
+ };
+
+ ICEBERG_ASSIGN_OR_RAISE(auto writer,
+ WriterFactoryRegistry::Open(options.format,
writer_options));
+
+ return std::unique_ptr<Impl>(new Impl(std::move(options),
std::move(writer)));
+ }
+
+ Status Write(ArrowArray* data) { return writer_->Write(data); }
+
+ Result<int64_t> Length() const { return writer_->length(); }
+
+ Status Close() {
+ if (closed_) {
+ return {};
+ }
+ ICEBERG_RETURN_UNEXPECTED(writer_->Close());
+ closed_ = true;
+ return {};
+ }
+
+ Result<FileWriter::WriteResult> Metadata() {
+ ICEBERG_CHECK(closed_, "Cannot get metadata before closing the writer");
+
+ ICEBERG_ASSIGN_OR_RAISE(auto metrics, writer_->metrics());
+ ICEBERG_ASSIGN_OR_RAISE(auto length, writer_->length());
+ auto split_offsets = writer_->split_offsets();
+
+ // Serialize literal bounds to binary format
+ std::map<int32_t, std::vector<uint8_t>> lower_bounds_map;
+ for (const auto& [col_id, literal] : metrics.lower_bounds) {
+ ICEBERG_ASSIGN_OR_RAISE(auto serialized, literal.Serialize());
+ lower_bounds_map[col_id] = std::move(serialized);
+ }
+ std::map<int32_t, std::vector<uint8_t>> upper_bounds_map;
+ for (const auto& [col_id, literal] : metrics.upper_bounds) {
+ ICEBERG_ASSIGN_OR_RAISE(auto serialized, literal.Serialize());
+ upper_bounds_map[col_id] = std::move(serialized);
+ }
+
+ // TODO(anyone): add encryption key metadata for encrypted delete files
+ auto data_file = std::make_shared<DataFile>(DataFile{
+ .content = DataFile::Content::kEqualityDeletes,
+ .file_path = options_.path,
+ .file_format = options_.format,
+ .partition = options_.partition,
+ .record_count = metrics.row_count.value_or(-1),
+ .file_size_in_bytes = length,
+ .column_sizes = {metrics.column_sizes.begin(),
metrics.column_sizes.end()},
+ .value_counts = {metrics.value_counts.begin(),
metrics.value_counts.end()},
+ .null_value_counts = {metrics.null_value_counts.begin(),
+ metrics.null_value_counts.end()},
+ .nan_value_counts = {metrics.nan_value_counts.begin(),
+ metrics.nan_value_counts.end()},
+ .lower_bounds = std::move(lower_bounds_map),
+ .upper_bounds = std::move(upper_bounds_map),
+ .split_offsets = std::move(split_offsets),
+ .equality_ids = options_.equality_field_ids,
+ .sort_order_id = options_.sort_order_id,
+ .partition_spec_id =
+ options_.spec ? std::make_optional(options_.spec->spec_id()) :
std::nullopt,
+ });
+
+ FileWriter::WriteResult result;
+ result.data_files.push_back(std::move(data_file));
+ return result;
+ }
+
+ std::span<const int32_t> equality_field_ids() const {
+ return options_.equality_field_ids;
+ }
+
+ private:
+ Impl(EqualityDeleteWriterOptions options, std::unique_ptr<Writer> writer)
+ : options_(std::move(options)), writer_(std::move(writer)) {}
+
+ EqualityDeleteWriterOptions options_;
+ std::unique_ptr<Writer> writer_;
+ bool closed_ = false;
};
+EqualityDeleteWriter::EqualityDeleteWriter(std::unique_ptr<Impl> impl)
+ : impl_(std::move(impl)) {}
+
EqualityDeleteWriter::~EqualityDeleteWriter() = default;
-Status EqualityDeleteWriter::Write(ArrowArray* data) { return
NotImplemented(""); }
+Result<std::unique_ptr<EqualityDeleteWriter>> EqualityDeleteWriter::Make(
+ const EqualityDeleteWriterOptions& options) {
+ ICEBERG_ASSIGN_OR_RAISE(auto impl, Impl::Make(options));
+ return std::unique_ptr<EqualityDeleteWriter>(new
EqualityDeleteWriter(std::move(impl)));
+}
+
+Status EqualityDeleteWriter::Write(ArrowArray* data) { return
impl_->Write(data); }
-Result<int64_t> EqualityDeleteWriter::Length() const { return
NotImplemented(""); }
+Result<int64_t> EqualityDeleteWriter::Length() const { return impl_->Length();
}
-Status EqualityDeleteWriter::Close() { return NotImplemented(""); }
+Status EqualityDeleteWriter::Close() { return impl_->Close(); }
Result<FileWriter::WriteResult> EqualityDeleteWriter::Metadata() {
- return NotImplemented("");
+ return impl_->Metadata();
}
-std::span<const int32_t> EqualityDeleteWriter::equality_field_ids() const {
return {}; }
+std::span<const int32_t> EqualityDeleteWriter::equality_field_ids() const {
+ return impl_->equality_field_ids();
+}
} // namespace iceberg
diff --git a/src/iceberg/data/equality_delete_writer.h
b/src/iceberg/data/equality_delete_writer.h
index 9de4918d..d1728a48 100644
--- a/src/iceberg/data/equality_delete_writer.h
+++ b/src/iceberg/data/equality_delete_writer.h
@@ -50,6 +50,7 @@ struct ICEBERG_EXPORT EqualityDeleteWriterOptions {
std::vector<int32_t> equality_field_ids;
std::optional<int32_t> sort_order_id;
std::unordered_map<std::string, std::string> properties;
+ // TODO(anyone): add key_metadata for encryption
};
/// \brief Writer for Iceberg equality delete files.
@@ -57,6 +58,10 @@ class ICEBERG_EXPORT EqualityDeleteWriter : public
FileWriter {
public:
~EqualityDeleteWriter() override;
+ /// \brief Create a new EqualityDeleteWriter instance.
+ static Result<std::unique_ptr<EqualityDeleteWriter>> Make(
+ const EqualityDeleteWriterOptions& options);
+
Status Write(ArrowArray* data) override;
Result<int64_t> Length() const override;
Status Close() override;
@@ -67,6 +72,8 @@ class ICEBERG_EXPORT EqualityDeleteWriter : public FileWriter
{
private:
class Impl;
std::unique_ptr<Impl> impl_;
+
+ explicit EqualityDeleteWriter(std::unique_ptr<Impl> impl);
};
} // namespace iceberg
diff --git a/src/iceberg/data/position_delete_writer.cc
b/src/iceberg/data/position_delete_writer.cc
index 9fae9c2b..3238dc50 100644
--- a/src/iceberg/data/position_delete_writer.cc
+++ b/src/iceberg/data/position_delete_writer.cc
@@ -30,6 +30,7 @@
#include "iceberg/file_writer.h"
#include "iceberg/manifest/manifest_entry.h"
#include "iceberg/metadata_columns.h"
+#include "iceberg/partition_spec.h"
#include "iceberg/schema.h"
#include "iceberg/schema_internal.h"
#include "iceberg/util/macros.h"
@@ -154,6 +155,8 @@ class PositionDeleteWriter::Impl {
.split_offsets = std::move(split_offsets),
.sort_order_id = std::nullopt,
.referenced_data_file = std::move(referenced_data_file),
+ .partition_spec_id =
+ options_.spec ? std::make_optional(options_.spec->spec_id()) :
std::nullopt,
});
FileWriter::WriteResult result;
diff --git a/src/iceberg/test/data_writer_test.cc
b/src/iceberg/test/data_writer_test.cc
index f7778faf..a3a8fc08 100644
--- a/src/iceberg/test/data_writer_test.cc
+++ b/src/iceberg/test/data_writer_test.cc
@@ -27,6 +27,7 @@
#include "iceberg/arrow/arrow_fs_file_io_internal.h"
#include "iceberg/avro/avro_register.h"
+#include "iceberg/data/equality_delete_writer.h"
#include "iceberg/data/position_delete_writer.h"
#include "iceberg/file_format.h"
#include "iceberg/manifest/manifest_entry.h"
@@ -423,4 +424,142 @@ TEST_F(PositionDeleteWriterTest, AutoFlushOnThreshold) {
EXPECT_GT(data_file->file_size_in_bytes, 0);
}
+class EqualityDeleteWriterTest : public DataWriterTest {
+ protected:
+ EqualityDeleteWriterOptions MakeDeleteOptions(
+ std::vector<int32_t> equality_field_ids = {1, 2},
+ std::optional<int32_t> sort_order_id = std::nullopt) {
+ return EqualityDeleteWriterOptions{
+ .path = "test_eq_deletes.parquet",
+ .schema = schema_,
+ .spec = partition_spec_,
+ .partition = PartitionValues{},
+ .format = FileFormatType::kParquet,
+ .io = file_io_,
+ .equality_field_ids = std::move(equality_field_ids),
+ .sort_order_id = sort_order_id,
+ .properties = {{"write.parquet.compression-codec", "uncompressed"}},
+ };
+ }
+
+ void WriteTestDataToEqualityWriter(EqualityDeleteWriter* writer) {
+ auto test_data = CreateTestData();
+ ArrowArray arrow_array;
+ ASSERT_TRUE(::arrow::ExportArray(*test_data, &arrow_array).ok());
+ ASSERT_THAT(writer->Write(&arrow_array), IsOk());
+ }
+};
+
+TEST_F(EqualityDeleteWriterTest, WriteAndClose) {
+ auto writer_result = EqualityDeleteWriter::Make(MakeDeleteOptions());
+ ASSERT_THAT(writer_result, IsOk());
+ auto writer = std::move(writer_result.value());
+
+ WriteTestDataToEqualityWriter(writer.get());
+
+ auto length_result = writer->Length();
+ ASSERT_THAT(length_result, IsOk());
+ EXPECT_GT(length_result.value(), 0);
+
+ ASSERT_THAT(writer->Close(), IsOk());
+}
+
+TEST_F(EqualityDeleteWriterTest, MetadataAfterClose) {
+ auto writer_result = EqualityDeleteWriter::Make(MakeDeleteOptions());
+ ASSERT_THAT(writer_result, IsOk());
+ auto writer = std::move(writer_result.value());
+
+ WriteTestDataToEqualityWriter(writer.get());
+ ASSERT_THAT(writer->Close(), IsOk());
+
+ auto metadata_result = writer->Metadata();
+ ASSERT_THAT(metadata_result, IsOk());
+
+ const auto& write_result = metadata_result.value();
+ ASSERT_EQ(write_result.data_files.size(), 1);
+
+ const auto& data_file = write_result.data_files[0];
+ EXPECT_EQ(data_file->content, DataFile::Content::kEqualityDeletes);
+ EXPECT_EQ(data_file->file_path, "test_eq_deletes.parquet");
+ EXPECT_EQ(data_file->file_format, FileFormatType::kParquet);
+ EXPECT_GT(data_file->file_size_in_bytes, 0);
+
+ // Partition spec id must be set
+ ASSERT_TRUE(data_file->partition_spec_id.has_value());
+ EXPECT_EQ(data_file->partition_spec_id.value(),
PartitionSpec::kInitialSpecId);
+
+ // Equality field ids must be set
+ ASSERT_EQ(data_file->equality_ids.size(), 2);
+ EXPECT_EQ(data_file->equality_ids[0], 1);
+ EXPECT_EQ(data_file->equality_ids[1], 2);
+}
+
+TEST_F(EqualityDeleteWriterTest, MetadataBeforeCloseReturnsError) {
+ auto writer_result = EqualityDeleteWriter::Make(MakeDeleteOptions());
+ ASSERT_THAT(writer_result, IsOk());
+ auto writer = std::move(writer_result.value());
+
+ auto metadata_result = writer->Metadata();
+ ASSERT_THAT(metadata_result, IsError(ErrorKind::kValidationFailed));
+ EXPECT_THAT(metadata_result,
+ HasErrorMessage("Cannot get metadata before closing the
writer"));
+}
+
+TEST_F(EqualityDeleteWriterTest, CloseIsIdempotent) {
+ auto writer_result = EqualityDeleteWriter::Make(MakeDeleteOptions());
+ ASSERT_THAT(writer_result, IsOk());
+ auto writer = std::move(writer_result.value());
+
+ WriteTestDataToEqualityWriter(writer.get());
+
+ ASSERT_THAT(writer->Close(), IsOk());
+ ASSERT_THAT(writer->Close(), IsOk());
+ ASSERT_THAT(writer->Close(), IsOk());
+}
+
+TEST_F(EqualityDeleteWriterTest, SortOrderIdInMetadata) {
+ const int32_t sort_order_id = 7;
+ auto writer_result = EqualityDeleteWriter::Make(MakeDeleteOptions({1},
sort_order_id));
+ ASSERT_THAT(writer_result, IsOk());
+ auto writer = std::move(writer_result.value());
+
+ WriteTestDataToEqualityWriter(writer.get());
+ ASSERT_THAT(writer->Close(), IsOk());
+
+ auto metadata_result = writer->Metadata();
+ ASSERT_THAT(metadata_result, IsOk());
+ const auto& data_file = metadata_result.value().data_files[0];
+ ASSERT_TRUE(data_file->sort_order_id.has_value());
+ EXPECT_EQ(data_file->sort_order_id.value(), sort_order_id);
+}
+
+TEST_F(EqualityDeleteWriterTest, EqualityFieldIdsAccessor) {
+ std::vector<int32_t> field_ids = {1, 2, 3};
+ auto writer_result =
EqualityDeleteWriter::Make(MakeDeleteOptions(field_ids));
+ ASSERT_THAT(writer_result, IsOk());
+ auto writer = std::move(writer_result.value());
+
+ auto ids = writer->equality_field_ids();
+ ASSERT_EQ(ids.size(), 3);
+ EXPECT_EQ(ids[0], 1);
+ EXPECT_EQ(ids[1], 2);
+ EXPECT_EQ(ids[2], 3);
+}
+
+TEST_F(EqualityDeleteWriterTest, WriteMultipleBatches) {
+ auto writer_result = EqualityDeleteWriter::Make(MakeDeleteOptions());
+ ASSERT_THAT(writer_result, IsOk());
+ auto writer = std::move(writer_result.value());
+
+ WriteTestDataToEqualityWriter(writer.get());
+ WriteTestDataToEqualityWriter(writer.get());
+ ASSERT_THAT(writer->Close(), IsOk());
+
+ auto metadata_result = writer->Metadata();
+ ASSERT_THAT(metadata_result, IsOk());
+ const auto& data_file = metadata_result.value().data_files[0];
+ EXPECT_EQ(data_file->content, DataFile::Content::kEqualityDeletes);
+ EXPECT_GT(data_file->file_size_in_bytes, 0);
+}
+
} // namespace iceberg
diff --git a/src/iceberg/type_fwd.h b/src/iceberg/type_fwd.h
index 8d6824ee..cad3e969 100644
--- a/src/iceberg/type_fwd.h
+++ b/src/iceberg/type_fwd.h
@@ -142,20 +142,22 @@ class ManifestEvaluator;
class ResidualEvaluator;
class StrictMetricsEvaluator;
-/// \brief Scan.
+/// \brief Scan task.
class ChangelogScanTask;
-class DataTableScan;
class FileScanTask;
+class ScanTask;
+
+/// \brief Table scan.
+class DataTableScan;
template <typename ScanTaskType>
class IncrementalScan;
class IncrementalAppendScan;
class IncrementalChangelogScan;
-class ScanTask;
class TableScan;
+
+/// \brief Scan builder.
template <typename ScanType>
class TableScanBuilder;
-
-// Type aliases for incremental scan builders
using DataTableScanBuilder = TableScanBuilder<DataTableScan>;
using IncrementalAppendScanBuilder = TableScanBuilder<IncrementalAppendScan>;
using IncrementalChangelogScanBuilder =
TableScanBuilder<IncrementalChangelogScan>;