This is an automated email from the ASF dual-hosted git repository. 924060929 pushed a commit to branch fe_local_shuffle in repository https://gitbox.apache.org/repos/asf/doris.git
commit e3792cb11f7f156770007ad819665e6ba3ff049d Author: 924060929 <[email protected]> AuthorDate: Tue Jun 30 10:59:19 2026 +0800 [opt](local shuffle) support bucket shuffle for set operation Re-enable bucket shuffle for set operation / union: the largest natural or storage-bucketed child keeps its bucket distribution and every other child is bucket-shuffled to it, avoiding a full reshuffle of the largest input (the same idea as bucket-shuffle join applied to set operations). This is only valid under the FE local-shuffle planner (enable_local_shuffle_planner): only then can the frontend plan the correct local shuffle type for the set sink/probe. With the BE-side local-shuffle planner the backend cannot infer the type and computes wrong results, so the plan falls back to execution-bucketed shuffle there and behavior is unchanged. --- .../glue/translator/PhysicalPlanTranslator.java | 18 +-- .../properties/ChildOutputPropertyDeriver.java | 100 +++++++------ .../properties/ChildrenPropertiesRegulator.java | 158 +++++++++++---------- .../nereids/properties/RequestPropertyDeriver.java | 14 +- .../bucket_shuffle_set_operation.groovy | 3 - 5 files changed, 158 insertions(+), 135 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java index 14e063ac7f2..86c9f6c4157 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java @@ -2496,14 +2496,16 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla setOperationNode.setColocate(true); } - // TODO: open comment when support `enable_local_shuffle_planner` - // for (Plan child : setOperation.children()) { - // PhysicalPlan childPhysicalPlan = (PhysicalPlan) child; - // if (JoinUtils.isStorageBucketed(childPhysicalPlan.getPhysicalProperties())) { - // setOperationNode.setDistributionMode(DistributionMode.BUCKET_SHUFFLE); - // break; - // } - // } + // A storage-bucketed child means set-op bucket shuffle was chosen (only under the FE + // local-shuffle planner); mark the node BUCKET_SHUFFLE so the set sink/probe align by + // bucket instead of execution-bucketed hash. + for (Plan child : setOperation.children()) { + PhysicalPlan childPhysicalPlan = (PhysicalPlan) child; + if (JoinUtils.isStorageBucketed(childPhysicalPlan.getPhysicalProperties())) { + setOperationNode.setDistributionMode(DistributionMode.BUCKET_SHUFFLE); + break; + } + } return setOperationFragment; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildOutputPropertyDeriver.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildOutputPropertyDeriver.java index 8a71581ca97..d5858af29d8 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildOutputPropertyDeriver.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildOutputPropertyDeriver.java @@ -30,6 +30,7 @@ import org.apache.doris.nereids.trees.expressions.Slot; import org.apache.doris.nereids.trees.expressions.SlotReference; import org.apache.doris.nereids.trees.expressions.functions.table.TableValuedFunction; import org.apache.doris.nereids.trees.plans.Plan; +import org.apache.doris.nereids.trees.plans.algebra.Union; import org.apache.doris.nereids.trees.plans.physical.AbstractPhysicalSort; import org.apache.doris.nereids.trees.plans.physical.PhysicalAssertNumRows; import org.apache.doris.nereids.trees.plans.physical.PhysicalBucketedHashAggregate; @@ -71,9 +72,11 @@ import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Sets; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.HashSet; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Objects; @@ -440,53 +443,56 @@ public class ChildOutputPropertyDeriver extends PlanVisitor<PhysicalProperties, return PhysicalProperties.GATHER; } - // TODO: open comment when support `enable_local_shuffle_planner` - // int distributeToChildIndex - // = setOperation.<Integer>getMutableState(PhysicalSetOperation.DISTRIBUTE_TO_CHILD_INDEX).orElse(-1); - // if (distributeToChildIndex >= 0 - // && childrenDistribution.get(distributeToChildIndex) instanceof DistributionSpecHash) { - // DistributionSpecHash childDistribution - // = (DistributionSpecHash) childrenDistribution.get(distributeToChildIndex); - // List<SlotReference> childToIndex = setOperation.getRegularChildrenOutputs().get(distributeToChildIndex); - // Map<ExprId, Integer> idToOutputIndex = new LinkedHashMap<>(); - // for (int j = 0; j < childToIndex.size(); j++) { - // idToOutputIndex.put(childToIndex.get(j).getExprId(), j); - // } - // - // List<ExprId> orderedShuffledColumns = childDistribution.getOrderedShuffledColumns(); - // List<ExprId> setOperationDistributeColumnIds = new ArrayList<>(); - // for (ExprId tableDistributeColumnId : orderedShuffledColumns) { - // Integer index = idToOutputIndex.get(tableDistributeColumnId); - // if (index == null) { - // break; - // } - // setOperationDistributeColumnIds.add(setOperation.getOutput().get(index).getExprId()); - // } - // // check whether the set operation output all distribution columns of the child - // if (setOperationDistributeColumnIds.size() == orderedShuffledColumns.size()) { - // boolean isUnion = setOperation instanceof Union; - // boolean shuffleToRight = distributeToChildIndex > 0; - // if (!isUnion && shuffleToRight) { - // return new PhysicalProperties( - // new DistributionSpecHash( - // setOperationDistributeColumnIds, - // ShuffleType.EXECUTION_BUCKETED - // ) - // ); - // } else { - // // keep the distribution as the child - // return new PhysicalProperties( - // new DistributionSpecHash( - // setOperationDistributeColumnIds, - // childDistribution.getShuffleType(), - // childDistribution.getTableId(), - // childDistribution.getSelectedIndexId(), - // childDistribution.getPartitionIds() - // ) - // ); - // } - // } - // } + // When set-op bucket shuffle is chosen (DISTRIBUTE_TO_CHILD_INDEX is set by + // ChildrenPropertiesRegulator, which only happens under the FE local-shuffle planner), + // the set operation keeps the basic child's bucket distribution as its own output so the + // bucket distribution propagates upward instead of being flattened to execution-bucketed. + int distributeToChildIndex + = setOperation.<Integer>getMutableState(PhysicalSetOperation.DISTRIBUTE_TO_CHILD_INDEX).orElse(-1); + if (distributeToChildIndex >= 0 + && childrenDistribution.get(distributeToChildIndex) instanceof DistributionSpecHash) { + DistributionSpecHash childDistribution + = (DistributionSpecHash) childrenDistribution.get(distributeToChildIndex); + List<SlotReference> childToIndex = setOperation.getRegularChildrenOutputs().get(distributeToChildIndex); + Map<ExprId, Integer> idToOutputIndex = new LinkedHashMap<>(); + for (int j = 0; j < childToIndex.size(); j++) { + idToOutputIndex.put(childToIndex.get(j).getExprId(), j); + } + + List<ExprId> orderedShuffledColumns = childDistribution.getOrderedShuffledColumns(); + List<ExprId> setOperationDistributeColumnIds = new ArrayList<>(); + for (ExprId tableDistributeColumnId : orderedShuffledColumns) { + Integer index = idToOutputIndex.get(tableDistributeColumnId); + if (index == null) { + break; + } + setOperationDistributeColumnIds.add(setOperation.getOutput().get(index).getExprId()); + } + // check whether the set operation output all distribution columns of the child + if (setOperationDistributeColumnIds.size() == orderedShuffledColumns.size()) { + boolean isUnion = setOperation instanceof Union; + boolean shuffleToRight = distributeToChildIndex > 0; + if (!isUnion && shuffleToRight) { + return new PhysicalProperties( + new DistributionSpecHash( + setOperationDistributeColumnIds, + ShuffleType.EXECUTION_BUCKETED + ) + ); + } else { + // keep the distribution as the child + return new PhysicalProperties( + new DistributionSpecHash( + setOperationDistributeColumnIds, + childDistribution.getShuffleType(), + childDistribution.getTableId(), + childDistribution.getSelectedIndexId(), + childDistribution.getPartitionIds() + ) + ); + } + } + } for (int i = 0; i < childrenDistribution.size(); i++) { DistributionSpec childDistribution = childrenDistribution.get(i); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildrenPropertiesRegulator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildrenPropertiesRegulator.java index e5e0d9d1bd0..ca1691463a2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildrenPropertiesRegulator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildrenPropertiesRegulator.java @@ -57,6 +57,7 @@ import org.apache.doris.statistics.util.StatisticsUtil; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; import com.google.common.collect.Lists; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -651,83 +652,94 @@ public class ChildrenPropertiesRegulator extends PlanVisitor<List<List<PhysicalP } else if (requiredDistributionSpec instanceof DistributionSpecHash) { // TODO: should use the most common hash spec as basic DistributionSpecHash basic = (DistributionSpecHash) requiredDistributionSpec; - // TODO: open comment when support `enable_local_shuffle_planner` - // int bucketShuffleBasicIndex = -1; - // double basicRowCount = -1; - - // find the bucket shuffle basic index - // try { - // ImmutableSet<ShuffleType> supportBucketShuffleTypes = ImmutableSet.of( - // ShuffleType.NATURAL, - // ShuffleType.STORAGE_BUCKETED - // ); - // for (int i = 0; i < originChildrenProperties.size(); i++) { - // PhysicalProperties originChildrenProperty = originChildrenProperties.get(i); - // DistributionSpec childDistribution = originChildrenProperty.getDistributionSpec(); - // if (childDistribution instanceof DistributionSpecHash - // && supportBucketShuffleTypes.contains( - // ((DistributionSpecHash) childDistribution).getShuffleType()) - // && !(isBucketShuffleDownGrade(setOperation.child(i)))) { - // Statistics stats = setOperation.child(i).getStats(); - // double rowCount = stats.getRowCount(); - // if (rowCount > basicRowCount) { - // basicRowCount = rowCount; - // bucketShuffleBasicIndex = i; - // } - // } - // } - // } catch (Throwable t) { - // // catch stats exception - // LOG.warn("Can not find the most (bucket num, rowCount): " + t, t); - // bucketShuffleBasicIndex = -1; - // } - - // use bucket shuffle - // if (bucketShuffleBasicIndex >= 0) { - // DistributionSpecHash notShuffleSideRequire - // = (DistributionSpecHash) requiredProperties.get(bucketShuffleBasicIndex) - // .getDistributionSpec(); - // - // DistributionSpecHash notNeedShuffleOutput - // = (DistributionSpecHash) originChildrenProperties.get(bucketShuffleBasicIndex) - // .getDistributionSpec(); - // - // for (int i = 0; i < originChildrenProperties.size(); i++) { - // DistributionSpecHash current - // = (DistributionSpecHash) originChildrenProperties.get(i).getDistributionSpec(); - // if (i == bucketShuffleBasicIndex) { - // continue; - // } - // - // DistributionSpecHash currentRequire - // = (DistributionSpecHash) requiredProperties.get(i).getDistributionSpec(); - // - // PhysicalProperties target = calAnotherSideRequired( - // ShuffleType.STORAGE_BUCKETED, - // notNeedShuffleOutput, current, - // notShuffleSideRequire, - // currentRequire); - // updateChildEnforceAndCost(i, target); - // } - // setOperation.setMutableState( - // PhysicalSetOperation.DISTRIBUTE_TO_CHILD_INDEX, bucketShuffleBasicIndex); - // use partitioned shuffle - // } else { - for (int i = 0; i < originChildrenProperties.size(); i++) { - DistributionSpecHash current - = (DistributionSpecHash) originChildrenProperties.get(i).getDistributionSpec(); - if (current.getShuffleType() != ShuffleType.EXECUTION_BUCKETED - || !bothSideShuffleKeysAreSameOrder(basic, current, - (DistributionSpecHash) requiredProperties.get(0).getDistributionSpec(), - (DistributionSpecHash) requiredProperties.get(i).getDistributionSpec())) { + int bucketShuffleBasicIndex = -1; + double basicRowCount = -1; + + // Bucket shuffle for set operation is only valid when the FE plans the local + // shuffle: with the BE-side local-shuffle planner the backend cannot infer the + // correct local shuffle type for the set sink/probe and computes wrong results. + // When the planner is off, keep bucketShuffleBasicIndex = -1 and fall back to the + // execution-bucketed (partitioned) shuffle below. + ConnectContext setOperationContext = ConnectContext.get(); + boolean enableLocalShufflePlanner = setOperationContext != null + && setOperationContext.getSessionVariable().isEnableLocalShufflePlanner(); + + // find the bucket shuffle basic index: the largest natural / storage-bucketed child + // keeps its bucket distribution, every other child is bucket-shuffled to it. + if (enableLocalShufflePlanner) { + try { + ImmutableSet<ShuffleType> supportBucketShuffleTypes = ImmutableSet.of( + ShuffleType.NATURAL, + ShuffleType.STORAGE_BUCKETED + ); + for (int i = 0; i < originChildrenProperties.size(); i++) { + PhysicalProperties originChildrenProperty = originChildrenProperties.get(i); + DistributionSpec childDistribution = originChildrenProperty.getDistributionSpec(); + if (childDistribution instanceof DistributionSpecHash + && supportBucketShuffleTypes.contains( + ((DistributionSpecHash) childDistribution).getShuffleType()) + && !(isBucketShuffleDownGrade(setOperation.child(i)))) { + Statistics stats = setOperation.child(i).getStats(); + double rowCount = stats.getRowCount(); + if (rowCount > basicRowCount) { + basicRowCount = rowCount; + bucketShuffleBasicIndex = i; + } + } + } + } catch (Throwable t) { + // catch stats exception + LOG.warn("Can not find the most (bucket num, rowCount): " + t, t); + bucketShuffleBasicIndex = -1; + } + } + + if (bucketShuffleBasicIndex >= 0) { + // use bucket shuffle + DistributionSpecHash notShuffleSideRequire + = (DistributionSpecHash) requiredProperties.get(bucketShuffleBasicIndex) + .getDistributionSpec(); + + DistributionSpecHash notNeedShuffleOutput + = (DistributionSpecHash) originChildrenProperties.get(bucketShuffleBasicIndex) + .getDistributionSpec(); + + for (int i = 0; i < originChildrenProperties.size(); i++) { + DistributionSpecHash current + = (DistributionSpecHash) originChildrenProperties.get(i).getDistributionSpec(); + if (i == bucketShuffleBasicIndex) { + continue; + } + + DistributionSpecHash currentRequire + = (DistributionSpecHash) requiredProperties.get(i).getDistributionSpec(); + PhysicalProperties target = calAnotherSideRequired( - ShuffleType.EXECUTION_BUCKETED, basic, current, - (DistributionSpecHash) requiredProperties.get(0).getDistributionSpec(), - (DistributionSpecHash) requiredProperties.get(i).getDistributionSpec()); + ShuffleType.STORAGE_BUCKETED, + notNeedShuffleOutput, current, + notShuffleSideRequire, + currentRequire); updateChildEnforceAndCost(i, target); } + setOperation.setMutableState( + PhysicalSetOperation.DISTRIBUTE_TO_CHILD_INDEX, bucketShuffleBasicIndex); + } else { + // use partitioned shuffle + for (int i = 0; i < originChildrenProperties.size(); i++) { + DistributionSpecHash current + = (DistributionSpecHash) originChildrenProperties.get(i).getDistributionSpec(); + if (current.getShuffleType() != ShuffleType.EXECUTION_BUCKETED + || !bothSideShuffleKeysAreSameOrder(basic, current, + (DistributionSpecHash) requiredProperties.get(0).getDistributionSpec(), + (DistributionSpecHash) requiredProperties.get(i).getDistributionSpec())) { + PhysicalProperties target = calAnotherSideRequired( + ShuffleType.EXECUTION_BUCKETED, basic, current, + (DistributionSpecHash) requiredProperties.get(0).getDistributionSpec(), + (DistributionSpecHash) requiredProperties.get(i).getDistributionSpec()); + updateChildEnforceAndCost(i, target); + } + } } - // } } return ImmutableList.of(originChildrenProperties); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/RequestPropertyDeriver.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/RequestPropertyDeriver.java index 70f2b51665b..e0f4778b754 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/RequestPropertyDeriver.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/RequestPropertyDeriver.java @@ -339,14 +339,20 @@ public class RequestPropertyDeriver extends PlanVisitor<Void, PlanContext> { // shuffle all column // TODO: for wide table, may be we should add a upper limit of shuffle columns - // TODO: open comment when support `enable_local_shuffle_planner` and change to REQUIRE - // intersect/except always need hash distribution, we use REQUIRE to auto select - // bucket shuffle or execution shuffle + // intersect/except always need hash distribution. Auto-selecting bucket shuffle + // (ShuffleType.REQUIRE) for set operation is only valid when the FE plans the local + // shuffle: with the BE-side local-shuffle planner the backend cannot infer the + // correct local shuffle type for the set sink/probe and computes wrong results, so + // fall back to EXECUTION_BUCKETED there. + ConnectContext setOperationContext = ConnectContext.get(); + ShuffleType setOperationShuffleType = setOperationContext != null + && setOperationContext.getSessionVariable().isEnableLocalShufflePlanner() + ? ShuffleType.REQUIRE : ShuffleType.EXECUTION_BUCKETED; addRequestPropertyToChildren(setOperation.getRegularChildrenOutputs().stream() .map(childOutputs -> childOutputs.stream() .map(SlotReference::getExprId) .collect(ImmutableList.toImmutableList())) - .map(l -> PhysicalProperties.createHash(l, ShuffleType.EXECUTION_BUCKETED)) + .map(l -> PhysicalProperties.createHash(l, setOperationShuffleType)) .collect(Collectors.toList())); } return null; diff --git a/regression-test/suites/query_p0/set_operations/bucket_shuffle_set_operation.groovy b/regression-test/suites/query_p0/set_operations/bucket_shuffle_set_operation.groovy index 5533853eaa9..81ac60baca7 100644 --- a/regression-test/suites/query_p0/set_operations/bucket_shuffle_set_operation.groovy +++ b/regression-test/suites/query_p0/set_operations/bucket_shuffle_set_operation.groovy @@ -16,9 +16,6 @@ // under the License. suite("bucket_shuffle_set_operation") { - // TODO: open comment when support `enable_local_shuffle_planner` and change to REQUIRE - return - multi_sql """ drop table if exists bucket_shuffle_set_operation1; create table bucket_shuffle_set_operation1(id int, value int) distributed by hash(id) buckets 10 properties('replication_num'='1'); --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
