morrySnow commented on code in PR #23870:
URL: https://github.com/apache/doris/pull/23870#discussion_r1322286143
##########
fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildOutputPropertyDeriver.java:
##########
@@ -338,8 +338,15 @@ public PhysicalProperties
visitPhysicalRepeat(PhysicalRepeat<? extends Plan> rep
public PhysicalProperties
visitPhysicalPartitionTopN(PhysicalPartitionTopN<? extends Plan> partitionTopN,
PlanContext context) {
Preconditions.checkState(childrenOutputProperties.size() == 1);
- PhysicalProperties childOutputProperty =
childrenOutputProperties.get(0);
- return new
PhysicalProperties(childOutputProperty.getDistributionSpec());
+ if (partitionTopN.getPhase().isLocal()) {
+ return new PhysicalProperties(
+ childrenOutputProperties.get(0).getDistributionSpec());
+ } else {
+ Preconditions.checkState(partitionTopN.getPhase().isGlobal());
+ DistributionSpec spec =
PhysicalProperties.createHash(partitionTopN.getPartitionKeys(),
+ ShuffleType.REQUIRE).getDistributionSpec();
Review Comment:
The returned shuffle type should be the same as the child's.
##########
fe/fe-core/src/main/java/org/apache/doris/nereids/properties/RequestPropertyDeriver.java:
##########
@@ -242,6 +243,19 @@ public Void
visitAbstractPhysicalSort(AbstractPhysicalSort<? extends Plan> sort,
return null;
}
+ @Override
+ public Void visitPhysicalPartitionTopN(PhysicalPartitionTopN<? extends
Plan> partitionTopN, PlanContext context) {
+ if (partitionTopN.getPhase().isGlobal()) {
+ PhysicalProperties properties =
PhysicalProperties.createHash(partitionTopN.getPartitionKeys(),
+ ShuffleType.REQUIRE);
+ //addRequestPropertyToChildren(PhysicalProperties.GATHER);
Review Comment:
remove this line
##########
fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalPartitionTopNToPhysicalPartitionTopN.java:
##########
@@ -21,33 +21,68 @@
import org.apache.doris.nereids.rules.Rule;
import org.apache.doris.nereids.rules.RuleType;
import org.apache.doris.nereids.trees.expressions.OrderExpression;
+import org.apache.doris.nereids.trees.plans.PartitionTopnPhase;
+import org.apache.doris.nereids.trees.plans.Plan;
+import org.apache.doris.nereids.trees.plans.logical.LogicalPartitionTopN;
import org.apache.doris.nereids.trees.plans.physical.PhysicalPartitionTopN;
-import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
import java.util.List;
+import java.util.stream.Collectors;
/**
* Implementation rule that convert logical partition-top-n to physical
partition-top-n.
*/
public class LogicalPartitionTopNToPhysicalPartitionTopN extends
OneImplementationRuleFactory {
@Override
public Rule build() {
- return logicalPartitionTopN().then(partitionTopN -> {
- List<OrderKey> orderKeys = !partitionTopN.getOrderKeys().isEmpty()
- ? partitionTopN.getOrderKeys().stream()
- .map(OrderExpression::getOrderKey)
- .collect(ImmutableList.toImmutableList()) :
- ImmutableList.of();
-
- return new PhysicalPartitionTopN<>(
- partitionTopN.getFunction(),
- partitionTopN.getPartitionKeys(),
- orderKeys,
- partitionTopN.hasGlobalLimit(),
- partitionTopN.getPartitionLimit(),
- partitionTopN.getLogicalProperties(),
- partitionTopN.child());
-
}).toRule(RuleType.LOGICAL_PARTITION_TOP_N_TO_PHYSICAL_PARTITION_TOP_N_RULE);
+ return logicalPartitionTopN().thenApplyMulti(ctx ->
twoPhasePartitionTopn(ctx.root))
Review Comment:
If `twoPhasePartitionTopn` only returns one candidate, just use `thenApply`
instead of `thenApplyMulti`.
##########
fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalPartitionTopN.java:
##########
@@ -48,12 +49,13 @@ public class PhysicalPartitionTopN<CHILD_TYPE extends Plan>
extends PhysicalUnar
private final List<OrderKey> orderKeys;
private final Boolean hasGlobalLimit;
private final long partitionLimit;
+ private final PartitionTopnPhase phase;
public PhysicalPartitionTopN(WindowFuncType function, List<Expression>
partitionKeys, List<OrderKey> orderKeys,
Boolean hasGlobalLimit, long partitionLimit,
- LogicalProperties logicalProperties,
CHILD_TYPE child) {
+ LogicalProperties logicalProperties,
PartitionTopnPhase phase, CHILD_TYPE child) {
Review Comment:
`phase` should be placed before `logicalProperties`.
##########
fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildOutputPropertyDeriver.java:
##########
@@ -338,8 +338,15 @@ public PhysicalProperties
visitPhysicalRepeat(PhysicalRepeat<? extends Plan> rep
public PhysicalProperties
visitPhysicalPartitionTopN(PhysicalPartitionTopN<? extends Plan> partitionTopN,
PlanContext context) {
Preconditions.checkState(childrenOutputProperties.size() == 1);
- PhysicalProperties childOutputProperty =
childrenOutputProperties.get(0);
- return new
PhysicalProperties(childOutputProperty.getDistributionSpec());
+ if (partitionTopN.getPhase().isLocal()) {
+ return new PhysicalProperties(
+ childrenOutputProperties.get(0).getDistributionSpec());
+ } else {
+ Preconditions.checkState(partitionTopN.getPhase().isGlobal());
Review Comment:
Add an error message to this `checkState` call.
##########
fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalPartitionTopNToPhysicalPartitionTopN.java:
##########
@@ -21,33 +21,68 @@
import org.apache.doris.nereids.rules.Rule;
import org.apache.doris.nereids.rules.RuleType;
import org.apache.doris.nereids.trees.expressions.OrderExpression;
+import org.apache.doris.nereids.trees.plans.PartitionTopnPhase;
+import org.apache.doris.nereids.trees.plans.Plan;
+import org.apache.doris.nereids.trees.plans.logical.LogicalPartitionTopN;
import org.apache.doris.nereids.trees.plans.physical.PhysicalPartitionTopN;
-import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
import java.util.List;
+import java.util.stream.Collectors;
/**
* Implementation rule that convert logical partition-top-n to physical
partition-top-n.
*/
public class LogicalPartitionTopNToPhysicalPartitionTopN extends
OneImplementationRuleFactory {
@Override
public Rule build() {
- return logicalPartitionTopN().then(partitionTopN -> {
- List<OrderKey> orderKeys = !partitionTopN.getOrderKeys().isEmpty()
- ? partitionTopN.getOrderKeys().stream()
- .map(OrderExpression::getOrderKey)
- .collect(ImmutableList.toImmutableList()) :
- ImmutableList.of();
-
- return new PhysicalPartitionTopN<>(
- partitionTopN.getFunction(),
- partitionTopN.getPartitionKeys(),
- orderKeys,
- partitionTopN.hasGlobalLimit(),
- partitionTopN.getPartitionLimit(),
- partitionTopN.getLogicalProperties(),
- partitionTopN.child());
-
}).toRule(RuleType.LOGICAL_PARTITION_TOP_N_TO_PHYSICAL_PARTITION_TOP_N_RULE);
+ return logicalPartitionTopN().thenApplyMulti(ctx ->
twoPhasePartitionTopn(ctx.root))
+
.toRule(RuleType.LOGICAL_PARTITION_TOP_N_TO_PHYSICAL_PARTITION_TOP_N_RULE);
+ }
+
+ private List<PhysicalPartitionTopN<? extends Plan>> twoPhasePartitionTopn(
+ LogicalPartitionTopN<? extends Plan> logicalPartitionTopN) {
+ // for partition topn, the order keys will be set as original
partition keys combined with
+ // orderby keys, to meet upper window operator's order requirement.
+ List<OrderKey> orderKeys = getAllOrderKeys(logicalPartitionTopN);
+
+ PhysicalPartitionTopN<Plan> localPartitionTopN = new
PhysicalPartitionTopN<>(
+ logicalPartitionTopN.getFunction(),
+ logicalPartitionTopN.getPartitionKeys(),
+ orderKeys,
+ logicalPartitionTopN.hasGlobalLimit(),
+ logicalPartitionTopN.getPartitionLimit(),
+ logicalPartitionTopN.getLogicalProperties(),
+ PartitionTopnPhase.LOCAL_PTOPN,
+ logicalPartitionTopN.child(0));
+
+ PhysicalPartitionTopN<Plan> globalPartitionTopN = new
PhysicalPartitionTopN<>(
+ logicalPartitionTopN.getFunction(),
+ logicalPartitionTopN.getPartitionKeys(),
+ orderKeys,
+ logicalPartitionTopN.hasGlobalLimit(),
+ logicalPartitionTopN.getPartitionLimit(),
+ logicalPartitionTopN.getLogicalProperties(),
+ PartitionTopnPhase.GLOBAL_PTOPN,
+ localPartitionTopN);
+
+ return Lists.newArrayList(globalPartitionTopN);
+ }
+
+ private List<OrderKey> getAllOrderKeys(LogicalPartitionTopN<? extends
Plan> logicalPartitionTopN) {
+ List<OrderKey> allOrderKeys = Lists.newArrayList();
+ if (!logicalPartitionTopN.getPartitionKeys().isEmpty()) {
+
allOrderKeys.addAll(logicalPartitionTopN.getPartitionKeys().stream().map(partitionKey
-> {
+ return new OrderKey(partitionKey, true, false);
+ }).collect(Collectors.toList()));
+ }
+ if (!logicalPartitionTopN.getOrderKeys().isEmpty()) {
+ allOrderKeys.addAll(logicalPartitionTopN.getOrderKeys().stream()
+ .map(OrderExpression::getOrderKey)
+ .collect(Collectors.toList())
Review Comment:
```suggestion
.collect(ImmutableList.toImmutableList())
```
##########
fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalPartitionTopNToPhysicalPartitionTopN.java:
##########
@@ -21,33 +21,68 @@
import org.apache.doris.nereids.rules.Rule;
import org.apache.doris.nereids.rules.RuleType;
import org.apache.doris.nereids.trees.expressions.OrderExpression;
+import org.apache.doris.nereids.trees.plans.PartitionTopnPhase;
+import org.apache.doris.nereids.trees.plans.Plan;
+import org.apache.doris.nereids.trees.plans.logical.LogicalPartitionTopN;
import org.apache.doris.nereids.trees.plans.physical.PhysicalPartitionTopN;
-import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
import java.util.List;
+import java.util.stream.Collectors;
/**
* Implementation rule that convert logical partition-top-n to physical
partition-top-n.
*/
public class LogicalPartitionTopNToPhysicalPartitionTopN extends
OneImplementationRuleFactory {
@Override
public Rule build() {
- return logicalPartitionTopN().then(partitionTopN -> {
- List<OrderKey> orderKeys = !partitionTopN.getOrderKeys().isEmpty()
- ? partitionTopN.getOrderKeys().stream()
- .map(OrderExpression::getOrderKey)
- .collect(ImmutableList.toImmutableList()) :
- ImmutableList.of();
-
- return new PhysicalPartitionTopN<>(
- partitionTopN.getFunction(),
- partitionTopN.getPartitionKeys(),
- orderKeys,
- partitionTopN.hasGlobalLimit(),
- partitionTopN.getPartitionLimit(),
- partitionTopN.getLogicalProperties(),
- partitionTopN.child());
-
}).toRule(RuleType.LOGICAL_PARTITION_TOP_N_TO_PHYSICAL_PARTITION_TOP_N_RULE);
+ return logicalPartitionTopN().thenApplyMulti(ctx ->
twoPhasePartitionTopn(ctx.root))
+
.toRule(RuleType.LOGICAL_PARTITION_TOP_N_TO_PHYSICAL_PARTITION_TOP_N_RULE);
+ }
+
+ private List<PhysicalPartitionTopN<? extends Plan>> twoPhasePartitionTopn(
+ LogicalPartitionTopN<? extends Plan> logicalPartitionTopN) {
+ // for partition topn, the order keys will be set as original
partition keys combined with
+ // orderby keys, to meet upper window operator's order requirement.
+ List<OrderKey> orderKeys = getAllOrderKeys(logicalPartitionTopN);
+
+ PhysicalPartitionTopN<Plan> localPartitionTopN = new
PhysicalPartitionTopN<>(
+ logicalPartitionTopN.getFunction(),
+ logicalPartitionTopN.getPartitionKeys(),
+ orderKeys,
+ logicalPartitionTopN.hasGlobalLimit(),
+ logicalPartitionTopN.getPartitionLimit(),
+ logicalPartitionTopN.getLogicalProperties(),
+ PartitionTopnPhase.LOCAL_PTOPN,
+ logicalPartitionTopN.child(0));
+
+ PhysicalPartitionTopN<Plan> globalPartitionTopN = new
PhysicalPartitionTopN<>(
+ logicalPartitionTopN.getFunction(),
+ logicalPartitionTopN.getPartitionKeys(),
+ orderKeys,
+ logicalPartitionTopN.hasGlobalLimit(),
+ logicalPartitionTopN.getPartitionLimit(),
+ logicalPartitionTopN.getLogicalProperties(),
+ PartitionTopnPhase.GLOBAL_PTOPN,
+ localPartitionTopN);
+
+ return Lists.newArrayList(globalPartitionTopN);
+ }
+
+ private List<OrderKey> getAllOrderKeys(LogicalPartitionTopN<? extends
Plan> logicalPartitionTopN) {
+ List<OrderKey> allOrderKeys = Lists.newArrayList();
+ if (!logicalPartitionTopN.getPartitionKeys().isEmpty()) {
+
allOrderKeys.addAll(logicalPartitionTopN.getPartitionKeys().stream().map(partitionKey
-> {
+ return new OrderKey(partitionKey, true, false);
+ }).collect(Collectors.toList()));
Review Comment:
```suggestion
}).collect(ImmutableList.toImmutableList());
```
##########
fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalPartitionTopN.java:
##########
@@ -77,14 +80,16 @@ public PhysicalPartitionTopN(WindowFuncType function,
List<Expression> partition
public PhysicalPartitionTopN(WindowFuncType function, List<Expression>
partitionKeys, List<OrderKey> orderKeys,
Boolean hasGlobalLimit, long partitionLimit,
Optional<GroupExpression> groupExpression,
LogicalProperties logicalProperties,
- PhysicalProperties physicalProperties,
Statistics statistics, CHILD_TYPE child) {
+ PhysicalProperties physicalProperties,
Statistics statistics,
+ PartitionTopnPhase phase, CHILD_TYPE child) {
Review Comment:
`phase` should be placed before `groupExpression`.
##########
fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalPartitionTopNToPhysicalPartitionTopN.java:
##########
@@ -21,33 +21,68 @@
import org.apache.doris.nereids.rules.Rule;
import org.apache.doris.nereids.rules.RuleType;
import org.apache.doris.nereids.trees.expressions.OrderExpression;
+import org.apache.doris.nereids.trees.plans.PartitionTopnPhase;
+import org.apache.doris.nereids.trees.plans.Plan;
+import org.apache.doris.nereids.trees.plans.logical.LogicalPartitionTopN;
import org.apache.doris.nereids.trees.plans.physical.PhysicalPartitionTopN;
-import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
import java.util.List;
+import java.util.stream.Collectors;
/**
* Implementation rule that convert logical partition-top-n to physical
partition-top-n.
*/
public class LogicalPartitionTopNToPhysicalPartitionTopN extends
OneImplementationRuleFactory {
@Override
public Rule build() {
- return logicalPartitionTopN().then(partitionTopN -> {
- List<OrderKey> orderKeys = !partitionTopN.getOrderKeys().isEmpty()
- ? partitionTopN.getOrderKeys().stream()
- .map(OrderExpression::getOrderKey)
- .collect(ImmutableList.toImmutableList()) :
- ImmutableList.of();
-
- return new PhysicalPartitionTopN<>(
- partitionTopN.getFunction(),
- partitionTopN.getPartitionKeys(),
- orderKeys,
- partitionTopN.hasGlobalLimit(),
- partitionTopN.getPartitionLimit(),
- partitionTopN.getLogicalProperties(),
- partitionTopN.child());
-
}).toRule(RuleType.LOGICAL_PARTITION_TOP_N_TO_PHYSICAL_PARTITION_TOP_N_RULE);
+ return logicalPartitionTopN().thenApplyMulti(ctx ->
twoPhasePartitionTopn(ctx.root))
+
.toRule(RuleType.LOGICAL_PARTITION_TOP_N_TO_PHYSICAL_PARTITION_TOP_N_RULE);
+ }
+
+ private List<PhysicalPartitionTopN<? extends Plan>> twoPhasePartitionTopn(
+ LogicalPartitionTopN<? extends Plan> logicalPartitionTopN) {
+ // for partition topn, the order keys will be set as original
partition keys combined with
+ // orderby keys, to meet upper window operator's order requirement.
+ List<OrderKey> orderKeys = getAllOrderKeys(logicalPartitionTopN);
+
+ PhysicalPartitionTopN<Plan> localPartitionTopN = new
PhysicalPartitionTopN<>(
+ logicalPartitionTopN.getFunction(),
+ logicalPartitionTopN.getPartitionKeys(),
+ orderKeys,
+ logicalPartitionTopN.hasGlobalLimit(),
+ logicalPartitionTopN.getPartitionLimit(),
+ logicalPartitionTopN.getLogicalProperties(),
+ PartitionTopnPhase.LOCAL_PTOPN,
+ logicalPartitionTopN.child(0));
+
+ PhysicalPartitionTopN<Plan> globalPartitionTopN = new
PhysicalPartitionTopN<>(
+ logicalPartitionTopN.getFunction(),
+ logicalPartitionTopN.getPartitionKeys(),
+ orderKeys,
+ logicalPartitionTopN.hasGlobalLimit(),
+ logicalPartitionTopN.getPartitionLimit(),
+ logicalPartitionTopN.getLogicalProperties(),
+ PartitionTopnPhase.GLOBAL_PTOPN,
+ localPartitionTopN);
+
+ return Lists.newArrayList(globalPartitionTopN);
Review Comment:
Use `ImmutableList.of(globalPartitionTopN)` instead of `Lists.newArrayList(globalPartitionTopN)`.
##########
fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalPartitionTopNToPhysicalPartitionTopN.java:
##########
@@ -21,33 +21,68 @@
import org.apache.doris.nereids.rules.Rule;
import org.apache.doris.nereids.rules.RuleType;
import org.apache.doris.nereids.trees.expressions.OrderExpression;
+import org.apache.doris.nereids.trees.plans.PartitionTopnPhase;
+import org.apache.doris.nereids.trees.plans.Plan;
+import org.apache.doris.nereids.trees.plans.logical.LogicalPartitionTopN;
import org.apache.doris.nereids.trees.plans.physical.PhysicalPartitionTopN;
-import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
import java.util.List;
+import java.util.stream.Collectors;
/**
* Implementation rule that convert logical partition-top-n to physical
partition-top-n.
*/
public class LogicalPartitionTopNToPhysicalPartitionTopN extends
OneImplementationRuleFactory {
@Override
public Rule build() {
- return logicalPartitionTopN().then(partitionTopN -> {
- List<OrderKey> orderKeys = !partitionTopN.getOrderKeys().isEmpty()
- ? partitionTopN.getOrderKeys().stream()
- .map(OrderExpression::getOrderKey)
- .collect(ImmutableList.toImmutableList()) :
- ImmutableList.of();
-
- return new PhysicalPartitionTopN<>(
- partitionTopN.getFunction(),
- partitionTopN.getPartitionKeys(),
- orderKeys,
- partitionTopN.hasGlobalLimit(),
- partitionTopN.getPartitionLimit(),
- partitionTopN.getLogicalProperties(),
- partitionTopN.child());
-
}).toRule(RuleType.LOGICAL_PARTITION_TOP_N_TO_PHYSICAL_PARTITION_TOP_N_RULE);
+ return logicalPartitionTopN().thenApplyMulti(ctx ->
twoPhasePartitionTopn(ctx.root))
+
.toRule(RuleType.LOGICAL_PARTITION_TOP_N_TO_PHYSICAL_PARTITION_TOP_N_RULE);
+ }
+
+ private List<PhysicalPartitionTopN<? extends Plan>> twoPhasePartitionTopn(
+ LogicalPartitionTopN<? extends Plan> logicalPartitionTopN) {
+ // for partition topn, the order keys will be set as original
partition keys combined with
+ // orderby keys, to meet upper window operator's order requirement.
+ List<OrderKey> orderKeys = getAllOrderKeys(logicalPartitionTopN);
+
+ PhysicalPartitionTopN<Plan> localPartitionTopN = new
PhysicalPartitionTopN<>(
+ logicalPartitionTopN.getFunction(),
+ logicalPartitionTopN.getPartitionKeys(),
+ orderKeys,
+ logicalPartitionTopN.hasGlobalLimit(),
+ logicalPartitionTopN.getPartitionLimit(),
+ logicalPartitionTopN.getLogicalProperties(),
+ PartitionTopnPhase.LOCAL_PTOPN,
+ logicalPartitionTopN.child(0));
+
+ PhysicalPartitionTopN<Plan> globalPartitionTopN = new
PhysicalPartitionTopN<>(
+ logicalPartitionTopN.getFunction(),
+ logicalPartitionTopN.getPartitionKeys(),
+ orderKeys,
+ logicalPartitionTopN.hasGlobalLimit(),
+ logicalPartitionTopN.getPartitionLimit(),
+ logicalPartitionTopN.getLogicalProperties(),
+ PartitionTopnPhase.GLOBAL_PTOPN,
+ localPartitionTopN);
+
+ return Lists.newArrayList(globalPartitionTopN);
+ }
+
+ private List<OrderKey> getAllOrderKeys(LogicalPartitionTopN<? extends
Plan> logicalPartitionTopN) {
+ List<OrderKey> allOrderKeys = Lists.newArrayList();
Review Comment:
use `ImmutableList.Builder<OrderKey>`
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]