wgtmac commented on code in PR #582:
URL: https://github.com/apache/iceberg-cpp/pull/582#discussion_r2931904994
##########
src/iceberg/data/position_delete_writer.cc:
##########
@@ -19,26 +19,201 @@
#include "iceberg/data/position_delete_writer.h"
+#include <map>
+#include <set>
+#include <vector>
+
+#include <nanoarrow/nanoarrow.h>
+
+#include "iceberg/arrow/nanoarrow_status_internal.h"
+#include "iceberg/file_writer.h"
+#include "iceberg/manifest/manifest_entry.h"
+#include "iceberg/metadata_columns.h"
+#include "iceberg/schema.h"
+#include "iceberg/schema_internal.h"
+#include "iceberg/util/macros.h"
+
namespace iceberg {
class PositionDeleteWriter::Impl {
public:
+ static Result<std::unique_ptr<Impl>> Make(PositionDeleteWriterOptions
options) {
+ // Build the position delete schema with file_path and pos columns
+ std::vector<SchemaField> fields;
+ fields.push_back(MetadataColumns::kDeleteFilePath);
+ fields.push_back(MetadataColumns::kDeleteFilePos);
+
+ auto delete_schema = std::make_shared<Schema>(std::move(fields));
+
+ WriterOptions writer_options{
+ .path = options.path,
+ .schema = delete_schema,
+ .io = options.io,
+ .properties = WriterProperties::FromMap(options.properties),
+ };
+
+ ICEBERG_ASSIGN_OR_RAISE(auto writer,
+ WriterFactoryRegistry::Open(options.format,
writer_options));
+
+ return std::unique_ptr<Impl>(
+ new Impl(std::move(options), std::move(delete_schema),
std::move(writer)));
+ }
+
+ Status Write(ArrowArray* data) {
+ ICEBERG_DCHECK(writer_, "Writer not initialized");
+ return writer_->Write(data);
+ }
+
+ Status WriteDelete(std::string_view file_path, int64_t pos) {
+ ICEBERG_DCHECK(writer_, "Writer not initialized");
+ buffered_paths_.emplace_back(file_path);
+ buffered_positions_.push_back(pos);
+ referenced_paths_.emplace(file_path);
+
+ if (static_cast<int64_t>(buffered_paths_.size()) >=
options_.flush_threshold) {
+ return FlushBuffer();
+ }
+ return {};
+ }
+
+ Result<int64_t> Length() const {
+ ICEBERG_DCHECK(writer_, "Writer not initialized");
+ return writer_->length();
+ }
+
+ Status Close() {
+ ICEBERG_DCHECK(writer_, "Writer not initialized");
+ if (closed_) {
+ return {};
+ }
+ if (!buffered_paths_.empty()) {
+ ICEBERG_RETURN_UNEXPECTED(FlushBuffer());
+ }
+ ICEBERG_RETURN_UNEXPECTED(writer_->Close());
+ closed_ = true;
+ return {};
+ }
+
+ Result<FileWriter::WriteResult> Metadata() {
+ ICEBERG_CHECK(closed_, "Cannot get metadata before closing the writer");
+
+ ICEBERG_ASSIGN_OR_RAISE(auto metrics, writer_->metrics());
+ ICEBERG_ASSIGN_OR_RAISE(auto length, writer_->length());
+ auto split_offsets = writer_->split_offsets();
+
+ // Serialize literal bounds to binary format
+ std::map<int32_t, std::vector<uint8_t>> lower_bounds_map;
Review Comment:
**Parity Issue (Metrics Bloat):**
Java's `PositionDeleteWriter` explicitly drops the field counts for the
`file_path` and `pos` columns, and additionally drops their bounds when the
delete file references more than one data file, to avoid bloating the
manifest. C++ currently copies all metrics unfiltered.
*Fix:* Add a `// TODO` or implement logic that always filters out the counts
for `MetadataColumns::kDeleteFilePathColumnId` and
`MetadataColumns::kDeleteFilePosColumnId`, and also filters out their
`lower_bounds` and `upper_bounds` when more than one data file is referenced.
##########
src/iceberg/data/position_delete_writer.cc:
##########
@@ -19,26 +19,201 @@
#include "iceberg/data/position_delete_writer.h"
+#include <map>
+#include <set>
+#include <vector>
+
+#include <nanoarrow/nanoarrow.h>
+
+#include "iceberg/arrow/nanoarrow_status_internal.h"
+#include "iceberg/file_writer.h"
+#include "iceberg/manifest/manifest_entry.h"
+#include "iceberg/metadata_columns.h"
+#include "iceberg/schema.h"
+#include "iceberg/schema_internal.h"
+#include "iceberg/util/macros.h"
+
namespace iceberg {
class PositionDeleteWriter::Impl {
public:
+ static Result<std::unique_ptr<Impl>> Make(PositionDeleteWriterOptions
options) {
+ // Build the position delete schema with file_path and pos columns
+ std::vector<SchemaField> fields;
+ fields.push_back(MetadataColumns::kDeleteFilePath);
+ fields.push_back(MetadataColumns::kDeleteFilePos);
+
+ auto delete_schema = std::make_shared<Schema>(std::move(fields));
+
+ WriterOptions writer_options{
+ .path = options.path,
+ .schema = delete_schema,
+ .io = options.io,
+ .properties = WriterProperties::FromMap(options.properties),
+ };
+
+ ICEBERG_ASSIGN_OR_RAISE(auto writer,
+ WriterFactoryRegistry::Open(options.format,
writer_options));
+
+ return std::unique_ptr<Impl>(
+ new Impl(std::move(options), std::move(delete_schema),
std::move(writer)));
+ }
+
+ Status Write(ArrowArray* data) {
+ ICEBERG_DCHECK(writer_, "Writer not initialized");
+ return writer_->Write(data);
+ }
+
+ Status WriteDelete(std::string_view file_path, int64_t pos) {
+ ICEBERG_DCHECK(writer_, "Writer not initialized");
+ buffered_paths_.emplace_back(file_path);
+ buffered_positions_.push_back(pos);
+ referenced_paths_.emplace(file_path);
+
+ if (static_cast<int64_t>(buffered_paths_.size()) >=
options_.flush_threshold) {
+ return FlushBuffer();
+ }
+ return {};
+ }
+
+ Result<int64_t> Length() const {
+ ICEBERG_DCHECK(writer_, "Writer not initialized");
+ return writer_->length();
+ }
+
+ Status Close() {
+ ICEBERG_DCHECK(writer_, "Writer not initialized");
+ if (closed_) {
+ return {};
+ }
+ if (!buffered_paths_.empty()) {
+ ICEBERG_RETURN_UNEXPECTED(FlushBuffer());
+ }
+ ICEBERG_RETURN_UNEXPECTED(writer_->Close());
+ closed_ = true;
+ return {};
+ }
+
+ Result<FileWriter::WriteResult> Metadata() {
+ ICEBERG_CHECK(closed_, "Cannot get metadata before closing the writer");
+
+ ICEBERG_ASSIGN_OR_RAISE(auto metrics, writer_->metrics());
+ ICEBERG_ASSIGN_OR_RAISE(auto length, writer_->length());
+ auto split_offsets = writer_->split_offsets();
+
+ // Serialize literal bounds to binary format
+ std::map<int32_t, std::vector<uint8_t>> lower_bounds_map;
+ for (const auto& [col_id, literal] : metrics.lower_bounds) {
+ ICEBERG_ASSIGN_OR_RAISE(auto serialized, literal.Serialize());
+ lower_bounds_map[col_id] = std::move(serialized);
+ }
+ std::map<int32_t, std::vector<uint8_t>> upper_bounds_map;
+ for (const auto& [col_id, literal] : metrics.upper_bounds) {
+ ICEBERG_ASSIGN_OR_RAISE(auto serialized, literal.Serialize());
+ upper_bounds_map[col_id] = std::move(serialized);
+ }
+
+ // Set referenced_data_file if all deletes reference the same data file
+ std::optional<std::string> referenced_data_file;
+ if (referenced_paths_.size() == 1) {
+ referenced_data_file = *referenced_paths_.begin();
+ }
+
+ auto data_file = std::make_shared<DataFile>(DataFile{
+ .content = DataFile::Content::kPositionDeletes,
+ .file_path = options_.path,
+ .file_format = options_.format,
+ .partition = options_.partition,
+ .record_count = metrics.row_count.value_or(-1),
+ .file_size_in_bytes = length,
+ .column_sizes = {metrics.column_sizes.begin(),
metrics.column_sizes.end()},
+ .value_counts = {metrics.value_counts.begin(),
metrics.value_counts.end()},
+ .null_value_counts = {metrics.null_value_counts.begin(),
+ metrics.null_value_counts.end()},
+ .nan_value_counts = {metrics.nan_value_counts.begin(),
+ metrics.nan_value_counts.end()},
+ .lower_bounds = std::move(lower_bounds_map),
+ .upper_bounds = std::move(upper_bounds_map),
+ .split_offsets = std::move(split_offsets),
+ .sort_order_id = std::nullopt,
+ .referenced_data_file = std::move(referenced_data_file),
+ });
+
+ FileWriter::WriteResult result;
+ result.data_files.push_back(std::move(data_file));
+ return result;
+ }
+
+ private:
+ Impl(PositionDeleteWriterOptions options, std::shared_ptr<Schema>
delete_schema,
+ std::unique_ptr<Writer> writer)
+ : options_(std::move(options)),
+ delete_schema_(std::move(delete_schema)),
+ writer_(std::move(writer)) {}
+
+ Status FlushBuffer() {
+ ArrowSchema arrow_schema;
+ ICEBERG_RETURN_UNEXPECTED(ToArrowSchema(*delete_schema_, &arrow_schema));
+
+ ArrowArray array;
+ ArrowError error;
+ ICEBERG_NANOARROW_RETURN_UNEXPECTED_WITH_ERROR(
+ ArrowArrayInitFromSchema(&array, &arrow_schema, &error), error);
Review Comment:
**Logic Issue (Memory Leak):**
`arrow_schema` and `array` are not protected by RAII wrappers. If any
`ICEBERG_NANOARROW_RETURN_UNEXPECTED*` macro returns early on an error, before
`arrow_schema.release(&arrow_schema)` is called or before
`writer_->Write(&array)` takes ownership of the array, the memory they own
will leak.
*Fix:* Use `internal::ArrowSchemaGuard schema_guard(&arrow_schema);` and
`internal::ArrowArrayGuard array_guard(&array);` to safely manage their
lifecycles.
##########
src/iceberg/data/position_delete_writer.cc:
##########
@@ -19,26 +19,201 @@
#include "iceberg/data/position_delete_writer.h"
+#include <map>
+#include <set>
+#include <vector>
+
+#include <nanoarrow/nanoarrow.h>
+
+#include "iceberg/arrow/nanoarrow_status_internal.h"
+#include "iceberg/file_writer.h"
+#include "iceberg/manifest/manifest_entry.h"
+#include "iceberg/metadata_columns.h"
+#include "iceberg/schema.h"
+#include "iceberg/schema_internal.h"
+#include "iceberg/util/macros.h"
+
namespace iceberg {
class PositionDeleteWriter::Impl {
public:
+ static Result<std::unique_ptr<Impl>> Make(PositionDeleteWriterOptions
options) {
+ // Build the position delete schema with file_path and pos columns
+ std::vector<SchemaField> fields;
+ fields.push_back(MetadataColumns::kDeleteFilePath);
+ fields.push_back(MetadataColumns::kDeleteFilePos);
+
+ auto delete_schema = std::make_shared<Schema>(std::move(fields));
+
+ WriterOptions writer_options{
+ .path = options.path,
+ .schema = delete_schema,
+ .io = options.io,
+ .properties = WriterProperties::FromMap(options.properties),
+ };
+
+ ICEBERG_ASSIGN_OR_RAISE(auto writer,
+ WriterFactoryRegistry::Open(options.format,
writer_options));
+
+ return std::unique_ptr<Impl>(
+ new Impl(std::move(options), std::move(delete_schema),
std::move(writer)));
+ }
+
+ Status Write(ArrowArray* data) {
Review Comment:
**Parity / Logic Issue:**
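If users write batches directly using `Write(ArrowArray* data)`, the
`referenced_paths_` set is never updated. As a result, `Metadata()` leaves
`referenced_data_file` unset even when every delete in the batch targets a
single data file. Worse, if `Write()` is mixed with `WriteDelete()` calls for
one path, `referenced_data_file` is set to that path even though the directly
written batches may reference other data files.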
*Fix:* Add a `// TODO: Extract paths from ArrowArray to update
referenced_paths_` or return `NotImplemented` if batch writing should not be
directly used this way.
##########
src/iceberg/data/position_delete_writer.cc:
##########
@@ -19,26 +19,201 @@
#include "iceberg/data/position_delete_writer.h"
+#include <map>
+#include <set>
+#include <vector>
+
+#include <nanoarrow/nanoarrow.h>
+
+#include "iceberg/arrow/nanoarrow_status_internal.h"
+#include "iceberg/file_writer.h"
+#include "iceberg/manifest/manifest_entry.h"
+#include "iceberg/metadata_columns.h"
+#include "iceberg/schema.h"
+#include "iceberg/schema_internal.h"
+#include "iceberg/util/macros.h"
+
namespace iceberg {
class PositionDeleteWriter::Impl {
public:
+ static Result<std::unique_ptr<Impl>> Make(PositionDeleteWriterOptions
options) {
+ // Build the position delete schema with file_path and pos columns
Review Comment:
**Parity Issue (Missing Implementation):**
`PositionDeleteWriterOptions` accepts a `row_schema`, but `Impl::Make`
completely ignores it, creating a schema with only `file_path` and `pos`. The
V2 spec allows position deletes to optionally include the deleted row.
*Fix:* Add a `// TODO: Support writing row data if options.row_schema is
provided`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]