morningman commented on code in PR #60482:
URL: https://github.com/apache/doris/pull/60482#discussion_r2977868353


##########
fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/IcebergMergeCommand.java:
##########
@@ -0,0 +1,515 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.trees.plans.commands;
+
+import org.apache.doris.analysis.StmtType;
+import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.TableIf;
+import org.apache.doris.common.util.Util;
+import org.apache.doris.datasource.iceberg.IcebergConflictDetectionFilterUtils;
+import org.apache.doris.datasource.iceberg.IcebergExternalDatabase;
+import org.apache.doris.datasource.iceberg.IcebergExternalTable;
+import org.apache.doris.datasource.iceberg.IcebergMergeOperation;
+import org.apache.doris.datasource.iceberg.IcebergNereidsUtils;
+import org.apache.doris.datasource.iceberg.IcebergRowId;
+import org.apache.doris.nereids.NereidsPlanner;
+import org.apache.doris.nereids.analyzer.UnboundAlias;
+import org.apache.doris.nereids.analyzer.UnboundRelation;
+import org.apache.doris.nereids.analyzer.UnboundSlot;
+import org.apache.doris.nereids.analyzer.UnboundStar;
+import org.apache.doris.nereids.exceptions.AnalysisException;
+import org.apache.doris.nereids.glue.LogicalPlanAdapter;
+import org.apache.doris.nereids.parser.LogicalPlanBuilderAssistant;
+import org.apache.doris.nereids.parser.NereidsParser;
+import org.apache.doris.nereids.rules.exploration.join.JoinReorderContext;
+import org.apache.doris.nereids.trees.expressions.Alias;
+import org.apache.doris.nereids.trees.expressions.EqualTo;
+import org.apache.doris.nereids.trees.expressions.Expression;
+import org.apache.doris.nereids.trees.expressions.IsNull;
+import org.apache.doris.nereids.trees.expressions.NamedExpression;
+import org.apache.doris.nereids.trees.expressions.Not;
+import org.apache.doris.nereids.trees.expressions.Slot;
+import org.apache.doris.nereids.trees.expressions.StatementScopeIdGenerator;
+import org.apache.doris.nereids.trees.expressions.functions.scalar.If;
+import org.apache.doris.nereids.trees.expressions.literal.IntegerLiteral;
+import org.apache.doris.nereids.trees.expressions.literal.NullLiteral;
+import org.apache.doris.nereids.trees.expressions.literal.TinyIntLiteral;
+import org.apache.doris.nereids.trees.plans.Explainable;
+import org.apache.doris.nereids.trees.plans.JoinType;
+import org.apache.doris.nereids.trees.plans.Plan;
+import org.apache.doris.nereids.trees.plans.PlanType;
+import 
org.apache.doris.nereids.trees.plans.commands.delete.DeleteCommandContext;
+import 
org.apache.doris.nereids.trees.plans.commands.insert.IcebergMergeExecutor;
+import org.apache.doris.nereids.trees.plans.commands.merge.MergeMatchedClause;
+import 
org.apache.doris.nereids.trees.plans.commands.merge.MergeNotMatchedClause;
+import org.apache.doris.nereids.trees.plans.logical.LogicalFilter;
+import org.apache.doris.nereids.trees.plans.logical.LogicalIcebergMergeSink;
+import org.apache.doris.nereids.trees.plans.logical.LogicalJoin;
+import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
+import org.apache.doris.nereids.trees.plans.logical.LogicalProject;
+import org.apache.doris.nereids.trees.plans.logical.LogicalSubQueryAlias;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalEmptyRelation;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalIcebergMergeSink;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalSink;
+import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
+import org.apache.doris.nereids.types.DataType;
+import org.apache.doris.nereids.types.IntegerType;
+import org.apache.doris.nereids.util.RelationUtil;
+import org.apache.doris.nereids.util.Utils;
+import org.apache.doris.planner.DataSink;
+import org.apache.doris.planner.PlanFragment;
+import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.qe.QueryState;
+import org.apache.doris.qe.StmtExecutor;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+
+/**
+ * MERGE INTO command for Iceberg tables.
+ */
+public class IcebergMergeCommand extends Command implements ForwardWithSync, 
Explainable {
+    private static final String BRANCH_LABEL = 
"__DORIS_ICEBERG_MERGE_INTO_BRANCH_LABEL__";
+
+    private final List<String> targetNameParts;
+    private final Optional<String> targetAlias;
+    private final List<String> targetNameInPlan;
+    private final Optional<LogicalPlan> cte;
+    private final LogicalPlan source;
+    private final Expression onClause;
+    private final List<MergeMatchedClause> matchedClauses;
+    private final List<MergeNotMatchedClause> notMatchedClauses;
+    private final DeleteCommandContext deleteCtx;
+
+    /**
+     * constructor.
+     */
+    public IcebergMergeCommand(List<String> targetNameParts, Optional<String> 
targetAlias,
+            Optional<LogicalPlan> cte, LogicalPlan source, Expression onClause,
+            List<MergeMatchedClause> matchedClauses, 
List<MergeNotMatchedClause> notMatchedClauses) {
+        super(PlanType.MERGE_INTO_COMMAND);
+        this.targetNameParts = Utils.copyRequiredList(targetNameParts);
+        this.targetAlias = Objects.requireNonNull(targetAlias, "targetAlias 
should not be null");
+        if (targetAlias.isPresent()) {
+            this.targetNameInPlan = ImmutableList.of(targetAlias.get());
+        } else {
+            this.targetNameInPlan = ImmutableList.copyOf(targetNameParts);
+        }
+        this.cte = Objects.requireNonNull(cte, "cte should not be null");
+        this.source = Objects.requireNonNull(source, "source should not be 
null");
+        this.onClause = Objects.requireNonNull(onClause, "onClause should not 
be null");
+        this.matchedClauses = Utils.fastToImmutableList(
+                Objects.requireNonNull(matchedClauses, "matchedClauses should 
not be null"));
+        this.notMatchedClauses = Utils.fastToImmutableList(
+                Objects.requireNonNull(notMatchedClauses, "notMatchedClauses 
should not be null"));
+        this.deleteCtx = new DeleteCommandContext();
+    }
+
+    @Override
+    public void run(ConnectContext ctx, StmtExecutor executor) throws 
Exception {
+        TableIf table = getTargetTable(ctx);
+        if (!(table instanceof IcebergExternalTable)) {
+            throw new AnalysisException("MERGE INTO can only be used on 
Iceberg tables. "
+                    + "Table " + Util.getTempTableDisplayName(table.getName()) 
+ " is not an Iceberg table.");
+        }
+        IcebergExternalTable icebergTable = (IcebergExternalTable) table;
+        long previousTargetTableId = ctx.getIcebergRowIdTargetTableId();
+        ctx.setIcebergRowIdTargetTableId(icebergTable.getId());
+        try {
+            LogicalPlan mergePlan = buildMergePlan(ctx, icebergTable);
+            executeMergePlan(ctx, executor, icebergTable, mergePlan);
+        } finally {
+            ctx.setIcebergRowIdTargetTableId(previousTargetTableId);
+        }
+    }
+
+    @Override
+    public Plan getExplainPlan(ConnectContext ctx) {
+        TableIf table = getTargetTable(ctx);
+        if (!(table instanceof IcebergExternalTable)) {
+            throw new AnalysisException("MERGE INTO can only be used on 
Iceberg tables. "
+                    + "Table " + Util.getTempTableDisplayName(table.getName()) 
+ " is not an Iceberg table.");
+        }
+        long previousTargetTableId = ctx.getIcebergRowIdTargetTableId();
+        ctx.setIcebergRowIdTargetTableId(((IcebergExternalTable) 
table).getId());
+        try {
+            return buildMergePlan(ctx, (IcebergExternalTable) table);
+        } finally {
+            ctx.setIcebergRowIdTargetTableId(previousTargetTableId);
+        }
+    }
+
+    @Override
+    public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
+        return visitor.visitCommand(this, context);
+    }
+
+    @Override
+    public StmtType stmtType() {
+        return StmtType.MERGE_INTO;
+    }
+
+    private TableIf getTargetTable(ConnectContext ctx) {
+        List<String> qualifiedTableName = RelationUtil.getQualifierName(ctx, 
targetNameParts);
+        return RelationUtil.getTable(qualifiedTableName, ctx.getEnv(), 
Optional.empty());
+    }
+
+    private LogicalPlan generateBasePlan() {
+        LogicalPlan targetPlan = LogicalPlanBuilderAssistant.withCheckPolicy(
+                new UnboundRelation(
+                        StatementScopeIdGenerator.newRelationId(),
+                        targetNameParts
+                )
+        );
+        if (targetAlias.isPresent()) {
+            targetPlan = new LogicalSubQueryAlias<>(targetAlias.get(), 
targetPlan);
+        }
+        return new LogicalJoin<>(JoinType.LEFT_OUTER_JOIN,

Review Comment:
   Consider using `INNER JOIN` when there is no `WHEN NOT MATCHED` clause, for 
better performance.



##########
fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java:
##########
@@ -1023,6 +1028,37 @@ public void setStatementContext(StatementContext 
statementContext) {
         this.statementContext = statementContext;
     }
 
+    /** Backward-compatible: returns true if any Iceberg table is targeted for 
row_id injection. */
+    public boolean needIcebergRowId() {
+        return icebergRowIdTargetTableId >= 0;
+    }
+
+    /** Check if a specific table should include the hidden row_id column. */
+    public boolean needIcebergRowIdForTable(long tableId) {
+        return icebergRowIdTargetTableId >= 0 && icebergRowIdTargetTableId == 
tableId;
+    }
+
+    /** Set the target table ID for row_id injection. Use -1 to clear. */
+    public void setIcebergRowIdTargetTableId(long tableId) {
+        this.icebergRowIdTargetTableId = tableId;
+    }
+
+    /** Get the previously saved target table ID (for save/restore pattern). */
+    public long getIcebergRowIdTargetTableId() {
+        return icebergRowIdTargetTableId;
+    }
+
+    /** @deprecated Use setIcebergRowIdTargetTableId instead. Kept for 
backward compat. */

Review Comment:
   This is a newly added method — why is it already marked `@deprecated`?



##########
be/src/exec/sink/viceberg_delete_sink.cpp:
##########
@@ -0,0 +1,501 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "exec/sink/viceberg_delete_sink.h"
+
+#include <fmt/format.h>
+
+#include "common/logging.h"
+#include "core/block/column_with_type_and_name.h"
+#include "core/column/column_nullable.h"
+#include "core/column/column_string.h"
+#include "core/column/column_struct.h"
+#include "core/column/column_vector.h"
+#include "core/data_type/data_type_factory.hpp"
+#include "core/data_type/data_type_nullable.h"
+#include "core/data_type/data_type_number.h"
+#include "core/data_type/data_type_string.h"
+#include "core/data_type/data_type_struct.h"
+#include "exprs/vexpr.h"
+#include "format/transformer/vfile_format_transformer.h"
+#include "runtime/runtime_state.h"
+#include "util/string_util.h"
+#include "util/uid_util.h"
+
+namespace doris {
+
+VIcebergDeleteSink::VIcebergDeleteSink(const TDataSink& t_sink,
+                                       const VExprContextSPtrs& output_exprs,
+                                       std::shared_ptr<Dependency> dep,
+                                       std::shared_ptr<Dependency> fin_dep)
+        : AsyncResultWriter(output_exprs, dep, fin_dep), _t_sink(t_sink) {
+    DCHECK(_t_sink.__isset.iceberg_delete_sink);
+}
+
+Status VIcebergDeleteSink::init_properties(ObjectPool* pool) {
+    const auto& delete_sink = _t_sink.iceberg_delete_sink;
+
+    _delete_type = delete_sink.delete_type;
+    if (_delete_type != TFileContent::POSITION_DELETES) {
+        return Status::NotSupported("Iceberg delete only supports position 
delete files");
+    }
+
+    // Get file format settings
+    if (delete_sink.__isset.file_format) {
+        _file_format_type = delete_sink.file_format;
+    }
+
+    if (delete_sink.__isset.compress_type) {
+        _compress_type = delete_sink.compress_type;
+    }
+
+    // Get output path and table location
+    if (delete_sink.__isset.output_path) {
+        _output_path = delete_sink.output_path;
+    }
+
+    if (delete_sink.__isset.table_location) {
+        _table_location = delete_sink.table_location;
+    }
+
+    // Get Hadoop configuration
+    if (delete_sink.__isset.hadoop_config) {
+        _hadoop_conf.insert(delete_sink.hadoop_config.begin(), 
delete_sink.hadoop_config.end());
+    }
+
+    if (delete_sink.__isset.file_type) {
+        _file_type = delete_sink.file_type;
+    }
+
+    if (delete_sink.__isset.broker_addresses) {
+        _broker_addresses.assign(delete_sink.broker_addresses.begin(),
+                                 delete_sink.broker_addresses.end());
+    }
+
+    // Get partition information
+    if (delete_sink.__isset.partition_spec_id) {
+        _partition_spec_id = delete_sink.partition_spec_id;
+    }
+
+    if (delete_sink.__isset.partition_data_json) {
+        _partition_data_json = delete_sink.partition_data_json;
+    }
+
+    return Status::OK();
+}
+
+Status VIcebergDeleteSink::open(RuntimeState* state, RuntimeProfile* profile) {
+    _state = state;
+
+    // Initialize counters
+    _written_rows_counter = ADD_COUNTER(profile, "RowsWritten", TUnit::UNIT);
+    _send_data_timer = ADD_TIMER(profile, "SendDataTime");
+    _write_delete_files_timer = ADD_TIMER(profile, "WriteDeleteFilesTime");
+    _delete_file_count_counter = ADD_COUNTER(profile, "DeleteFileCount", 
TUnit::UNIT);
+    _open_timer = ADD_TIMER(profile, "OpenTime");
+    _close_timer = ADD_TIMER(profile, "CloseTime");
+
+    SCOPED_TIMER(_open_timer);
+
+    RETURN_IF_ERROR(_init_position_delete_output_exprs());
+
+    LOG(INFO) << fmt::format("VIcebergDeleteSink opened: delete_type={}, 
output_path={}",
+                             to_string(_delete_type), _output_path);
+
+    return Status::OK();
+}
+
+Status VIcebergDeleteSink::write(RuntimeState* state, Block& block) {
+    SCOPED_TIMER(_send_data_timer);
+
+    if (block.rows() == 0) {
+        return Status::OK();
+    }
+
+    _row_count += block.rows();
+
+    if (_delete_type != TFileContent::POSITION_DELETES) {
+        return Status::NotSupported("Iceberg delete only supports position 
delete files");
+    }
+
+    // Extract $row_id column and group by file_path
+    RETURN_IF_ERROR(_collect_position_deletes(block, _file_deletions));
+
+    if (_written_rows_counter) {
+        COUNTER_UPDATE(_written_rows_counter, block.rows());
+    }
+
+    return Status::OK();
+}
+
+Status VIcebergDeleteSink::close(Status close_status) {
+    SCOPED_TIMER(_close_timer);
+
+    if (!close_status.ok()) {
+        LOG(WARNING) << fmt::format("VIcebergDeleteSink close with error: {}",
+                                    close_status.to_string());
+        return close_status;
+    }
+
+    if (_delete_type == TFileContent::POSITION_DELETES && 
!_file_deletions.empty()) {
+        SCOPED_TIMER(_write_delete_files_timer);
+        RETURN_IF_ERROR(_write_position_delete_files(_file_deletions));

Review Comment:
   All file deletions are held in memory and only written to disk at close().
   This may cause memory issues if there are lots of delete files.



##########
fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java:
##########
@@ -1480,6 +1481,18 @@ public enum IgnoreSplitType {
     )
     public boolean showHiddenColumns = false;
 
+    // 内部变量,保留兼容性;实际由 ConnectContext.needIcebergRowId 控制

Review Comment:
   remove



##########
fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java:
##########
@@ -3577,6 +3590,11 @@ public boolean isEnableESParallelScroll() {
                             + "to exclude the impact of dangling delete 
files."})
     public boolean ignoreIcebergDanglingDelete = false;
 
+    @VariableMgr.VarAttr(name = ENABLE_ICEBERG_MERGE_PARTITIONING,

Review Comment:
   Remove the Chinese description



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to