morrySnow commented on code in PR #18869: URL: https://github.com/apache/doris/pull/18869#discussion_r1203909978
########## fe/fe-core/src/main/java/org/apache/doris/nereids/analyzer/UnboundOlapTableSink.java: ########## @@ -0,0 +1,130 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.nereids.analyzer; + +import org.apache.doris.nereids.memo.GroupExpression; +import org.apache.doris.nereids.properties.LogicalProperties; +import org.apache.doris.nereids.properties.UnboundLogicalProperties; +import org.apache.doris.nereids.trees.expressions.Expression; +import org.apache.doris.nereids.trees.expressions.Slot; +import org.apache.doris.nereids.trees.plans.Plan; +import org.apache.doris.nereids.trees.plans.PlanType; +import org.apache.doris.nereids.trees.plans.logical.LogicalUnary; +import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor; + +import com.google.common.base.Preconditions; + +import java.util.List; +import java.util.Objects; +import java.util.Optional; + +/** + * Represent an olap table sink plan node that has not been bound. 
+ */ +public class UnboundOlapTableSink<CHILD_TYPE extends Plan> extends LogicalUnary<CHILD_TYPE> implements Unbound { + private final List<String> nameParts; + private final List<String> colNames; + private final List<String> hints; + private final List<String> partitions; + + public UnboundOlapTableSink(List<String> nameParts, List<String> colNames, List<String> hints, + List<String> partitions, CHILD_TYPE child) { + this(nameParts, colNames, hints, partitions, Optional.empty(), Optional.empty(), child); + } + + public UnboundOlapTableSink(List<String> nameParts, List<String> colNames, List<String> hints, + List<String> partitions, Optional<GroupExpression> groupExpression, + Optional<LogicalProperties> logicalProperties, CHILD_TYPE child) { + super(PlanType.LOGICAL_UNBOUND_OLAP_TABLE_SINK, groupExpression, logicalProperties, child); + this.nameParts = nameParts; + this.colNames = colNames; + this.hints = hints; + this.partitions = partitions; Review Comment: use ImmutableList.copyOf(Objects.requireNonNull(...)) ########## fe/fe-core/src/main/java/org/apache/doris/nereids/NereidsPlanner.java: ########## @@ -95,6 +93,7 @@ public class NereidsPlanner extends Planner { private Plan analyzedPlan; private Plan rewrittenPlan; private Plan optimizedPlan; + private PhysicalPlan physicalPlan; Review Comment: move back as local var ########## fe/fe-core/src/main/java/org/apache/doris/nereids/NereidsPlanner.java: ########## @@ -166,7 +165,6 @@ public PhysicalPlan plan(LogicalPlan plan, PhysicalProperties outputProperties) * Do analyze and optimize for query plan. 
* * @param plan wait for plan - * @param requireProperties request physical properties constraints Review Comment: add back this java doc ########## fe/fe-core/src/main/java/org/apache/doris/nereids/NereidsPlanner.java: ########## @@ -252,7 +250,7 @@ public Plan plan(LogicalPlan plan, PhysicalProperties requireProperties, Explain LOG.info(memo); } - int nth = cascadesContext.getConnectContext().getSessionVariable().getNthOptimizedPlan(); + int nth = ConnectContext.get().getSessionVariable().getNthOptimizedPlan(); Review Comment: change back to cascadesContext.getConnectContext() ########## fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalOlapTableSink.java: ########## @@ -0,0 +1,201 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.nereids.trees.plans.physical; + +import org.apache.doris.catalog.Column; +import org.apache.doris.catalog.Database; +import org.apache.doris.catalog.HashDistributionInfo; +import org.apache.doris.catalog.OlapTable; +import org.apache.doris.nereids.memo.GroupExpression; +import org.apache.doris.nereids.properties.DistributionSpecHash.ShuffleType; +import org.apache.doris.nereids.properties.LogicalProperties; +import org.apache.doris.nereids.properties.PhysicalProperties; +import org.apache.doris.nereids.trees.expressions.Expression; +import org.apache.doris.nereids.trees.expressions.Slot; +import org.apache.doris.nereids.trees.plans.Plan; +import org.apache.doris.nereids.trees.plans.PlanType; +import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor; +import org.apache.doris.statistics.Statistics; + +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Lists; + +import java.util.List; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; +import java.util.stream.Collectors; + +/** + * physical olap table sink for insert command + */ +public class PhysicalOlapTableSink<CHILD_TYPE extends Plan> extends PhysicalUnary<CHILD_TYPE> { + private final Database database; + private final OlapTable targetTable; + private final List<Column> cols; + private final List<Long> partitionIds; + private final boolean singleReplicaLoad; + + public PhysicalOlapTableSink(Database database, OlapTable targetTable, List<Long> partitionIds, List<Column> cols, + boolean singleReplicaLoad, LogicalProperties logicalProperties, CHILD_TYPE child) { + this(database, targetTable, partitionIds, cols, singleReplicaLoad, Optional.empty(), logicalProperties, child); + } + + /** + * Constructor + */ + public PhysicalOlapTableSink(Database database, OlapTable targetTable, List<Long> partitionIds, List<Column> cols, + boolean 
singleReplicaLoad, Optional<GroupExpression> groupExpression, LogicalProperties logicalProperties, + CHILD_TYPE child) { + super(PlanType.PHYSICAL_OLAP_TABLE_SINK, groupExpression, logicalProperties, child); + this.database = Preconditions.checkNotNull(database, "database != null in PhysicalOlapTableSink"); + this.targetTable = Preconditions.checkNotNull(targetTable, "targetTable != null in PhysicalOlapTableSink"); + this.cols = cols; + this.partitionIds = partitionIds; + this.singleReplicaLoad = singleReplicaLoad; + } + + /** + * Constructor + */ + public PhysicalOlapTableSink(Database database, OlapTable targetTable, List<Long> partitionIds, List<Column> cols, + boolean singleReplicaLoad, Optional<GroupExpression> groupExpression, LogicalProperties logicalProperties, + PhysicalProperties physicalProperties, Statistics statistics, CHILD_TYPE child) { + super(PlanType.PHYSICAL_OLAP_TABLE_SINK, groupExpression, logicalProperties, physicalProperties, + statistics, child); + this.database = Preconditions.checkNotNull(database, "database != null in PhysicalOlapTableSink"); + this.targetTable = Preconditions.checkNotNull(targetTable, "targetTable != null in PhysicalOlapTableSink"); + this.cols = cols; + this.partitionIds = partitionIds; + this.singleReplicaLoad = singleReplicaLoad; + } + + public Database getDatabase() { + return database; + } + + public OlapTable getTargetTable() { + return targetTable; + } + + public List<Column> getCols() { + return cols; + } + + public List<Long> getPartitionIds() { + return partitionIds; + } + + public boolean isSingleReplicaLoad() { + return singleReplicaLoad; + } + + @Override + public Plan withChildren(List<Plan> children) { + Preconditions.checkArgument(children.size() == 1, "PhysicalOlapTableSink only accepts one child"); + return new PhysicalOlapTableSink<>(database, targetTable, partitionIds, cols, singleReplicaLoad, + getLogicalProperties(), children.get(0)); + } + + @Override + public boolean equals(Object o) { + if (this 
== o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + if (!super.equals(o)) { + return false; + } + PhysicalOlapTableSink<?> that = (PhysicalOlapTableSink<?>) o; + return Objects.equals(targetTable, that.targetTable) && Objects.equals(partitionIds, + that.partitionIds); + } + + @Override + public int hashCode() { + return Objects.hash(targetTable, partitionIds); + } + + @Override + public List<Slot> computeOutput() { + return child().getOutput(); + } + + @Override + public List<Slot> getOutput() { + return computeOutput(); + } + + @Override + public Set<Slot> getOutputSet() { Review Comment: override which base class's function? ########## fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalOlapTableSink.java: ########## @@ -0,0 +1,201 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.nereids.trees.plans.physical; + +import org.apache.doris.catalog.Column; +import org.apache.doris.catalog.Database; +import org.apache.doris.catalog.HashDistributionInfo; +import org.apache.doris.catalog.OlapTable; +import org.apache.doris.nereids.memo.GroupExpression; +import org.apache.doris.nereids.properties.DistributionSpecHash.ShuffleType; +import org.apache.doris.nereids.properties.LogicalProperties; +import org.apache.doris.nereids.properties.PhysicalProperties; +import org.apache.doris.nereids.trees.expressions.Expression; +import org.apache.doris.nereids.trees.expressions.Slot; +import org.apache.doris.nereids.trees.plans.Plan; +import org.apache.doris.nereids.trees.plans.PlanType; +import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor; +import org.apache.doris.statistics.Statistics; + +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Lists; + +import java.util.List; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; +import java.util.stream.Collectors; + +/** + * physical olap table sink for insert command + */ +public class PhysicalOlapTableSink<CHILD_TYPE extends Plan> extends PhysicalUnary<CHILD_TYPE> { + private final Database database; + private final OlapTable targetTable; + private final List<Column> cols; + private final List<Long> partitionIds; + private final boolean singleReplicaLoad; + + public PhysicalOlapTableSink(Database database, OlapTable targetTable, List<Long> partitionIds, List<Column> cols, + boolean singleReplicaLoad, LogicalProperties logicalProperties, CHILD_TYPE child) { + this(database, targetTable, partitionIds, cols, singleReplicaLoad, Optional.empty(), logicalProperties, child); + } + + /** + * Constructor + */ + public PhysicalOlapTableSink(Database database, OlapTable targetTable, List<Long> partitionIds, List<Column> cols, + boolean 
singleReplicaLoad, Optional<GroupExpression> groupExpression, LogicalProperties logicalProperties, + CHILD_TYPE child) { + super(PlanType.PHYSICAL_OLAP_TABLE_SINK, groupExpression, logicalProperties, child); + this.database = Preconditions.checkNotNull(database, "database != null in PhysicalOlapTableSink"); + this.targetTable = Preconditions.checkNotNull(targetTable, "targetTable != null in PhysicalOlapTableSink"); + this.cols = cols; + this.partitionIds = partitionIds; + this.singleReplicaLoad = singleReplicaLoad; + } + + /** + * Constructor + */ + public PhysicalOlapTableSink(Database database, OlapTable targetTable, List<Long> partitionIds, List<Column> cols, + boolean singleReplicaLoad, Optional<GroupExpression> groupExpression, LogicalProperties logicalProperties, + PhysicalProperties physicalProperties, Statistics statistics, CHILD_TYPE child) { + super(PlanType.PHYSICAL_OLAP_TABLE_SINK, groupExpression, logicalProperties, physicalProperties, + statistics, child); + this.database = Preconditions.checkNotNull(database, "database != null in PhysicalOlapTableSink"); + this.targetTable = Preconditions.checkNotNull(targetTable, "targetTable != null in PhysicalOlapTableSink"); + this.cols = cols; + this.partitionIds = partitionIds; + this.singleReplicaLoad = singleReplicaLoad; + } + + public Database getDatabase() { + return database; + } + + public OlapTable getTargetTable() { + return targetTable; + } + + public List<Column> getCols() { + return cols; + } + + public List<Long> getPartitionIds() { + return partitionIds; + } + + public boolean isSingleReplicaLoad() { + return singleReplicaLoad; + } + + @Override + public Plan withChildren(List<Plan> children) { + Preconditions.checkArgument(children.size() == 1, "PhysicalOlapTableSink only accepts one child"); + return new PhysicalOlapTableSink<>(database, targetTable, partitionIds, cols, singleReplicaLoad, + getLogicalProperties(), children.get(0)); + } + + @Override + public boolean equals(Object o) { + if (this 
== o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + if (!super.equals(o)) { + return false; + } + PhysicalOlapTableSink<?> that = (PhysicalOlapTableSink<?>) o; + return Objects.equals(targetTable, that.targetTable) && Objects.equals(partitionIds, + that.partitionIds); + } + + @Override + public int hashCode() { + return Objects.hash(targetTable, partitionIds); Review Comment: why only table and partition in equals and hashCode? ########## fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalOlapTableSink.java: ########## @@ -0,0 +1,201 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.nereids.trees.plans.physical; + +import org.apache.doris.catalog.Column; +import org.apache.doris.catalog.Database; +import org.apache.doris.catalog.HashDistributionInfo; +import org.apache.doris.catalog.OlapTable; +import org.apache.doris.nereids.memo.GroupExpression; +import org.apache.doris.nereids.properties.DistributionSpecHash.ShuffleType; +import org.apache.doris.nereids.properties.LogicalProperties; +import org.apache.doris.nereids.properties.PhysicalProperties; +import org.apache.doris.nereids.trees.expressions.Expression; +import org.apache.doris.nereids.trees.expressions.Slot; +import org.apache.doris.nereids.trees.plans.Plan; +import org.apache.doris.nereids.trees.plans.PlanType; +import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor; +import org.apache.doris.statistics.Statistics; + +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Lists; + +import java.util.List; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; +import java.util.stream.Collectors; + +/** + * physical olap table sink for insert command + */ +public class PhysicalOlapTableSink<CHILD_TYPE extends Plan> extends PhysicalUnary<CHILD_TYPE> { + private final Database database; + private final OlapTable targetTable; + private final List<Column> cols; + private final List<Long> partitionIds; + private final boolean singleReplicaLoad; + + public PhysicalOlapTableSink(Database database, OlapTable targetTable, List<Long> partitionIds, List<Column> cols, + boolean singleReplicaLoad, LogicalProperties logicalProperties, CHILD_TYPE child) { + this(database, targetTable, partitionIds, cols, singleReplicaLoad, Optional.empty(), logicalProperties, child); + } + + /** + * Constructor + */ + public PhysicalOlapTableSink(Database database, OlapTable targetTable, List<Long> partitionIds, List<Column> cols, + boolean 
singleReplicaLoad, Optional<GroupExpression> groupExpression, LogicalProperties logicalProperties, + CHILD_TYPE child) { + super(PlanType.PHYSICAL_OLAP_TABLE_SINK, groupExpression, logicalProperties, child); + this.database = Preconditions.checkNotNull(database, "database != null in PhysicalOlapTableSink"); + this.targetTable = Preconditions.checkNotNull(targetTable, "targetTable != null in PhysicalOlapTableSink"); + this.cols = cols; + this.partitionIds = partitionIds; + this.singleReplicaLoad = singleReplicaLoad; + } + + /** + * Constructor + */ + public PhysicalOlapTableSink(Database database, OlapTable targetTable, List<Long> partitionIds, List<Column> cols, + boolean singleReplicaLoad, Optional<GroupExpression> groupExpression, LogicalProperties logicalProperties, + PhysicalProperties physicalProperties, Statistics statistics, CHILD_TYPE child) { + super(PlanType.PHYSICAL_OLAP_TABLE_SINK, groupExpression, logicalProperties, physicalProperties, + statistics, child); + this.database = Preconditions.checkNotNull(database, "database != null in PhysicalOlapTableSink"); + this.targetTable = Preconditions.checkNotNull(targetTable, "targetTable != null in PhysicalOlapTableSink"); + this.cols = cols; + this.partitionIds = partitionIds; Review Comment: ditto ########## fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java: ########## @@ -284,6 +291,82 @@ public PlanFragment translatePlan(PhysicalPlan physicalPlan) { return rootFragment; } + @Override + public PlanFragment visitPhysicalOlapTableSink(PhysicalOlapTableSink<? 
extends Plan> olapTableSink, + PlanTranslatorContext context) { + PlanFragment rootFragment = olapTableSink.child().accept(this, context); + + TupleDescriptor olapTuple = context.generateTupleDesc(); + for (Column column : olapTableSink.getTargetTable().getFullSchema()) { + SlotDescriptor slotDesc = context.addSlotDesc(olapTuple); + slotDesc.setIsMaterialized(true); + slotDesc.setType(column.getType()); + slotDesc.setColumn(column); + slotDesc.setIsNullable(column.isAllowNull()); + } + + OlapTableSink sink = new OlapTableSink( + olapTableSink.getTargetTable(), + olapTuple, + olapTableSink.getPartitionIds(), + olapTableSink.isSingleReplicaLoad() + ); + + Map<Column, Slot> columnToSlots = Maps.newHashMap(); + Preconditions.checkArgument(olapTableSink.getOutput().size() == olapTableSink.getCols().size(), + "this is a bug in insert into command"); + for (int i = 0; i < olapTableSink.getCols().size(); ++i) { + columnToSlots.put(olapTableSink.getCols().get(i), olapTableSink.getOutput().get(i)); + } + List<Expr> outputExprs = Lists.newArrayList(); + try { + for (Column column : olapTableSink.getTargetTable().getFullSchema()) { + if (columnToSlots.containsKey(column)) { + ExprId exprId = columnToSlots.get(column).getExprId(); + outputExprs.add(context.findSlotRef(exprId).checkTypeCompatibility(column.getType())); + } else if (column.getDefaultValue() == null) { + outputExprs.add(NullLiteral.create(column.getType())); + } else { + if (column.getDefaultValueExprDef() != null) { + outputExprs.add(column.getDefaultValueExpr()); + } else { + StringLiteral defaultValueExpr = new StringLiteral(column.getDefaultValue()); + outputExprs.add(defaultValueExpr.checkTypeCompatibility(column.getType())); + } + } + } + } catch (Exception e) { + throw new AnalysisException(e.getMessage(), e.getCause()); + } + + HashDistributionInfo distributionInfo = ((HashDistributionInfo) olapTableSink.getTargetTable() + .getDefaultDistributionInfo()); + List<Expr> partitionExprs = 
distributionInfo.getDistributionColumns().stream() + .map(column -> context.findSlotRef(columnToSlots.get(column).getExprId())) + .collect(Collectors.toList()); + + if (rootFragment.getPlanRoot() instanceof ExchangeNode) { + ExchangeNode exchangeNode = ((ExchangeNode) rootFragment.getPlanRoot()); + PlanFragment currentFragment = new PlanFragment( + context.nextFragmentId(), + exchangeNode, + rootFragment.getDataPartition()); + + rootFragment.setPlanRoot(exchangeNode.getChild(0)); + rootFragment.getPlanRoot().setNumInstances(1); + rootFragment.setDestination(exchangeNode); Review Comment: so, just set data partition and remove the useless exchange? ########## fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalOlapTableSink.java: ########## @@ -0,0 +1,201 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.nereids.trees.plans.physical; + +import org.apache.doris.catalog.Column; +import org.apache.doris.catalog.Database; +import org.apache.doris.catalog.HashDistributionInfo; +import org.apache.doris.catalog.OlapTable; +import org.apache.doris.nereids.memo.GroupExpression; +import org.apache.doris.nereids.properties.DistributionSpecHash.ShuffleType; +import org.apache.doris.nereids.properties.LogicalProperties; +import org.apache.doris.nereids.properties.PhysicalProperties; +import org.apache.doris.nereids.trees.expressions.Expression; +import org.apache.doris.nereids.trees.expressions.Slot; +import org.apache.doris.nereids.trees.plans.Plan; +import org.apache.doris.nereids.trees.plans.PlanType; +import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor; +import org.apache.doris.statistics.Statistics; + +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Lists; + +import java.util.List; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; +import java.util.stream.Collectors; + +/** + * physical olap table sink for insert command + */ +public class PhysicalOlapTableSink<CHILD_TYPE extends Plan> extends PhysicalUnary<CHILD_TYPE> { + private final Database database; + private final OlapTable targetTable; + private final List<Column> cols; + private final List<Long> partitionIds; + private final boolean singleReplicaLoad; + + public PhysicalOlapTableSink(Database database, OlapTable targetTable, List<Long> partitionIds, List<Column> cols, + boolean singleReplicaLoad, LogicalProperties logicalProperties, CHILD_TYPE child) { + this(database, targetTable, partitionIds, cols, singleReplicaLoad, Optional.empty(), logicalProperties, child); + } + + /** + * Constructor + */ + public PhysicalOlapTableSink(Database database, OlapTable targetTable, List<Long> partitionIds, List<Column> cols, + boolean 
singleReplicaLoad, Optional<GroupExpression> groupExpression, LogicalProperties logicalProperties, + CHILD_TYPE child) { + super(PlanType.PHYSICAL_OLAP_TABLE_SINK, groupExpression, logicalProperties, child); + this.database = Preconditions.checkNotNull(database, "database != null in PhysicalOlapTableSink"); + this.targetTable = Preconditions.checkNotNull(targetTable, "targetTable != null in PhysicalOlapTableSink"); + this.cols = cols; + this.partitionIds = partitionIds; + this.singleReplicaLoad = singleReplicaLoad; + } + + /** + * Constructor + */ + public PhysicalOlapTableSink(Database database, OlapTable targetTable, List<Long> partitionIds, List<Column> cols, + boolean singleReplicaLoad, Optional<GroupExpression> groupExpression, LogicalProperties logicalProperties, + PhysicalProperties physicalProperties, Statistics statistics, CHILD_TYPE child) { + super(PlanType.PHYSICAL_OLAP_TABLE_SINK, groupExpression, logicalProperties, physicalProperties, + statistics, child); + this.database = Preconditions.checkNotNull(database, "database != null in PhysicalOlapTableSink"); + this.targetTable = Preconditions.checkNotNull(targetTable, "targetTable != null in PhysicalOlapTableSink"); + this.cols = cols; + this.partitionIds = partitionIds; + this.singleReplicaLoad = singleReplicaLoad; + } + + public Database getDatabase() { + return database; + } + + public OlapTable getTargetTable() { + return targetTable; + } + + public List<Column> getCols() { + return cols; + } + + public List<Long> getPartitionIds() { + return partitionIds; + } + + public boolean isSingleReplicaLoad() { + return singleReplicaLoad; + } + + @Override + public Plan withChildren(List<Plan> children) { + Preconditions.checkArgument(children.size() == 1, "PhysicalOlapTableSink only accepts one child"); + return new PhysicalOlapTableSink<>(database, targetTable, partitionIds, cols, singleReplicaLoad, + getLogicalProperties(), children.get(0)); + } + + @Override + public boolean equals(Object o) { + if (this 
== o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + if (!super.equals(o)) { + return false; + } + PhysicalOlapTableSink<?> that = (PhysicalOlapTableSink<?>) o; + return Objects.equals(targetTable, that.targetTable) && Objects.equals(partitionIds, + that.partitionIds); + } + + @Override + public int hashCode() { + return Objects.hash(targetTable, partitionIds); + } + + @Override + public List<Slot> computeOutput() { + return child().getOutput(); + } + + @Override + public List<Slot> getOutput() { + return computeOutput(); + } + + @Override + public Set<Slot> getOutputSet() { + return ImmutableSet.copyOf(getOutput()); + } + + @Override + public <R, C> R accept(PlanVisitor<R, C> visitor, C context) { + return visitor.visitPhysicalOlapTableSink(this, context); + } + + @Override + public List<? extends Expression> getExpressions() { + return ImmutableList.of(); + } + + @Override + public Plan withGroupExpression(Optional<GroupExpression> groupExpression) { + return new PhysicalOlapTableSink<>(database, targetTable, partitionIds, cols, singleReplicaLoad, + groupExpression, getLogicalProperties(), child()); + } + + @Override + public Plan withLogicalProperties(Optional<LogicalProperties> logicalProperties) { + return new PhysicalOlapTableSink<>(database, targetTable, partitionIds, cols, singleReplicaLoad, + groupExpression, logicalProperties.get(), child()); + } + + @Override + public PhysicalPlan withPhysicalPropertiesAndStats(PhysicalProperties physicalProperties, Statistics statistics) { + return new PhysicalOlapTableSink<>(database, targetTable, partitionIds, cols, singleReplicaLoad, + groupExpression, getLogicalProperties(), physicalProperties, statistics, child()); + } + + /** + * get output physical properties + */ + public PhysicalProperties getRequirePhysicalProperties() { + HashDistributionInfo distributionInfo = ((HashDistributionInfo) targetTable.getDefaultDistributionInfo()); + List<Column> distributedColumns = 
distributionInfo.getDistributionColumns(); + List<Integer> columnIndexes = Lists.newArrayList(); + int idx = 0; + for (int i = 0; i < targetTable.getFullSchema().size(); ++i) { + if (targetTable.getFullSchema().get(i).equals(distributedColumns.get(idx))) { + columnIndexes.add(i); + idx++; + if (idx == distributedColumns.size()) { + break; + } + } + } + return PhysicalProperties.createHash(columnIndexes.stream() + .map(colIdx -> getOutput().get(colIdx).getExprId()).collect(Collectors.toList()), ShuffleType.NATURAL); Review Comment: why is the required properties type NATURAL? NATURAL is only used for output ########## fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java: ########## @@ -284,6 +291,82 @@ public PlanFragment translatePlan(PhysicalPlan physicalPlan) { return rootFragment; } + @Override + public PlanFragment visitPhysicalOlapTableSink(PhysicalOlapTableSink<? extends Plan> olapTableSink, + PlanTranslatorContext context) { + PlanFragment rootFragment = olapTableSink.child().accept(this, context); + + TupleDescriptor olapTuple = context.generateTupleDesc(); + for (Column column : olapTableSink.getTargetTable().getFullSchema()) { + SlotDescriptor slotDesc = context.addSlotDesc(olapTuple); + slotDesc.setIsMaterialized(true); + slotDesc.setType(column.getType()); + slotDesc.setColumn(column); + slotDesc.setIsNullable(column.isAllowNull()); + } + + OlapTableSink sink = new OlapTableSink( + olapTableSink.getTargetTable(), + olapTuple, + olapTableSink.getPartitionIds(), + olapTableSink.isSingleReplicaLoad() + ); + + Map<Column, Slot> columnToSlots = Maps.newHashMap(); + Preconditions.checkArgument(olapTableSink.getOutput().size() == olapTableSink.getCols().size(), + "this is a bug in insert into command"); + for (int i = 0; i < olapTableSink.getCols().size(); ++i) { + columnToSlots.put(olapTableSink.getCols().get(i), olapTableSink.getOutput().get(i)); + } + List<Expr> outputExprs = Lists.newArrayList(); + try { + for (Column 
column : olapTableSink.getTargetTable().getFullSchema()) { + if (columnToSlots.containsKey(column)) { + ExprId exprId = columnToSlots.get(column).getExprId(); + outputExprs.add(context.findSlotRef(exprId).checkTypeCompatibility(column.getType())); + } else if (column.getDefaultValue() == null) { + outputExprs.add(NullLiteral.create(column.getType())); + } else { + if (column.getDefaultValueExprDef() != null) { + outputExprs.add(column.getDefaultValueExpr()); + } else { + StringLiteral defaultValueExpr = new StringLiteral(column.getDefaultValue()); + outputExprs.add(defaultValueExpr.checkTypeCompatibility(column.getType())); + } + } + } + } catch (Exception e) { + throw new AnalysisException(e.getMessage(), e.getCause()); + } + + HashDistributionInfo distributionInfo = ((HashDistributionInfo) olapTableSink.getTargetTable() + .getDefaultDistributionInfo()); + List<Expr> partitionExprs = distributionInfo.getDistributionColumns().stream() + .map(column -> context.findSlotRef(columnToSlots.get(column).getExprId())) + .collect(Collectors.toList()); + + if (rootFragment.getPlanRoot() instanceof ExchangeNode) { + ExchangeNode exchangeNode = ((ExchangeNode) rootFragment.getPlanRoot()); + PlanFragment currentFragment = new PlanFragment( + context.nextFragmentId(), + exchangeNode, + rootFragment.getDataPartition()); + + rootFragment.setPlanRoot(exchangeNode.getChild(0)); + rootFragment.getPlanRoot().setNumInstances(1); + rootFragment.setDestination(exchangeNode); + context.addPlanFragment(currentFragment); + rootFragment = currentFragment; + } + rootFragment.getPlanRoot().setNumInstances(1); Review Comment: why need to set this? ########## fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalOlapTableSink.java: ########## @@ -0,0 +1,201 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.nereids.trees.plans.physical; + +import org.apache.doris.catalog.Column; +import org.apache.doris.catalog.Database; +import org.apache.doris.catalog.HashDistributionInfo; +import org.apache.doris.catalog.OlapTable; +import org.apache.doris.nereids.memo.GroupExpression; +import org.apache.doris.nereids.properties.DistributionSpecHash.ShuffleType; +import org.apache.doris.nereids.properties.LogicalProperties; +import org.apache.doris.nereids.properties.PhysicalProperties; +import org.apache.doris.nereids.trees.expressions.Expression; +import org.apache.doris.nereids.trees.expressions.Slot; +import org.apache.doris.nereids.trees.plans.Plan; +import org.apache.doris.nereids.trees.plans.PlanType; +import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor; +import org.apache.doris.statistics.Statistics; + +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Lists; + +import java.util.List; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; +import java.util.stream.Collectors; + +/** + * physical olap table sink for insert command + */ +public class PhysicalOlapTableSink<CHILD_TYPE extends Plan> 
extends PhysicalUnary<CHILD_TYPE> { + private final Database database; + private final OlapTable targetTable; + private final List<Column> cols; + private final List<Long> partitionIds; + private final boolean singleReplicaLoad; + + public PhysicalOlapTableSink(Database database, OlapTable targetTable, List<Long> partitionIds, List<Column> cols, + boolean singleReplicaLoad, LogicalProperties logicalProperties, CHILD_TYPE child) { + this(database, targetTable, partitionIds, cols, singleReplicaLoad, Optional.empty(), logicalProperties, child); + } + + /** + * Constructor + */ + public PhysicalOlapTableSink(Database database, OlapTable targetTable, List<Long> partitionIds, List<Column> cols, + boolean singleReplicaLoad, Optional<GroupExpression> groupExpression, LogicalProperties logicalProperties, + CHILD_TYPE child) { + super(PlanType.PHYSICAL_OLAP_TABLE_SINK, groupExpression, logicalProperties, child); + this.database = Preconditions.checkNotNull(database, "database != null in PhysicalOlapTableSink"); + this.targetTable = Preconditions.checkNotNull(targetTable, "targetTable != null in PhysicalOlapTableSink"); + this.cols = cols; + this.partitionIds = partitionIds; Review Comment: use ImmutableList.copyOf(Objects.requireNonNull(...)) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
