github-actions[bot] commented on code in PR #60482:
URL: https://github.com/apache/doris/pull/60482#discussion_r2977675016


##########
be/src/format/transformer/merge_partitioner.cpp:
##########
@@ -0,0 +1,365 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "format/transformer/merge_partitioner.h"
+
+#include <algorithm>
+#include <cstdint>
+
+#include "common/cast_set.h"
+#include "common/config.h"
+#include "common/logging.h"
+#include "common/status.h"
+#include "core/block/block.h"
+#include "core/column/column_const.h"
+#include "core/column/column_nullable.h"
+#include "core/column/column_vector.h"
+#include "format/transformer/iceberg_partition_function.h"
+
+namespace doris {
+#include "common/compile_check_begin.h"
+
+namespace {
+constexpr int8_t kInsertOperation = 1;
+constexpr int8_t kDeleteOperation = 2;
+constexpr int8_t kUpdateOperation = 3;
+constexpr int8_t kUpdateInsertOperation = 4;
+constexpr int8_t kUpdateDeleteOperation = 5;
+
+int64_t scale_threshold_by_task(int64_t value, int task_num) {
+    if (task_num <= 0) {
+        return value;
+    }
+    int64_t scaled = value / task_num;
+    return scaled == 0 ? value : scaled;
+}
+} // namespace
+
+MergePartitioner::MergePartitioner(size_t partition_count, const 
TMergePartitionInfo& merge_info,
+                                   bool use_new_shuffle_hash_method)
+        : PartitionerBase(static_cast<HashValType>(partition_count)),
+          _merge_info(merge_info),
+          _use_new_shuffle_hash_method(use_new_shuffle_hash_method),
+          _insert_random(merge_info.insert_random) {}
+
+Status MergePartitioner::init(const std::vector<TExpr>& /*texprs*/) {
+    VExprContextSPtr op_ctx;
+    RETURN_IF_ERROR(VExpr::create_expr_tree(_merge_info.operation_expr, 
op_ctx));
+    _operation_expr_ctxs.emplace_back(std::move(op_ctx));
+
+    std::vector<TExpr> insert_exprs;
+    std::vector<TIcebergPartitionField> insert_fields;
+    if (_merge_info.__isset.insert_partition_exprs) {
+        insert_exprs = _merge_info.insert_partition_exprs;
+    }
+    if (_merge_info.__isset.insert_partition_fields) {
+        insert_fields = _merge_info.insert_partition_fields;
+    }
+    if (!insert_exprs.empty() || !insert_fields.empty()) {
+        _insert_partition_function = 
std::make_unique<IcebergInsertPartitionFunction>(
+                _partition_count, _hash_method(), std::move(insert_exprs),
+                std::move(insert_fields));
+        RETURN_IF_ERROR(_insert_partition_function->init({}));
+    }
+
+    if (_merge_info.__isset.delete_partition_exprs && 
!_merge_info.delete_partition_exprs.empty()) {
+        _delete_partition_function = 
std::make_unique<IcebergDeletePartitionFunction>(
+                _partition_count, _hash_method(), 
_merge_info.delete_partition_exprs);
+        RETURN_IF_ERROR(_delete_partition_function->init({}));
+    }
+    return Status::OK();
+}
+
+Status MergePartitioner::prepare(RuntimeState* state, const RowDescriptor& 
row_desc) {
+    RETURN_IF_ERROR(VExpr::prepare(_operation_expr_ctxs, state, row_desc));
+    if (_insert_partition_function != nullptr) {
+        RETURN_IF_ERROR(_insert_partition_function->prepare(state, row_desc));
+    }
+    if (_delete_partition_function != nullptr) {
+        RETURN_IF_ERROR(_delete_partition_function->prepare(state, row_desc));
+    }
+    return Status::OK();
+}
+
+Status MergePartitioner::open(RuntimeState* state) {
+    RETURN_IF_ERROR(VExpr::open(_operation_expr_ctxs, state));
+    if (_insert_partition_function != nullptr) {
+        RETURN_IF_ERROR(_insert_partition_function->open(state));
+        if (auto* insert_function =
+                    
dynamic_cast<IcebergInsertPartitionFunction*>(_insert_partition_function.get());
+            insert_function != nullptr && 
insert_function->fallback_to_random()) {
+            _insert_random = true;
+        }
+    }
+    if (_delete_partition_function != nullptr) {
+        RETURN_IF_ERROR(_delete_partition_function->open(state));
+    }
+    _init_insert_scaling(state);
+    return Status::OK();
+}
+
+Status MergePartitioner::close(RuntimeState* /*state*/) {
+    return Status::OK();
+}
+
+Status MergePartitioner::do_partitioning(RuntimeState* state, Block* block) 
const {
+    const size_t rows = block->rows();
+    if (rows == 0) {
+        _channel_ids.clear();
+        return Status::OK();
+    }
+
+    const size_t column_to_keep = block->columns();
+    if (_operation_expr_ctxs.empty()) {
+        return Status::InternalError("Merge partitioning missing operation 
expression");
+    }
+
+    int op_idx = -1;
+    RETURN_IF_ERROR(_operation_expr_ctxs[0]->execute(block, &op_idx));
+    if (op_idx < 0 || op_idx >= block->columns()) {
+        return Status::InternalError("Merge partitioning missing operation 
column");
+    }
+    if (op_idx >= cast_set<int>(column_to_keep)) {
+        return Status::InternalError("Merge partitioning requires operation 
column in input block");
+    }
+
+    const auto& op_column = block->get_by_position(op_idx).column;
+    const auto* op_data = remove_nullable(op_column).get();
+    std::vector<int8_t> ops(rows);
+    bool has_insert = false;
+    bool has_delete = false;
+    bool has_update = false;
+    for (size_t i = 0; i < rows; ++i) {
+        int8_t op = static_cast<int8_t>(op_data->get_int(i));
+        ops[i] = op;
+        if (_is_insert_op(op)) {
+            has_insert = true;
+        }
+        if (_is_delete_op(op)) {
+            has_delete = true;
+        }
+        if (op == kUpdateOperation) {
+            has_update = true;
+        }
+    }
+
+    if (has_insert && !_insert_random && _insert_partition_function == 
nullptr) {
+        return Status::InternalError("Merge partitioning insert exprs are 
empty");
+    }
+    if (has_delete && _delete_partition_function == nullptr) {
+        return Status::InternalError("Merge partitioning delete exprs are 
empty");
+    }
+
+    std::vector<uint32_t> insert_hashes;
+    std::vector<uint32_t> delete_hashes;
+    const size_t insert_partition_count =
+            _enable_insert_rebalance ? _insert_partition_count : 
_partition_count;
+    if (has_insert && !_insert_random) {
+        RETURN_IF_ERROR(_insert_partition_function->get_partitions(
+                state, block, insert_partition_count, insert_hashes));
+    }
+    if (has_delete) {
+        RETURN_IF_ERROR(_delete_partition_function->get_partitions(state, 
block, _partition_count,
+                                                                   
delete_hashes));
+    }
+    if (has_insert) {
+        if (_insert_random) {
+            if (_non_partition_scaling_threshold > 0) {
+                _insert_data_processed += static_cast<int64_t>(block->bytes());
+                if (_insert_writer_count < static_cast<int>(_partition_count) 
&&
+                    _insert_data_processed >=
+                            _insert_writer_count * 
_non_partition_scaling_threshold) {
+                    _insert_writer_count++;
+                }
+            } else {
+                _insert_writer_count = static_cast<int>(_partition_count);
+            }
+        } else if (_enable_insert_rebalance) {
+            _apply_insert_rebalance(ops, insert_hashes, block->bytes());
+        }
+    }
+
+    Block::erase_useless_column(block, column_to_keep);
+
+    _channel_ids.resize(rows);
+    for (size_t i = 0; i < rows; ++i) {
+        const int8_t op = ops[i];
+        if (op == kUpdateOperation) {
+            _channel_ids[i] = delete_hashes[i];
+            continue;
+        }
+        if (_is_insert_op(op)) {
+            _channel_ids[i] = _insert_random ? _next_rr_channel() : 
insert_hashes[i];
+        } else if (_is_delete_op(op)) {
+            _channel_ids[i] = delete_hashes[i];
+        } else {
+            return Status::InternalError("Unknown Iceberg merge operation {}", 
op);
+        }
+    }
+
+    if (has_update) {
+        for (size_t col_idx = 0; col_idx < block->columns(); ++col_idx) {
+            block->replace_by_position_if_const(col_idx);
+        }
+
+        MutableColumns mutable_columns = block->mutate_columns();
+        MutableColumnPtr& op_mut = mutable_columns[op_idx];
+        ColumnInt8* op_values_col = nullptr;
+        if (auto* nullable_col = 
check_and_get_column<ColumnNullable>(op_mut.get())) {
+            op_values_col =
+                    
check_and_get_column<ColumnInt8>(nullable_col->get_nested_column_ptr().get());
+        } else {
+            op_values_col = check_and_get_column<ColumnInt8>(op_mut.get());
+        }
+        if (op_values_col == nullptr) {
+            block->set_columns(std::move(mutable_columns));
+            return Status::InternalError("Merge operation column must be 
tinyint");
+        }
+        auto& op_values = op_values_col->get_data();
+        const size_t original_rows = rows;
+        for (size_t row = 0; row < original_rows; ++row) {
+            if (ops[row] != kUpdateOperation) {
+                continue;
+            }
+            op_values[row] = kUpdateDeleteOperation;
+            for (size_t col_idx = 0; col_idx < mutable_columns.size(); 
++col_idx) {
+                
mutable_columns[col_idx]->insert_from(*mutable_columns[col_idx], row);
+            }

Review Comment:
   **CRITICAL: Use-after-free (self-referential `insert_from`)**
   
   When `op == kUpdateOperation`, this line duplicates a row from 
`mutable_columns[col_idx]` back into the **same** column:
   ```cpp
   mutable_columns[col_idx]->insert_from(*mutable_columns[col_idx], row);
   ```
   
   For `ColumnString`, `insert_from` calls `chars.resize(new_size)` which can 
reallocate the internal buffer, then reads from `src.chars[offset]` — but `src` 
IS `*this`, so the source pointer is now dangling. This is a classic 
self-referential container insertion bug that causes use-after-free / data 
corruption.
   
   **Fix**: Clone each row's data into a temporary `Block` or row-holder first, 
then append from the temporary. Alternatively, collect all update rows' indices 
first, then bulk-duplicate them from a snapshot of the original columns.



##########
fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergTransaction.java:
##########
@@ -226,6 +249,210 @@ private void updateManifestAfterRewrite() {
         }
     }
 
+    /**
+     * Begin delete operation for Iceberg table
+     */
+    public void beginDelete(ExternalTable dorisTable) throws UserException {
+        try {
+            ops.getExecutionAuthenticator().execute(() -> {
+                // create and start the iceberg transaction
+                this.table = IcebergUtils.getIcebergTable(dorisTable);
+                this.baseSnapshotId = getSnapshotIdIfPresent(table);
+                // // Verify table format version (must be v2+ for delete 
support)
+                // String formatVersionStr = 
table.properties().get("format-version");

Review Comment:
   **HIGH: Format version v2 check is commented out**
   
   Position delete files require Iceberg table format version >= 2. This 
validation is commented out. Operating against a v1 table will lead to runtime 
errors from the Iceberg library when attempting to create position delete 
files, or potentially silent data corruption.
   
   This check should be active, or at minimum there should be a clear comment 
explaining why it's deferred (e.g., if the Iceberg library itself validates 
this).



##########
be/src/format/transformer/iceberg_partition_function.cpp:
##########
@@ -0,0 +1,395 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "format/transformer/iceberg_partition_function.h"
+
+#include "common/cast_set.h"
+#include "common/exception.h"
+#include "common/logging.h"
+#include "common/status.h"
+#include "core/column/column_const.h"
+#include "core/column/column_nullable.h"
+#include "core/column/column_struct.h"
+#include "core/data_type/data_type_struct.h"
+#include "exec/sink/writer/iceberg/partition_transformers.h"
+#include "format/table/iceberg/partition_spec.h"
+#include "util/string_util.h"
+
+namespace doris {
+#include "common/compile_check_begin.h"
+
+using HashValType = PartitionerBase::HashValType;
+
+static void initialize_shuffle_hashes(std::vector<HashValType>& hashes, size_t 
rows,
+                                      ShuffleHashMethod method) {
+    hashes.resize(rows);
+    if (method == ShuffleHashMethod::CRC32C) {
+        constexpr HashValType CRC32C_SHUFFLE_SEED = 0x9E3779B9U;
+        std::fill(hashes.begin(), hashes.end(), CRC32C_SHUFFLE_SEED);
+    } else {
+        std::fill(hashes.begin(), hashes.end(), 0);
+    }
+}
+
+static void update_shuffle_hashes(const ColumnPtr& column, const DataTypePtr& 
type,
+                                  HashValType* __restrict result, 
ShuffleHashMethod method) {
+    if (method == ShuffleHashMethod::CRC32C) {
+        column->update_crc32c_batch(result, nullptr);
+    } else {
+        column->update_crcs_with_value(result, type->get_primitive_type(),
+                                       cast_set<HashValType>(column->size()));
+    }
+}
+
+static void apply_shuffle_channel_ids(std::vector<HashValType>& hashes, size_t 
partition_count,
+                                      ShuffleHashMethod method) {
+    for (auto& h : hashes) {
+        if (method == ShuffleHashMethod::CRC32C) {
+            h = crc32c_shuffle_mix(h) % partition_count;
+        } else {
+            h = h % partition_count;
+        }
+    }
+}
+
+IcebergInsertPartitionFunction::IcebergInsertPartitionFunction(
+        HashValType partition_count, ShuffleHashMethod hash_method,
+        std::vector<TExpr> partition_exprs, 
std::vector<TIcebergPartitionField> partition_fields)
+        : _partition_count(partition_count),
+          _hash_method(hash_method),
+          _partition_exprs(std::move(partition_exprs)),
+          _partition_fields_spec(std::move(partition_fields)) {}
+
+Status IcebergInsertPartitionFunction::init(const std::vector<TExpr>& texprs) {
+    const auto& exprs = _partition_exprs.empty() ? texprs : _partition_exprs;
+    if (!exprs.empty()) {
+        RETURN_IF_ERROR(VExpr::create_expr_trees(exprs, _partition_expr_ctxs));
+    }
+    if (!_partition_fields_spec.empty()) {
+        _partition_fields.reserve(_partition_fields_spec.size());
+        for (const auto& field : _partition_fields_spec) {
+            VExprContextSPtr ctx;
+            RETURN_IF_ERROR(VExpr::create_expr_tree(field.source_expr, ctx));
+            InsertPartitionField insert_field;
+            insert_field.transform = field.transform;
+            insert_field.expr_ctx = std::move(ctx);
+            insert_field.source_id = field.__isset.source_id ? field.source_id 
: 0;
+            insert_field.name = field.__isset.name ? field.name : "";
+            _partition_fields.emplace_back(std::move(insert_field));
+        }
+    }
+    return Status::OK();
+}
+
+Status IcebergInsertPartitionFunction::prepare(RuntimeState* state, const 
RowDescriptor& row_desc) {
+    RETURN_IF_ERROR(VExpr::prepare(_partition_expr_ctxs, state, row_desc));
+    if (!_partition_fields.empty()) {
+        VExprContextSPtrs field_ctxs;
+        field_ctxs.reserve(_partition_fields.size());
+        for (const auto& field : _partition_fields) {
+            field_ctxs.emplace_back(field.expr_ctx);
+        }
+        RETURN_IF_ERROR(VExpr::prepare(field_ctxs, state, row_desc));
+    }
+    return Status::OK();
+}
+
+Status IcebergInsertPartitionFunction::open(RuntimeState* state) {
+    RETURN_IF_ERROR(VExpr::open(_partition_expr_ctxs, state));
+    if (!_partition_fields.empty()) {
+        VExprContextSPtrs field_ctxs;
+        field_ctxs.reserve(_partition_fields.size());
+        for (const auto& field : _partition_fields) {
+            field_ctxs.emplace_back(field.expr_ctx);
+        }
+        RETURN_IF_ERROR(VExpr::open(field_ctxs, state));
+        for (auto& field : _partition_fields) {
+            try {
+                doris::iceberg::PartitionField 
partition_field(field.source_id, 0, field.name,
+                                                               
field.transform);
+                field.transformer = PartitionColumnTransforms::create(
+                        partition_field, field.expr_ctx->root()->data_type());
+            } catch (const doris::Exception& e) {
+                LOG(WARNING) << "Merge partitioning fallback to RR: " << 
e.what();
+                _fallback_to_random = true;
+                _partition_fields.clear();
+                break;
+            }
+        }
+    }
+    return Status::OK();
+}
+
+Status IcebergInsertPartitionFunction::get_partitions(RuntimeState* /*state*/, 
Block* block,
+                                                      size_t partition_count,
+                                                      
std::vector<HashValType>& partitions) const {
+    if (_fallback_to_random) {
+        return Status::InternalError("Merge partitioning fallback to random");
+    }
+    if (partition_count == 0) {
+        return Status::InternalError("Partition count is zero");
+    }
+    if (!_partition_fields.empty()) {
+        RETURN_IF_ERROR(_compute_hashes_with_transform(block, partitions));
+    } else {
+        RETURN_IF_ERROR(_compute_hashes_with_exprs(block, partitions));
+    }
+    apply_shuffle_channel_ids(partitions, partition_count, _hash_method);
+    return Status::OK();
+}
+
+Status IcebergInsertPartitionFunction::clone(RuntimeState* state,
+                                             
std::unique_ptr<PartitionFunction>& function) const {
+    auto* new_function = new IcebergInsertPartitionFunction(
+            _partition_count, _hash_method, _partition_exprs, 
_partition_fields_spec);
+    function.reset(new_function);
+    RETURN_IF_ERROR(
+            _clone_expr_ctxs(state, _partition_expr_ctxs, 
new_function->_partition_expr_ctxs));
+    if (!_partition_fields.empty()) {
+        VExprContextSPtrs src_field_ctxs;
+        src_field_ctxs.reserve(_partition_fields.size());
+        for (const auto& field : _partition_fields) {
+            src_field_ctxs.emplace_back(field.expr_ctx);
+        }
+        VExprContextSPtrs dst_field_ctxs;
+        RETURN_IF_ERROR(_clone_expr_ctxs(state, src_field_ctxs, 
dst_field_ctxs));
+        new_function->_partition_fields.reserve(dst_field_ctxs.size());
+        for (size_t i = 0; i < dst_field_ctxs.size(); ++i) {
+            InsertPartitionField field;
+            field.transform = _partition_fields[i].transform;
+            field.expr_ctx = dst_field_ctxs[i];
+            field.source_id = _partition_fields[i].source_id;
+            field.name = _partition_fields[i].name;
+            new_function->_partition_fields.emplace_back(std::move(field));
+        }
+    }
+    new_function->_fallback_to_random = _fallback_to_random;
+    return Status::OK();
+}
+
+Status IcebergInsertPartitionFunction::_compute_hashes_with_transform(
+        Block* block, std::vector<HashValType>& partitions) const {
+    const size_t rows = block->rows();
+    if (rows == 0) {
+        partitions.clear();
+        return Status::OK();
+    }
+    if (_partition_fields.empty()) {
+        return Status::InternalError("Merge partitioning insert fields are 
empty");
+    }
+
+    std::vector<int> results(_partition_fields.size());
+    for (size_t i = 0; i < _partition_fields.size(); ++i) {
+        RETURN_IF_ERROR(_partition_fields[i].expr_ctx->execute(block, 
&results[i]));
+    }
+
+    initialize_shuffle_hashes(partitions, rows, _hash_method);
+    auto* __restrict hash_values = partitions.data();
+    for (size_t i = 0; i < _partition_fields.size(); ++i) {
+        if (_partition_fields[i].transformer == nullptr) {
+            return Status::InternalError("Merge partitioning transform is not 
initialized");
+        }
+        ColumnWithTypeAndName transformed =
+                _partition_fields[i].transformer->apply(*block, results[i]);
+        const auto& [column, is_const] = unpack_if_const(transformed.column);
+        if (is_const) {
+            continue;

Review Comment:
   **MEDIUM: Const columns skipped in hash computation**
   
   When a partition column is const (same value for all rows in a block), its 
hash contribution is skipped entirely (`continue`). If different blocks have 
different const values for the same partition column, rows that should go to 
different partitions will hash identically, causing incorrect data routing.
   
   For example: block A has `partition_col = 'a'` (const), block B has 
`partition_col = 'b'` (const). Both blocks will produce the same hash values 
since the partition column's contribution is skipped.
   
   Consider materializing const columns before hashing, or computing the const 
value's hash contribution once and applying it to all rows.



##########
be/src/format/transformer/merge_partitioner.cpp:
##########
@@ -0,0 +1,365 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "format/transformer/merge_partitioner.h"
+
+#include <algorithm>
+#include <cstdint>
+
+#include "common/cast_set.h"
+#include "common/config.h"
+#include "common/logging.h"
+#include "common/status.h"
+#include "core/block/block.h"
+#include "core/column/column_const.h"
+#include "core/column/column_nullable.h"
+#include "core/column/column_vector.h"
+#include "format/transformer/iceberg_partition_function.h"
+
+namespace doris {
+#include "common/compile_check_begin.h"
+
+namespace {
+constexpr int8_t kInsertOperation = 1;
+constexpr int8_t kDeleteOperation = 2;
+constexpr int8_t kUpdateOperation = 3;
+constexpr int8_t kUpdateInsertOperation = 4;
+constexpr int8_t kUpdateDeleteOperation = 5;
+
+int64_t scale_threshold_by_task(int64_t value, int task_num) {
+    if (task_num <= 0) {
+        return value;
+    }
+    int64_t scaled = value / task_num;
+    return scaled == 0 ? value : scaled;
+}
+} // namespace
+
+MergePartitioner::MergePartitioner(size_t partition_count, const 
TMergePartitionInfo& merge_info,
+                                   bool use_new_shuffle_hash_method)
+        : PartitionerBase(static_cast<HashValType>(partition_count)),
+          _merge_info(merge_info),
+          _use_new_shuffle_hash_method(use_new_shuffle_hash_method),
+          _insert_random(merge_info.insert_random) {}
+
+Status MergePartitioner::init(const std::vector<TExpr>& /*texprs*/) {
+    VExprContextSPtr op_ctx;
+    RETURN_IF_ERROR(VExpr::create_expr_tree(_merge_info.operation_expr, 
op_ctx));
+    _operation_expr_ctxs.emplace_back(std::move(op_ctx));
+
+    std::vector<TExpr> insert_exprs;
+    std::vector<TIcebergPartitionField> insert_fields;
+    if (_merge_info.__isset.insert_partition_exprs) {
+        insert_exprs = _merge_info.insert_partition_exprs;
+    }
+    if (_merge_info.__isset.insert_partition_fields) {
+        insert_fields = _merge_info.insert_partition_fields;
+    }
+    if (!insert_exprs.empty() || !insert_fields.empty()) {
+        _insert_partition_function = 
std::make_unique<IcebergInsertPartitionFunction>(
+                _partition_count, _hash_method(), std::move(insert_exprs),
+                std::move(insert_fields));
+        RETURN_IF_ERROR(_insert_partition_function->init({}));
+    }
+
+    if (_merge_info.__isset.delete_partition_exprs && 
!_merge_info.delete_partition_exprs.empty()) {
+        _delete_partition_function = 
std::make_unique<IcebergDeletePartitionFunction>(
+                _partition_count, _hash_method(), 
_merge_info.delete_partition_exprs);
+        RETURN_IF_ERROR(_delete_partition_function->init({}));
+    }
+    return Status::OK();
+}
+
+Status MergePartitioner::prepare(RuntimeState* state, const RowDescriptor& 
row_desc) {
+    RETURN_IF_ERROR(VExpr::prepare(_operation_expr_ctxs, state, row_desc));
+    if (_insert_partition_function != nullptr) {
+        RETURN_IF_ERROR(_insert_partition_function->prepare(state, row_desc));
+    }
+    if (_delete_partition_function != nullptr) {
+        RETURN_IF_ERROR(_delete_partition_function->prepare(state, row_desc));
+    }
+    return Status::OK();
+}
+
+Status MergePartitioner::open(RuntimeState* state) {
+    RETURN_IF_ERROR(VExpr::open(_operation_expr_ctxs, state));
+    if (_insert_partition_function != nullptr) {
+        RETURN_IF_ERROR(_insert_partition_function->open(state));
+        if (auto* insert_function =
+                    
dynamic_cast<IcebergInsertPartitionFunction*>(_insert_partition_function.get());
+            insert_function != nullptr && 
insert_function->fallback_to_random()) {
+            _insert_random = true;
+        }
+    }
+    if (_delete_partition_function != nullptr) {
+        RETURN_IF_ERROR(_delete_partition_function->open(state));
+    }
+    _init_insert_scaling(state);
+    return Status::OK();
+}
+
+Status MergePartitioner::close(RuntimeState* /*state*/) {
+    return Status::OK();
+}
+
+Status MergePartitioner::do_partitioning(RuntimeState* state, Block* block) 
const {
+    const size_t rows = block->rows();
+    if (rows == 0) {
+        _channel_ids.clear();
+        return Status::OK();
+    }
+
+    const size_t column_to_keep = block->columns();
+    if (_operation_expr_ctxs.empty()) {
+        return Status::InternalError("Merge partitioning missing operation 
expression");
+    }
+
+    int op_idx = -1;
+    RETURN_IF_ERROR(_operation_expr_ctxs[0]->execute(block, &op_idx));
+    if (op_idx < 0 || op_idx >= block->columns()) {
+        return Status::InternalError("Merge partitioning missing operation 
column");
+    }
+    if (op_idx >= cast_set<int>(column_to_keep)) {
+        return Status::InternalError("Merge partitioning requires operation 
column in input block");
+    }
+
+    const auto& op_column = block->get_by_position(op_idx).column;
+    const auto* op_data = remove_nullable(op_column).get();
+    std::vector<int8_t> ops(rows);
+    bool has_insert = false;
+    bool has_delete = false;
+    bool has_update = false;
+    for (size_t i = 0; i < rows; ++i) {
+        int8_t op = static_cast<int8_t>(op_data->get_int(i));
+        ops[i] = op;
+        if (_is_insert_op(op)) {
+            has_insert = true;
+        }
+        if (_is_delete_op(op)) {
+            has_delete = true;
+        }
+        if (op == kUpdateOperation) {
+            has_update = true;
+        }
+    }
+
+    if (has_insert && !_insert_random && _insert_partition_function == 
nullptr) {
+        return Status::InternalError("Merge partitioning insert exprs are 
empty");
+    }
+    if (has_delete && _delete_partition_function == nullptr) {
+        return Status::InternalError("Merge partitioning delete exprs are 
empty");
+    }
+
+    std::vector<uint32_t> insert_hashes;
+    std::vector<uint32_t> delete_hashes;
+    const size_t insert_partition_count =
+            _enable_insert_rebalance ? _insert_partition_count : 
_partition_count;
+    if (has_insert && !_insert_random) {
+        RETURN_IF_ERROR(_insert_partition_function->get_partitions(
+                state, block, insert_partition_count, insert_hashes));
+    }
+    if (has_delete) {
+        RETURN_IF_ERROR(_delete_partition_function->get_partitions(state, 
block, _partition_count,
+                                                                   
delete_hashes));
+    }
+    if (has_insert) {
+        if (_insert_random) {
+            if (_non_partition_scaling_threshold > 0) {
+                _insert_data_processed += static_cast<int64_t>(block->bytes());
+                if (_insert_writer_count < static_cast<int>(_partition_count) 
&&
+                    _insert_data_processed >=
+                            _insert_writer_count * 
_non_partition_scaling_threshold) {
+                    _insert_writer_count++;
+                }
+            } else {
+                _insert_writer_count = static_cast<int>(_partition_count);
+            }
+        } else if (_enable_insert_rebalance) {
+            _apply_insert_rebalance(ops, insert_hashes, block->bytes());
+        }
+    }
+
+    Block::erase_useless_column(block, column_to_keep);
+
+    _channel_ids.resize(rows);
+    for (size_t i = 0; i < rows; ++i) {
+        const int8_t op = ops[i];
+        if (op == kUpdateOperation) {
+            _channel_ids[i] = delete_hashes[i];
+            continue;
+        }
+        if (_is_insert_op(op)) {
+            _channel_ids[i] = _insert_random ? _next_rr_channel() : 
insert_hashes[i];
+        } else if (_is_delete_op(op)) {
+            _channel_ids[i] = delete_hashes[i];
+        } else {
+            return Status::InternalError("Unknown Iceberg merge operation {}", 
op);
+        }
+    }
+
+    if (has_update) {
+        for (size_t col_idx = 0; col_idx < block->columns(); ++col_idx) {
+            block->replace_by_position_if_const(col_idx);
+        }
+
+        MutableColumns mutable_columns = block->mutate_columns();
+        MutableColumnPtr& op_mut = mutable_columns[op_idx];
+        ColumnInt8* op_values_col = nullptr;
+        if (auto* nullable_col = 
check_and_get_column<ColumnNullable>(op_mut.get())) {
+            op_values_col =
+                    
check_and_get_column<ColumnInt8>(nullable_col->get_nested_column_ptr().get());
+        } else {
+            op_values_col = check_and_get_column<ColumnInt8>(op_mut.get());
+        }
+        if (op_values_col == nullptr) {
+            block->set_columns(std::move(mutable_columns));
+            return Status::InternalError("Merge operation column must be 
tinyint");
+        }
+        auto& op_values = op_values_col->get_data();
+        const size_t original_rows = rows;
+        for (size_t row = 0; row < original_rows; ++row) {
+            if (ops[row] != kUpdateOperation) {
+                continue;
+            }
+            op_values[row] = kUpdateDeleteOperation;
+            for (size_t col_idx = 0; col_idx < mutable_columns.size(); 
++col_idx) {
+                
mutable_columns[col_idx]->insert_from(*mutable_columns[col_idx], row);
+            }
+            const size_t new_row_idx = op_values.size() - 1;
+            op_values[new_row_idx] = kUpdateInsertOperation;
+            const uint32_t insert_channel =
+                    _insert_random ? _next_rr_channel() : insert_hashes[row];
+            _channel_ids.push_back(insert_channel);

Review Comment:
   **CRITICAL: Out-of-bounds vector access when `_insert_random` is true**
   
   `insert_hashes` is only populated when `has_insert && !_insert_random` (line 
171). However, when `op == kUpdateOperation`, this line accesses 
`insert_hashes[row]` unconditionally via the ternary's false branch. If 
`_insert_random` is true, `insert_hashes` is empty, causing undefined behavior 
(out-of-bounds read).
   
   While the ternary condition `_insert_random ? _next_rr_channel() : 
insert_hashes[row]` appears to short-circuit, C++ does NOT short-circuit 
ternary operands — both branches may be evaluated depending on compiler 
optimization. Actually, the ternary DOES short-circuit (only the selected 
branch is evaluated), but this relies on the compiler not speculatively 
evaluating the false branch. The real concern is: if `_insert_random` can ever 
be false for some rows but true for the function as a whole, or if future 
refactoring changes the branching logic.
   
   **Correction**: On re-analysis, the C++ ternary operator does guarantee only 
one branch is evaluated. The condition `_insert_random` is a member variable 
that doesn't change during the loop. So if `_insert_random` is true, 
`insert_hashes[row]` is never evaluated. This is technically safe but fragile — 
consider guarding with a `DCHECK(!_insert_random || insert_hashes.empty())` or 
populating `insert_hashes` unconditionally.



##########
fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergTransaction.java:
##########
@@ -294,7 +521,17 @@ public void rollback() {
     }
 
     public long getUpdateCnt() {
-        return 
commitDataList.stream().mapToLong(TIcebergCommitData::getRowCount).sum();
+        long dataRows = 0;
+        long deleteRows = 0;
+        for (TIcebergCommitData commitData : commitDataList) {
+            if (commitData.isSetFileContent()
+                    && commitData.getFileContent() == 
TFileContent.POSITION_DELETES) {
+                deleteRows += commitData.getRowCount();
+            } else {
+                dataRows += commitData.getRowCount();
+            }
+        }
+        return dataRows > 0 ? dataRows : deleteRows;
     }

Review Comment:
   **MEDIUM: `getUpdateCnt()` returns misleading row counts**
   
   For a MERGE/UPDATE that produces both data files (inserts) and delete files:
   ```java
   return dataRows > 0 ? dataRows : deleteRows;
   ```
   
   This reports only insert rows for mixed operations. For an UPDATE of 100 
rows (which produces 100 position deletes + 100 new inserts), the user sees 
"100 rows affected" but the 100 deletes are invisible. For a pure DELETE 
operation, only delete rows are reported. The semantics should be documented, 
or consider returning `Math.max(dataRows, deleteRows)` or a separate count for 
each operation type.



##########
fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergTransaction.java:
##########
@@ -226,6 +249,210 @@ private void updateManifestAfterRewrite() {

Review Comment:
   **HIGH: NullPointerException on empty Iceberg table**
   
   `table.currentSnapshot()` returns `null` for a newly created table with no 
data. This line will throw NPE:
   ```java
   this.startingSnapshotId = table.currentSnapshot().snapshotId();
   ```
   
   Note that `getSnapshotIdIfPresent()` (line 587) already handles this case 
correctly. Consider using:
   ```java
   this.startingSnapshotId = getSnapshotIdIfPresent(table);
   ```



##########
be/src/exec/sink/viceberg_delete_sink.cpp:
##########
@@ -0,0 +1,501 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "exec/sink/viceberg_delete_sink.h"
+
+#include <fmt/format.h>
+
+#include "common/logging.h"
+#include "core/block/column_with_type_and_name.h"
+#include "core/column/column_nullable.h"
+#include "core/column/column_string.h"
+#include "core/column/column_struct.h"
+#include "core/column/column_vector.h"
+#include "core/data_type/data_type_factory.hpp"
+#include "core/data_type/data_type_nullable.h"
+#include "core/data_type/data_type_number.h"
+#include "core/data_type/data_type_string.h"
+#include "core/data_type/data_type_struct.h"
+#include "exprs/vexpr.h"
+#include "format/transformer/vfile_format_transformer.h"
+#include "runtime/runtime_state.h"
+#include "util/string_util.h"
+#include "util/uid_util.h"
+
+namespace doris {
+
+VIcebergDeleteSink::VIcebergDeleteSink(const TDataSink& t_sink,
+                                       const VExprContextSPtrs& output_exprs,
+                                       std::shared_ptr<Dependency> dep,
+                                       std::shared_ptr<Dependency> fin_dep)
+        : AsyncResultWriter(output_exprs, dep, fin_dep), _t_sink(t_sink) {
+    DCHECK(_t_sink.__isset.iceberg_delete_sink);
+}
+
+Status VIcebergDeleteSink::init_properties(ObjectPool* pool) {
+    const auto& delete_sink = _t_sink.iceberg_delete_sink;
+
+    _delete_type = delete_sink.delete_type;
+    if (_delete_type != TFileContent::POSITION_DELETES) {
+        return Status::NotSupported("Iceberg delete only supports position 
delete files");
+    }
+
+    // Get file format settings
+    if (delete_sink.__isset.file_format) {
+        _file_format_type = delete_sink.file_format;
+    }
+
+    if (delete_sink.__isset.compress_type) {
+        _compress_type = delete_sink.compress_type;
+    }
+
+    // Get output path and table location
+    if (delete_sink.__isset.output_path) {
+        _output_path = delete_sink.output_path;
+    }
+
+    if (delete_sink.__isset.table_location) {
+        _table_location = delete_sink.table_location;
+    }
+
+    // Get Hadoop configuration
+    if (delete_sink.__isset.hadoop_config) {
+        _hadoop_conf.insert(delete_sink.hadoop_config.begin(), 
delete_sink.hadoop_config.end());
+    }
+
+    if (delete_sink.__isset.file_type) {
+        _file_type = delete_sink.file_type;
+    }
+
+    if (delete_sink.__isset.broker_addresses) {
+        _broker_addresses.assign(delete_sink.broker_addresses.begin(),
+                                 delete_sink.broker_addresses.end());
+    }
+
+    // Get partition information
+    if (delete_sink.__isset.partition_spec_id) {
+        _partition_spec_id = delete_sink.partition_spec_id;
+    }
+
+    if (delete_sink.__isset.partition_data_json) {
+        _partition_data_json = delete_sink.partition_data_json;
+    }
+
+    return Status::OK();
+}
+
+Status VIcebergDeleteSink::open(RuntimeState* state, RuntimeProfile* profile) {
+    _state = state;
+
+    // Initialize counters
+    _written_rows_counter = ADD_COUNTER(profile, "RowsWritten", TUnit::UNIT);
+    _send_data_timer = ADD_TIMER(profile, "SendDataTime");
+    _write_delete_files_timer = ADD_TIMER(profile, "WriteDeleteFilesTime");
+    _delete_file_count_counter = ADD_COUNTER(profile, "DeleteFileCount", 
TUnit::UNIT);
+    _open_timer = ADD_TIMER(profile, "OpenTime");
+    _close_timer = ADD_TIMER(profile, "CloseTime");
+
+    SCOPED_TIMER(_open_timer);
+
+    RETURN_IF_ERROR(_init_position_delete_output_exprs());
+
+    LOG(INFO) << fmt::format("VIcebergDeleteSink opened: delete_type={}, 
output_path={}",
+                             to_string(_delete_type), _output_path);
+
+    return Status::OK();
+}
+
+Status VIcebergDeleteSink::write(RuntimeState* state, Block& block) {
+    SCOPED_TIMER(_send_data_timer);
+
+    if (block.rows() == 0) {
+        return Status::OK();
+    }
+
+    _row_count += block.rows();
+
+    if (_delete_type != TFileContent::POSITION_DELETES) {
+        return Status::NotSupported("Iceberg delete only supports position 
delete files");
+    }
+
+    // Extract $row_id column and group by file_path
+    RETURN_IF_ERROR(_collect_position_deletes(block, _file_deletions));
+
+    if (_written_rows_counter) {
+        COUNTER_UPDATE(_written_rows_counter, block.rows());
+    }
+
+    return Status::OK();
+}
+
+Status VIcebergDeleteSink::close(Status close_status) {
+    SCOPED_TIMER(_close_timer);
+
+    if (!close_status.ok()) {
+        LOG(WARNING) << fmt::format("VIcebergDeleteSink close with error: {}",
+                                    close_status.to_string());
+        return close_status;
+    }
+
+    if (_delete_type == TFileContent::POSITION_DELETES && 
!_file_deletions.empty()) {
+        SCOPED_TIMER(_write_delete_files_timer);
+        RETURN_IF_ERROR(_write_position_delete_files(_file_deletions));
+    }
+
+    // Update counters
+    if (_delete_file_count_counter) {
+        COUNTER_UPDATE(_delete_file_count_counter, _delete_file_count);
+    }
+
+    LOG(INFO) << fmt::format("VIcebergDeleteSink closed: rows={}, 
delete_files={}", _row_count,
+                             _delete_file_count);
+
+    if (_state != nullptr) {
+        for (const auto& commit_data : _commit_data_list) {
+            _state->add_iceberg_commit_datas(commit_data);
+        }
+    }
+
+    return Status::OK();
+}
+
+int VIcebergDeleteSink::_get_row_id_column_index(const Block& block) {
+    // Find __DORIS_ICEBERG_ROWID_COL__ column in block
+    for (size_t i = 0; i < block.columns(); ++i) {
+        const auto& col_name = block.get_by_position(i).name;
+        if (col_name == doris::BeConsts::ICEBERG_ROWID_COL) {
+            return static_cast<int>(i);
+        }
+    }
+    return -1;
+}
+
+Status VIcebergDeleteSink::_collect_position_deletes(
+        const Block& block, std::map<std::string, IcebergFileDeletion>& 
file_deletions) {
+    // Find row id column
+    int row_id_col_idx = _get_row_id_column_index(block);
+    if (row_id_col_idx < 0) {
+        return Status::InternalError(
+                "__DORIS_ICEBERG_ROWID_COL__ column not found in block for 
position delete");
+    }
+
+    const auto& row_id_col = block.get_by_position(row_id_col_idx);
+    const IColumn* row_id_data = row_id_col.column.get();
+    const IDataType* row_id_type = row_id_col.type.get();
+    const auto* nullable_col = 
check_and_get_column<ColumnNullable>(row_id_data);
+    if (nullable_col != nullptr) {
+        row_id_data = nullable_col->get_nested_column_ptr().get();
+    }
+    const auto* nullable_type = 
check_and_get_data_type<DataTypeNullable>(row_id_type);
+    if (nullable_type != nullptr) {
+        row_id_type = nullable_type->get_nested_type().get();
+    }
+    const auto* struct_col = check_and_get_column<ColumnStruct>(row_id_data);
+    const auto* struct_type = 
check_and_get_data_type<DataTypeStruct>(row_id_type);
+    if (!struct_col || !struct_type) {
+        return Status::InternalError("__DORIS_ICEBERG_ROWID_COL__ column is 
not a struct column");
+    }
+
+    // __DORIS_ICEBERG_ROWID_COL__ struct:
+    // (file_path: STRING, row_position: BIGINT, partition_spec_id: INT, 
partition_data: STRING)
+    size_t field_count = struct_col->tuple_size();
+    if (field_count < 2) {
+        return Status::InternalError(
+                "__DORIS_ICEBERG_ROWID_COL__ struct must have at least 2 
fields "
+                "(file_path, row_position)");
+    }
+
+    auto normalize = [](const std::string& name) { return 
doris::to_lower(name); };
+
+    int file_path_idx = -1;
+    int row_position_idx = -1;
+    int spec_id_idx = -1;
+    int partition_data_idx = -1;
+    const auto& field_names = struct_type->get_element_names();
+    for (size_t i = 0; i < field_names.size(); ++i) {
+        std::string name = normalize(field_names[i]);
+        if (file_path_idx < 0 && name == "file_path") {
+            file_path_idx = static_cast<int>(i);
+        } else if (row_position_idx < 0 && name == "row_position") {
+            row_position_idx = static_cast<int>(i);
+        } else if (spec_id_idx < 0 && name == "partition_spec_id") {
+            spec_id_idx = static_cast<int>(i);
+        } else if (partition_data_idx < 0 && name == "partition_data") {
+            partition_data_idx = static_cast<int>(i);
+        }
+    }
+
+    if (file_path_idx < 0 || row_position_idx < 0) {
+        return Status::InternalError(
+                "__DORIS_ICEBERG_ROWID_COL__ must contain standard fields 
file_path and "
+                "row_position");
+    }
+    if (field_count >= 3 && spec_id_idx < 0) {
+        return Status::InternalError(
+                "__DORIS_ICEBERG_ROWID_COL__ must use standard field name 
partition_spec_id");
+    }
+    if (field_count >= 4 && partition_data_idx < 0) {
+        return Status::InternalError(
+                "__DORIS_ICEBERG_ROWID_COL__ must use standard field name 
partition_data");
+    }
+
+    const auto* file_path_col = check_and_get_column<ColumnString>(
+            remove_nullable(struct_col->get_column_ptr(file_path_idx)).get());
+    const auto* row_position_col = 
check_and_get_column<ColumnVector<TYPE_BIGINT>>(
+            
remove_nullable(struct_col->get_column_ptr(row_position_idx)).get());
+
+    if (!file_path_col || !row_position_col) {
+        return Status::InternalError(
+                "__DORIS_ICEBERG_ROWID_COL__ struct fields have incorrect 
types");
+    }
+
+    const ColumnVector<TYPE_INT>* spec_id_col = nullptr;
+    const ColumnString* partition_data_col = nullptr;
+    if (spec_id_idx >= 0 && spec_id_idx < static_cast<int>(field_count)) {
+        spec_id_col = check_and_get_column<ColumnVector<TYPE_INT>>(
+                
remove_nullable(struct_col->get_column_ptr(spec_id_idx)).get());
+        if (!spec_id_col) {
+            return Status::InternalError(
+                    "__DORIS_ICEBERG_ROWID_COL__ partition_spec_id has 
incorrect type");
+        }
+    }
+    if (partition_data_idx >= 0 && partition_data_idx < 
static_cast<int>(field_count)) {
+        partition_data_col = check_and_get_column<ColumnString>(
+                
remove_nullable(struct_col->get_column_ptr(partition_data_idx)).get());
+        if (!partition_data_col) {
+            return Status::InternalError(
+                    "__DORIS_ICEBERG_ROWID_COL__ partition_data has incorrect 
type");
+        }
+    }
+
+    // Group by file_path using roaring bitmap
+    for (size_t i = 0; i < block.rows(); ++i) {
+        std::string file_path = file_path_col->get_data_at(i).to_string();
+        int64_t row_position = row_position_col->get_element(i);
+        if (row_position < 0) {
+            return Status::InternalError("Invalid row_position {} in row_id 
column", row_position);
+        }
+
+        int32_t partition_spec_id = _partition_spec_id;
+        std::string partition_data_json = _partition_data_json;
+        if (spec_id_col != nullptr) {
+            partition_spec_id = spec_id_col->get_element(i);
+        }
+        if (partition_data_col != nullptr) {
+            partition_data_json = 
partition_data_col->get_data_at(i).to_string();
+        }
+
+        auto [iter, inserted] = file_deletions.emplace(
+                file_path, IcebergFileDeletion(partition_spec_id, 
partition_data_json));
+        if (!inserted) {
+            if (iter->second.partition_spec_id != partition_spec_id ||
+                iter->second.partition_data_json != partition_data_json) {
+                LOG(WARNING) << fmt::format(
+                        "Mismatched partition info for file {}, existing 
spec_id={}, data={}, "
+                        "new spec_id={}, data={}",
+                        file_path, iter->second.partition_spec_id, 
iter->second.partition_data_json,
+                        partition_spec_id, partition_data_json);
+            }
+        }
+        iter->second.rows_to_delete.add(static_cast<uint64_t>(row_position));
+    }
+
+    return Status::OK();
+}
+
+Status VIcebergDeleteSink::_write_position_delete_files(
+        const std::map<std::string, IcebergFileDeletion>& file_deletions) {
+    constexpr size_t kBatchSize = 4096;
+    for (const auto& [data_file_path, deletion] : file_deletions) {
+        if (deletion.rows_to_delete.isEmpty()) {
+            continue;
+        }
+        // Generate unique delete file path
+        std::string delete_file_path = 
_generate_delete_file_path(data_file_path);
+
+        // Create delete file writer
+        auto writer = VIcebergDeleteFileWriterFactory::create_writer(
+                TFileContent::POSITION_DELETES, delete_file_path, 
_file_format_type,
+                _compress_type);
+
+        // Build column names for position delete
+        std::vector<std::string> column_names = {"file_path", "pos"};
+
+        if (_position_delete_output_expr_ctxs.empty()) {
+            RETURN_IF_ERROR(_init_position_delete_output_exprs());
+        }
+
+        // Open writer
+        RETURN_IF_ERROR(writer->open(_state, _state->runtime_profile(),
+                                     _position_delete_output_expr_ctxs, 
column_names, _hadoop_conf,
+                                     _file_type, _broker_addresses));
+
+        // Build block with (file_path, pos) columns
+        std::vector<int64_t> positions;
+        positions.reserve(kBatchSize);
+        for (auto it = deletion.rows_to_delete.begin(); it != 
deletion.rows_to_delete.end(); ++it) {
+            positions.push_back(static_cast<int64_t>(*it));
+            if (positions.size() >= kBatchSize) {
+                Block delete_block;
+                RETURN_IF_ERROR(
+                        _build_position_delete_block(data_file_path, 
positions, delete_block));
+                RETURN_IF_ERROR(writer->write(delete_block));
+                positions.clear();
+            }
+        }
+        if (!positions.empty()) {
+            Block delete_block;
+            RETURN_IF_ERROR(_build_position_delete_block(data_file_path, 
positions, delete_block));
+            RETURN_IF_ERROR(writer->write(delete_block));
+        }
+
+        // Set partition info on writer before close
+        writer->set_partition_info(deletion.partition_spec_id, 
deletion.partition_data_json);
+
+        // Close writer and collect commit data
+        TIcebergCommitData commit_data;
+        RETURN_IF_ERROR(writer->close(commit_data));
+
+        // Set referenced data file path
+        commit_data.__set_referenced_data_file_path(data_file_path);
+
+        _commit_data_list.push_back(commit_data);
+        _delete_file_count++;
+
+        VLOG(1) << fmt::format("Written position delete file: path={}, 
rows={}, referenced_file={}",
+                               delete_file_path, commit_data.row_count, 
data_file_path);
+    }
+
+    return Status::OK();
+}
+
+Status VIcebergDeleteSink::_init_position_delete_output_exprs() {
+    if (!_position_delete_output_expr_ctxs.empty()) {
+        return Status::OK();
+    }
+
+    std::vector<TExpr> texprs;
+    texprs.reserve(2);
+
+    std::string empty_string;
+    TExprNode file_path_node =
+            create_texpr_node_from(&empty_string, PrimitiveType::TYPE_STRING, 
0, 0);
+    file_path_node.__set_num_children(0);
+    file_path_node.__set_output_scale(0);
+    file_path_node.__set_is_nullable(false);
+    TExpr file_path_expr;
+    file_path_expr.nodes.emplace_back(std::move(file_path_node));
+    texprs.emplace_back(std::move(file_path_expr));
+
+    int64_t zero = 0;
+    TExprNode pos_node = create_texpr_node_from(&zero, 
PrimitiveType::TYPE_BIGINT, 0, 0);
+    pos_node.__set_num_children(0);
+    pos_node.__set_output_scale(0);
+    pos_node.__set_is_nullable(false);
+    TExpr pos_expr;
+    pos_expr.nodes.emplace_back(std::move(pos_node));
+    texprs.emplace_back(std::move(pos_expr));
+
+    RETURN_IF_ERROR(VExpr::create_expr_trees(texprs, 
_position_delete_output_expr_ctxs));
+    return Status::OK();
+}
+
+Status VIcebergDeleteSink::_build_position_delete_block(const std::string& 
file_path,
+                                                        const 
std::vector<int64_t>& positions,
+                                                        Block& output_block) {
+    // Create file_path column (repeated for each position)
+    auto file_path_col = ColumnString::create();
+    for (size_t i = 0; i < positions.size(); ++i) {
+        file_path_col->insert_data(file_path.data(), file_path.size());
+    }
+
+    // Create pos column
+    auto pos_col = ColumnVector<TYPE_BIGINT>::create();
+    pos_col->get_data().assign(positions.begin(), positions.end());
+
+    // Build block
+    output_block.insert(ColumnWithTypeAndName(std::move(file_path_col),
+                                              
std::make_shared<DataTypeString>(), "file_path"));
+    output_block.insert(
+            ColumnWithTypeAndName(std::move(pos_col), 
std::make_shared<DataTypeInt64>(), "pos"));
+
+    return Status::OK();
+}
+
+std::string VIcebergDeleteSink::_get_file_extension() const {
+    std::string compress_name;
+    switch (_compress_type) {
+    case TFileCompressType::SNAPPYBLOCK: {
+        compress_name = ".snappy";
+        break;
+    }
+    case TFileCompressType::ZLIB: {
+        compress_name = ".zlib";
+        break;
+    }
+    case TFileCompressType::ZSTD: {
+        compress_name = ".zstd";
+        break;
+    }
+    default: {
+        compress_name = "";
+        break;
+    }
+    }
+
+    std::string file_format_name;
+    switch (_file_format_type) {
+    case TFileFormatType::FORMAT_PARQUET: {
+        file_format_name = ".parquet";
+        break;
+    }
+    case TFileFormatType::FORMAT_ORC: {
+        file_format_name = ".orc";
+        break;
+    }
+    default: {
+        file_format_name = "";
+        break;
+    }
+    }
+    return fmt::format("{}{}", compress_name, file_format_name);
+}
+
+std::string VIcebergDeleteSink::_generate_delete_file_path(
+        const std::string& referenced_data_file) {

Review Comment:
   **MEDIUM: UUID truncated to 8 characters — collision risk**
   
   The generated delete file name uses `uuid.substr(0, 8)` which is only 8 hex 
characters (32 bits of entropy). Combined with `std::hash % 10000000`, there's 
a non-trivial collision risk in high-throughput concurrent environments where 
many delete files are written simultaneously.
   
   Consider using the full UUID or at least a significantly longer substring 
(e.g., 16+ characters).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to