shangxinli commented on code in PR #582:
URL: https://github.com/apache/iceberg-cpp/pull/582#discussion_r2934289182


##########
src/iceberg/data/position_delete_writer.cc:
##########
@@ -19,26 +19,201 @@
 
 #include "iceberg/data/position_delete_writer.h"
 
+#include <map>
+#include <set>
+#include <vector>
+
+#include <nanoarrow/nanoarrow.h>
+
+#include "iceberg/arrow/nanoarrow_status_internal.h"
+#include "iceberg/file_writer.h"
+#include "iceberg/manifest/manifest_entry.h"
+#include "iceberg/metadata_columns.h"
+#include "iceberg/schema.h"
+#include "iceberg/schema_internal.h"
+#include "iceberg/util/macros.h"
+
 namespace iceberg {
 
 class PositionDeleteWriter::Impl {
  public:
+  static Result<std::unique_ptr<Impl>> Make(PositionDeleteWriterOptions 
options) {
+    // Build the position delete schema with file_path and pos columns
+    std::vector<SchemaField> fields;
+    fields.push_back(MetadataColumns::kDeleteFilePath);
+    fields.push_back(MetadataColumns::kDeleteFilePos);
+
+    auto delete_schema = std::make_shared<Schema>(std::move(fields));
+
+    WriterOptions writer_options{
+        .path = options.path,
+        .schema = delete_schema,
+        .io = options.io,
+        .properties = WriterProperties::FromMap(options.properties),
+    };
+
+    ICEBERG_ASSIGN_OR_RAISE(auto writer,
+                            WriterFactoryRegistry::Open(options.format, 
writer_options));
+
+    return std::unique_ptr<Impl>(
+        new Impl(std::move(options), std::move(delete_schema), 
std::move(writer)));
+  }
+
+  Status Write(ArrowArray* data) {
+    ICEBERG_DCHECK(writer_, "Writer not initialized");
+    return writer_->Write(data);
+  }
+
+  Status WriteDelete(std::string_view file_path, int64_t pos) {
+    ICEBERG_DCHECK(writer_, "Writer not initialized");
+    buffered_paths_.emplace_back(file_path);
+    buffered_positions_.push_back(pos);
+    referenced_paths_.emplace(file_path);
+
+    if (static_cast<int64_t>(buffered_paths_.size()) >= 
options_.flush_threshold) {
+      return FlushBuffer();
+    }
+    return {};
+  }
+
+  Result<int64_t> Length() const {
+    ICEBERG_DCHECK(writer_, "Writer not initialized");
+    return writer_->length();
+  }
+
+  Status Close() {
+    ICEBERG_DCHECK(writer_, "Writer not initialized");
+    if (closed_) {
+      return {};
+    }
+    if (!buffered_paths_.empty()) {
+      ICEBERG_RETURN_UNEXPECTED(FlushBuffer());
+    }
+    ICEBERG_RETURN_UNEXPECTED(writer_->Close());
+    closed_ = true;
+    return {};
+  }
+
+  Result<FileWriter::WriteResult> Metadata() {
+    ICEBERG_CHECK(closed_, "Cannot get metadata before closing the writer");
+
+    ICEBERG_ASSIGN_OR_RAISE(auto metrics, writer_->metrics());
+    ICEBERG_ASSIGN_OR_RAISE(auto length, writer_->length());
+    auto split_offsets = writer_->split_offsets();
+
+    // Serialize literal bounds to binary format
+    std::map<int32_t, std::vector<uint8_t>> lower_bounds_map;
+    for (const auto& [col_id, literal] : metrics.lower_bounds) {
+      ICEBERG_ASSIGN_OR_RAISE(auto serialized, literal.Serialize());
+      lower_bounds_map[col_id] = std::move(serialized);
+    }
+    std::map<int32_t, std::vector<uint8_t>> upper_bounds_map;
+    for (const auto& [col_id, literal] : metrics.upper_bounds) {
+      ICEBERG_ASSIGN_OR_RAISE(auto serialized, literal.Serialize());
+      upper_bounds_map[col_id] = std::move(serialized);
+    }
+
+    // Set referenced_data_file if all deletes reference the same data file
+    std::optional<std::string> referenced_data_file;
+    if (referenced_paths_.size() == 1) {
+      referenced_data_file = *referenced_paths_.begin();
+    }
+
+    auto data_file = std::make_shared<DataFile>(DataFile{
+        .content = DataFile::Content::kPositionDeletes,
+        .file_path = options_.path,
+        .file_format = options_.format,
+        .partition = options_.partition,
+        .record_count = metrics.row_count.value_or(-1),
+        .file_size_in_bytes = length,
+        .column_sizes = {metrics.column_sizes.begin(), 
metrics.column_sizes.end()},
+        .value_counts = {metrics.value_counts.begin(), 
metrics.value_counts.end()},
+        .null_value_counts = {metrics.null_value_counts.begin(),
+                              metrics.null_value_counts.end()},
+        .nan_value_counts = {metrics.nan_value_counts.begin(),
+                             metrics.nan_value_counts.end()},
+        .lower_bounds = std::move(lower_bounds_map),
+        .upper_bounds = std::move(upper_bounds_map),
+        .split_offsets = std::move(split_offsets),
+        .sort_order_id = std::nullopt,
+        .referenced_data_file = std::move(referenced_data_file),
+    });
+
+    FileWriter::WriteResult result;
+    result.data_files.push_back(std::move(data_file));
+    return result;
+  }
+
+ private:
+  Impl(PositionDeleteWriterOptions options, std::shared_ptr<Schema> 
delete_schema,
+       std::unique_ptr<Writer> writer)
+      : options_(std::move(options)),
+        delete_schema_(std::move(delete_schema)),
+        writer_(std::move(writer)) {}
+
+  Status FlushBuffer() {
+    ArrowSchema arrow_schema;
+    ICEBERG_RETURN_UNEXPECTED(ToArrowSchema(*delete_schema_, &arrow_schema));
+
+    ArrowArray array;
+    ArrowError error;
+    ICEBERG_NANOARROW_RETURN_UNEXPECTED_WITH_ERROR(
+        ArrowArrayInitFromSchema(&array, &arrow_schema, &error), error);

Review Comment:
   Done. Added `ArrowSchemaGuard` and `ArrowArrayGuard` in `FlushBuffer` to 
ensure proper cleanup on early returns. Also fixed the guard destructors to 
check `release \!= nullptr` before calling 
`ArrowArrayRelease`/`ArrowSchemaRelease`, since `Write()` transfers ownership 
of the array via `ImportRecordBatch` (which sets `release = nullptr`).



##########
src/iceberg/data/position_delete_writer.cc:
##########
@@ -19,26 +19,201 @@
 
 #include "iceberg/data/position_delete_writer.h"
 
+#include <map>
+#include <set>
+#include <vector>
+
+#include <nanoarrow/nanoarrow.h>
+
+#include "iceberg/arrow/nanoarrow_status_internal.h"
+#include "iceberg/file_writer.h"
+#include "iceberg/manifest/manifest_entry.h"
+#include "iceberg/metadata_columns.h"
+#include "iceberg/schema.h"
+#include "iceberg/schema_internal.h"
+#include "iceberg/util/macros.h"
+
 namespace iceberg {
 
 class PositionDeleteWriter::Impl {
  public:
+  static Result<std::unique_ptr<Impl>> Make(PositionDeleteWriterOptions 
options) {
+    // Build the position delete schema with file_path and pos columns
+    std::vector<SchemaField> fields;
+    fields.push_back(MetadataColumns::kDeleteFilePath);
+    fields.push_back(MetadataColumns::kDeleteFilePos);
+
+    auto delete_schema = std::make_shared<Schema>(std::move(fields));
+
+    WriterOptions writer_options{
+        .path = options.path,
+        .schema = delete_schema,
+        .io = options.io,
+        .properties = WriterProperties::FromMap(options.properties),
+    };
+
+    ICEBERG_ASSIGN_OR_RAISE(auto writer,
+                            WriterFactoryRegistry::Open(options.format, 
writer_options));
+
+    return std::unique_ptr<Impl>(
+        new Impl(std::move(options), std::move(delete_schema), 
std::move(writer)));
+  }
+
+  Status Write(ArrowArray* data) {
+    ICEBERG_DCHECK(writer_, "Writer not initialized");
+    return writer_->Write(data);
+  }
+
+  Status WriteDelete(std::string_view file_path, int64_t pos) {
+    ICEBERG_DCHECK(writer_, "Writer not initialized");
+    buffered_paths_.emplace_back(file_path);
+    buffered_positions_.push_back(pos);
+    referenced_paths_.emplace(file_path);
+
+    if (static_cast<int64_t>(buffered_paths_.size()) >= 
options_.flush_threshold) {
+      return FlushBuffer();
+    }
+    return {};
+  }
+
+  Result<int64_t> Length() const {
+    ICEBERG_DCHECK(writer_, "Writer not initialized");
+    return writer_->length();
+  }
+
+  Status Close() {
+    ICEBERG_DCHECK(writer_, "Writer not initialized");
+    if (closed_) {
+      return {};
+    }
+    if (!buffered_paths_.empty()) {
+      ICEBERG_RETURN_UNEXPECTED(FlushBuffer());
+    }
+    ICEBERG_RETURN_UNEXPECTED(writer_->Close());
+    closed_ = true;
+    return {};
+  }
+
+  Result<FileWriter::WriteResult> Metadata() {
+    ICEBERG_CHECK(closed_, "Cannot get metadata before closing the writer");
+
+    ICEBERG_ASSIGN_OR_RAISE(auto metrics, writer_->metrics());
+    ICEBERG_ASSIGN_OR_RAISE(auto length, writer_->length());
+    auto split_offsets = writer_->split_offsets();
+
+    // Serialize literal bounds to binary format
+    std::map<int32_t, std::vector<uint8_t>> lower_bounds_map;

Review Comment:
   Done. Added metrics filtering in `Metadata()` to match Java's behavior: 
always erase `value_counts`, `null_value_counts`, and `nan_value_counts` for 
`kDeleteFilePathColumnId` and `kDeleteFilePosColumnId`. When 
`referenced_paths_.size() > 1`, also erase `lower_bounds` and `upper_bounds` 
for those columns. This mirrors Java's `MetricsUtil.copyWithoutFieldCounts` / 
`copyWithoutFieldCountsAndBounds` logic.



##########
src/iceberg/data/position_delete_writer.cc:
##########
@@ -19,26 +19,201 @@
 
 #include "iceberg/data/position_delete_writer.h"
 
+#include <map>
+#include <set>
+#include <vector>
+
+#include <nanoarrow/nanoarrow.h>
+
+#include "iceberg/arrow/nanoarrow_status_internal.h"
+#include "iceberg/file_writer.h"
+#include "iceberg/manifest/manifest_entry.h"
+#include "iceberg/metadata_columns.h"
+#include "iceberg/schema.h"
+#include "iceberg/schema_internal.h"
+#include "iceberg/util/macros.h"
+
 namespace iceberg {
 
 class PositionDeleteWriter::Impl {
  public:
+  static Result<std::unique_ptr<Impl>> Make(PositionDeleteWriterOptions 
options) {
+    // Build the position delete schema with file_path and pos columns
+    std::vector<SchemaField> fields;
+    fields.push_back(MetadataColumns::kDeleteFilePath);
+    fields.push_back(MetadataColumns::kDeleteFilePos);
+
+    auto delete_schema = std::make_shared<Schema>(std::move(fields));
+
+    WriterOptions writer_options{
+        .path = options.path,
+        .schema = delete_schema,
+        .io = options.io,
+        .properties = WriterProperties::FromMap(options.properties),
+    };
+
+    ICEBERG_ASSIGN_OR_RAISE(auto writer,
+                            WriterFactoryRegistry::Open(options.format, 
writer_options));
+
+    return std::unique_ptr<Impl>(
+        new Impl(std::move(options), std::move(delete_schema), 
std::move(writer)));
+  }
+
+  Status Write(ArrowArray* data) {

Review Comment:
   Added a TODO comment. Extracting paths from the raw ArrowArray would require 
reading the string column from the C Data Interface, which adds complexity. 
Leaving this for a follow-up since the primary API is `WriteDelete()` which 
correctly tracks paths.



##########
src/iceberg/data/position_delete_writer.cc:
##########
@@ -19,26 +19,201 @@
 
 #include "iceberg/data/position_delete_writer.h"
 
+#include <map>
+#include <set>
+#include <vector>
+
+#include <nanoarrow/nanoarrow.h>
+
+#include "iceberg/arrow/nanoarrow_status_internal.h"
+#include "iceberg/file_writer.h"
+#include "iceberg/manifest/manifest_entry.h"
+#include "iceberg/metadata_columns.h"
+#include "iceberg/schema.h"
+#include "iceberg/schema_internal.h"
+#include "iceberg/util/macros.h"
+
 namespace iceberg {
 
 class PositionDeleteWriter::Impl {
  public:
+  static Result<std::unique_ptr<Impl>> Make(PositionDeleteWriterOptions 
options) {
+    // Build the position delete schema with file_path and pos columns

Review Comment:
   Added a TODO comment. Note that Java has also deprecated the row field in 
`PositionDelete` as of v1.11.0, so this is low priority. Will add support in a 
follow-up if needed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to