This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-4.1 by this push:
new 4af0b193bad branch-4.1:[fix](agg) Adjust agg strategy when table
satisfies distinct key distribution #61248 (#63453)
4af0b193bad is described below
commit 4af0b193bada879af5ce4bb3f038ae20810755ca
Author: feiniaofeiafei <[email protected]>
AuthorDate: Thu May 21 16:20:15 2026 +0800
branch-4.1:[fix](agg) Adjust agg strategy when table satisfies distinct key
distribution #61248 (#63453)
picked from #61248
---
.../LogicalOlapScanToPhysicalOlapScan.java | 12 +-
.../rules/rewrite/DistinctAggregateRewriter.java | 61 +++---
.../java/org/apache/doris/nereids/util/Utils.java | 16 ++
.../rewrite/DistinctAggregateRewriterTest.java | 130 ++++++++++--
.../nereids_rules_p0/agg_strategy/agg_strategy.out | 226 +++++++++++----------
.../agg_strategy/distinct_agg_rewriter.out | 12 +-
.../distinct_agg_strategy_selector.out | 14 +-
.../agg_strategy/physical_agg_regulator.out | 14 +-
.../distinct_split/disitinct_split.out | 43 ++--
.../agg_strategy/agg_strategy.groovy | 1 -
.../agg_strategy/distinct_agg_rewriter.groovy | 1 -
.../distinct_split/disitinct_split.groovy | 1 +
12 files changed, 348 insertions(+), 183 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalOlapScanToPhysicalOlapScan.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalOlapScanToPhysicalOlapScan.java
index 4a7132c3a85..d871b9e3535 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalOlapScanToPhysicalOlapScan.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalOlapScanToPhysicalOlapScan.java
@@ -17,13 +17,10 @@
package org.apache.doris.nereids.rules.implementation;
-import org.apache.doris.catalog.ColocateTableIndex;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.DistributionInfo;
-import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.HashDistributionInfo;
import org.apache.doris.catalog.OlapTable;
-import org.apache.doris.catalog.PartitionType;
import org.apache.doris.nereids.properties.DistributionSpec;
import org.apache.doris.nereids.properties.DistributionSpecHash;
import org.apache.doris.nereids.properties.DistributionSpecHash.ShuffleType;
@@ -35,6 +32,7 @@ import org.apache.doris.nereids.trees.expressions.Slot;
import org.apache.doris.nereids.trees.expressions.SlotReference;
import org.apache.doris.nereids.trees.plans.logical.LogicalOlapScan;
import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapScan;
+import org.apache.doris.nereids.util.Utils;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
@@ -76,15 +74,11 @@ public class LogicalOlapScanToPhysicalOlapScan extends
OneImplementationRuleFact
public static DistributionSpec convertDistribution(LogicalOlapScan
olapScan) {
OlapTable olapTable = olapScan.getTable();
DistributionInfo distributionInfo =
olapTable.getDefaultDistributionInfo();
- ColocateTableIndex colocateTableIndex = Env.getCurrentColocateIndex();
// When there are multiple partitions, olapScan tasks of different
buckets are dispatched in
// rounded robin algorithm. Therefore, the hashDistributedSpec can be
broken except they are in
// the same stable colocateGroup(CG)
- boolean isBelongStableCG =
colocateTableIndex.isColocateTable(olapTable.getId())
- &&
!colocateTableIndex.isGroupUnstable(colocateTableIndex.getGroup(olapTable.getId()))
- && olapTable.getCatalogId() ==
Env.getCurrentInternalCatalog().getId();
- boolean isSelectUnpartition = olapTable.getPartitionInfo().getType()
== PartitionType.UNPARTITIONED
- || olapScan.getSelectedPartitionIds().size() == 1;
+ boolean isBelongStableCG = Utils.isBelongStableCG(olapTable);
+ boolean isSelectUnpartition = Utils.isSelectUnpartition(olapTable,
olapScan.getSelectedPartitionIds());
// TODO: find a better way to handle both tablet num == 1 and colocate
table together in future
if (distributionInfo instanceof HashDistributionInfo &&
(isBelongStableCG || isSelectUnpartition)) {
if (olapScan.getSelectedIndexId() !=
olapScan.getTable().getBaseIndexId()) {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/DistinctAggregateRewriter.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/DistinctAggregateRewriter.java
index c2668751dae..832641add2d 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/DistinctAggregateRewriter.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/DistinctAggregateRewriter.java
@@ -24,6 +24,7 @@ import org.apache.doris.catalog.OlapTable;
import org.apache.doris.nereids.rules.Rule;
import org.apache.doris.nereids.rules.RuleType;
import org.apache.doris.nereids.rules.rewrite.StatsDerive.DeriveContext;
+import org.apache.doris.nereids.stats.ExpressionEstimation;
import org.apache.doris.nereids.trees.expressions.Alias;
import org.apache.doris.nereids.trees.expressions.Expression;
import org.apache.doris.nereids.trees.expressions.NamedExpression;
@@ -46,6 +47,7 @@ import org.apache.doris.nereids.util.AggregateUtils;
import org.apache.doris.nereids.util.ExpressionUtils;
import org.apache.doris.nereids.util.Utils;
import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.statistics.ColumnStatistic;
import org.apache.doris.statistics.Statistics;
import com.google.common.annotations.VisibleForTesting;
@@ -77,7 +79,6 @@ import java.util.Set;
*/
public class DistinctAggregateRewriter implements RewriteRuleFactory {
public static final DistinctAggregateRewriter INSTANCE = new
DistinctAggregateRewriter();
- private static final double MULTI_DISTINCT_GBY_PER_INSTANCE_THRESHOLD =
30.0;
// TODO: add other functions
private static final Set<Class<? extends AggregateFunction>>
supportSplitOtherFunctions = ImmutableSet.of(
Sum.class, Min.class, Max.class, Count.class, Sum0.class,
AnyValue.class);
@@ -118,26 +119,27 @@ public class DistinctAggregateRewriter implements
RewriteRuleFactory {
Statistics aggStats = aggregate.getStats();
Statistics aggChildStats = aggregate.child().getStats();
Set<Expression> dstArgs = aggregate.getDistinctArguments();
+ if (isDistinctKeySatisfyDistribution(aggregate)) {
+ return false;
+ }
// has unknown statistics, split to bottom and top agg
if
(AggregateUtils.hasUnknownStatistics(aggregate.getGroupByExpressions(),
aggChildStats)
|| AggregateUtils.hasUnknownStatistics(dstArgs,
aggChildStats)) {
- return !isDistinctKeySatisfyDistribution(aggregate);
+ return true;
}
double gbyNdv = aggStats.getRowCount();
- int instanceNum = getParallelExecInstanceNum(ConnectContext.get());
- if (instanceNum <= 0) {
- instanceNum = 1;
- }
- return gbyNdv / instanceNum <=
MULTI_DISTINCT_GBY_PER_INSTANCE_THRESHOLD;
- }
-
- private int getParallelExecInstanceNum(ConnectContext ctx) {
- if (ctx == null) {
- return 1;
+ Expression dstKey = dstArgs.iterator().next();
+ ColumnStatistic dstKeyStats =
aggChildStats.findColumnStatistics(dstKey);
+ if (dstKeyStats == null) {
+ dstKeyStats = ExpressionEstimation.estimate(dstKey, aggChildStats);
}
- return ctx.getSessionVariable()
-
.getParallelExecInstanceNum(ctx.getSessionVariable().resolveCloudClusterName(ctx));
+ double dstNdv = dstKeyStats.ndv;
+ double inputRows = aggChildStats.getRowCount();
+ // group by key ndv is low, distinct key ndv is high, multi_distinct
is better
+ // otherwise split to bottom and top agg
+ return gbyNdv < inputRows * AggregateUtils.LOW_CARDINALITY_THRESHOLD
+ && dstNdv > inputRows *
AggregateUtils.HIGH_CARDINALITY_THRESHOLD;
}
private boolean isDistinctKeySatisfyDistribution(LogicalAggregate<?
extends Plan> aggregate) {
@@ -147,12 +149,9 @@ public class DistinctAggregateRewriter implements
RewriteRuleFactory {
}
Set<String> distinctColumnNames = new HashSet<>();
for (SlotReference slot : info.distinctSlots) {
- if (!slot.getOriginalColumn().isPresent()) {
- return false;
- }
-
distinctColumnNames.add(slot.getOriginalColumn().get().getName().toLowerCase());
+ distinctColumnNames.add(slot.getName().toLowerCase());
}
- DistributionInfo distributionInfo =
info.table.getDefaultDistributionInfo();
+ DistributionInfo distributionInfo = info.distributionInfo;
if (!(distributionInfo instanceof HashDistributionInfo)) {
return false;
}
@@ -168,6 +167,15 @@ public class DistinctAggregateRewriter implements
RewriteRuleFactory {
return true;
}
+ /** This function gets the DistinctDistributionInfo from the aggregate,
+ * and handles the cases where the aggregate's child is a filter, a project,
or both of them:
+ * agg(count(distinct a), group by b)
+ * ->project()
+ * ->filter()
+ * ->scan(distributed by hash(a))
+ * @return the table's default DistributionInfo and the base-table slots referenced
+ * by the aggregate's distinct arguments, or null if they cannot be resolved.
+ */
private DistinctDistributionInfo
resolveDistinctDistributionInfo(LogicalAggregate<? extends Plan> aggregate) {
Set<Expression> distinctArgs = aggregate.getDistinctArguments();
if (distinctArgs.isEmpty()) {
@@ -209,25 +217,30 @@ public class DistinctAggregateRewriter implements
RewriteRuleFactory {
if (!(child instanceof LogicalOlapScan)) {
return null;
}
- OlapTable olapTable = ((LogicalOlapScan) child).getTable();
+ LogicalOlapScan scan = (LogicalOlapScan) child;
+ OlapTable olapTable = scan.getTable();
if (olapTable == null) {
return null;
}
+ if (!Utils.isSelectUnpartition(olapTable,
scan.getSelectedPartitionIds())
+ && !Utils.isBelongStableCG(olapTable)) {
+ return null;
+ }
for (SlotReference slot : distinctSlots) {
if (!slot.getOriginalTable().isPresent()
|| slot.getOriginalTable().get() != olapTable) {
return null;
}
}
- return new DistinctDistributionInfo(olapTable, distinctSlots);
+ return new
DistinctDistributionInfo(olapTable.getDefaultDistributionInfo(), distinctSlots);
}
private static class DistinctDistributionInfo {
- private final OlapTable table;
+ private final DistributionInfo distributionInfo;
private final Set<SlotReference> distinctSlots;
- private DistinctDistributionInfo(OlapTable table, Set<SlotReference>
distinctSlots) {
- this.table = table;
+ private DistinctDistributionInfo(DistributionInfo distributionInfo,
Set<SlotReference> distinctSlots) {
+ this.distributionInfo = distributionInfo;
this.distinctSlots = distinctSlots;
}
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/util/Utils.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/util/Utils.java
index 34e91af8375..66eb03cbaa4 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/util/Utils.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/util/Utils.java
@@ -17,6 +17,10 @@
package org.apache.doris.nereids.util;
+import org.apache.doris.catalog.ColocateTableIndex;
+import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.catalog.PartitionType;
import org.apache.doris.info.TableNameInfo;
import org.apache.doris.nereids.exceptions.AnalysisException;
import org.apache.doris.nereids.glue.LogicalPlanAdapter;
@@ -176,6 +180,18 @@ public class Utils {
return StringUtils.join(qualifierWithBackquote, ".");
}
+ public static boolean isBelongStableCG(OlapTable olapTable) {
+ ColocateTableIndex colocateTableIndex = Env.getCurrentColocateIndex();
+ return colocateTableIndex.isColocateTable(olapTable.getId())
+ &&
!colocateTableIndex.isGroupUnstable(colocateTableIndex.getGroup(olapTable.getId()))
+ && olapTable.getCatalogId() ==
Env.getCurrentInternalCatalog().getId();
+ }
+
+ public static boolean isSelectUnpartition(OlapTable olapTable,
Collection<Long> selectedPartitionIds) {
+ return olapTable.getPartitionInfo().getType() ==
PartitionType.UNPARTITIONED
+ || selectedPartitionIds.size() == 1;
+ }
+
/**
* Get sql string for plan.
*
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/rewrite/DistinctAggregateRewriterTest.java
b/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/rewrite/DistinctAggregateRewriterTest.java
index 05c2180840c..7b9e957b742 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/rewrite/DistinctAggregateRewriterTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/rewrite/DistinctAggregateRewriterTest.java
@@ -17,7 +17,10 @@
package org.apache.doris.nereids.rules.rewrite;
+import org.apache.doris.catalog.DistributionInfo;
+import org.apache.doris.catalog.HashDistributionInfo;
import
org.apache.doris.nereids.rules.analysis.LogicalSubQueryAliasToLogicalProject;
+import org.apache.doris.nereids.trees.expressions.SlotReference;
import
org.apache.doris.nereids.trees.expressions.functions.agg.AggregateFunction;
import org.apache.doris.nereids.trees.expressions.functions.agg.Count;
import
org.apache.doris.nereids.trees.expressions.functions.agg.MultiDistinctCount;
@@ -35,15 +38,22 @@ import org.apache.doris.statistics.ColumnStatisticBuilder;
import org.apache.doris.statistics.Statistics;
import org.apache.doris.utframe.TestWithFeService;
+import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import mockit.Mock;
import mockit.MockUp;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
public class DistinctAggregateRewriterTest extends TestWithFeService
implements MemoPatternMatchSupported {
@Override
@@ -52,16 +62,33 @@ public class DistinctAggregateRewriterTest extends
TestWithFeService implements
createTable("create table test.distinct_agg_split_t(a int null, b int
not null,"
+ "c varchar(10) null, d date, dt datetime)\n"
+ "distributed by hash(a) properties('replication_num' =
'1');");
+ createTable("CREATE TABLE IF NOT EXISTS test.sales_records\n"
+ + "(\n"
+ + " record_id BIGINT,\n"
+ + " seller_id BIGINT,\n"
+ + " sale_date DATE,\n"
+ + " amount DECIMAL(18,2)\n"
+ + ")\n"
+ + "DUPLICATE KEY(record_id, seller_id)\n"
+ + "PARTITION BY RANGE(sale_date)\n"
+ + "(\n"
+ + " PARTITION p202301 VALUES LESS THAN ('2023-02-01'),\n"
+ + " PARTITION p202302 VALUES LESS THAN ('2023-03-01'),\n"
+ + " PARTITION p202303 VALUES LESS THAN ('2023-04-01')\n"
+ + ")\n"
+ + "DISTRIBUTED BY HASH(record_id) BUCKETS 10\n"
+ + "PROPERTIES (\n"
+ + " \"replication_num\" = \"1\"\n"
+ + ");");
+ createTable("create table test.distinct_agg_hash_ab(a int null, b int
not null, c int null, d int null)\n"
+ + "distributed by hash(a, b) properties('replication_num' =
'1');");
+ createTable("create table test.distinct_agg_hash_abcd(a int null, b
int not null, c int null, d int null)\n"
+ + "distributed by hash(a, b, c, d)
properties('replication_num' = '1');");
connectContext.setDatabase("test");
+ SessionVariable spySessionVariable =
Mockito.spy(connectContext.getSessionVariable());
+
Mockito.doReturn(24).when(spySessionVariable).getParallelExecInstanceNum(Mockito.anyString());
+ connectContext.setSessionVariable(spySessionVariable);
connectContext.getSessionVariable().setDisableNereidsRules("PRUNE_EMPTY_PARTITION");
- new SessionVariableMockUp();
- }
-
- private static class SessionVariableMockUp extends MockUp<SessionVariable>
{
- @Mock
- private int getParallelExecInstanceNum(String clusterName) {
- return 24;
- }
}
private void applyMock() {
@@ -256,25 +283,67 @@ public class DistinctAggregateRewriterTest extends
TestWithFeService implements
((AbstractPlan) child).setStatistics(new Statistics(100000, colStats));
aggregate.setStatistics(new Statistics(240, ImmutableMap.of()));
- Assertions.assertTrue(rewriter.shouldUseMultiDistinct(aggregate));
+ Assertions.assertFalse(rewriter.shouldUseMultiDistinct(aggregate));
}
@Test
- void testShouldUseMultiDistinctWithStatsNotSelected() throws Exception {
- DistinctAggregateRewriter rewriter = new DistinctAggregateRewriter();
+ void testShouldUseMultiDistinctWithPartitionTable() {
+ DistinctAggregateRewriter rewriter =
DistinctAggregateRewriter.INSTANCE;
LogicalAggregate<? extends Plan> aggregate = getLogicalAggregate(
- "select b, count(distinct a) from test.distinct_agg_split_t
group by b"
+ "select count(distinct record_id) from sales_records group by
sale_date;"
);
Plan child = aggregate.child();
Map<org.apache.doris.nereids.trees.expressions.Expression,
ColumnStatistic> colStats = new HashMap<>();
aggregate.getGroupByExpressions().forEach(expr ->
- colStats.put(expr, buildColumnStats(1000.0, false)));
+ colStats.put(expr, unknownColumnStats()));
aggregate.getDistinctArguments().forEach(expr ->
- colStats.put(expr, buildColumnStats(10000.0, false)));
- ((AbstractPlan) child).setStatistics(new Statistics(100000, colStats));
- aggregate.setStatistics(new Statistics(1000.0, ImmutableMap.of()));
+ colStats.put(expr, unknownColumnStats()));
+ ((AbstractPlan) child).setStatistics(new Statistics(10000, colStats));
+ aggregate.setStatistics(new Statistics(100, ImmutableMap.of()));
- Assertions.assertFalse(rewriter.shouldUseMultiDistinct(aggregate));
+ Assertions.assertTrue(rewriter.shouldUseMultiDistinct(aggregate));
+ }
+
+ @Test
+ void testResolveDistinctDistributionInfoWithProjectAndFilter() throws
Exception {
+ LogicalAggregate<? extends Plan> aggregate = getLogicalAggregate(
+ "select bb, count(distinct aa) from "
+ + "(select a as aa, b as bb from
test.distinct_agg_hash_ab where c > 1) t "
+ + "group by bb"
+ );
+
+ Object info = invokeResolveDistinctDistributionInfo(aggregate);
+ Assertions.assertNotNull(info);
+
+ List<String> distinctSlotNames = getDistinctSlots(info).stream()
+ .map(SlotReference::getName)
+ .collect(Collectors.toList());
+ Assertions.assertEquals(ImmutableList.of("a"), distinctSlotNames);
+
+ DistributionInfo distributionInfo = getDistributionInfo(info);
+ Assertions.assertTrue(distributionInfo instanceof
HashDistributionInfo);
+ List<String> distributionColumnNames = ((HashDistributionInfo)
distributionInfo).getDistributionColumns().stream()
+ .map(column -> column.getName().toLowerCase())
+ .collect(Collectors.toList());
+ Assertions.assertEquals(ImmutableList.of("a", "b"),
distributionColumnNames);
+ }
+
+ @Test
+ void
testIsDistinctKeySatisfyDistributionWhenDistinctContainsDistributionColumns()
throws Exception {
+ LogicalAggregate<? extends Plan> aggregate = getLogicalAggregate(
+ "select d, count(distinct a, b, c) from
test.distinct_agg_hash_ab group by d"
+ );
+
+
Assertions.assertTrue(invokeIsDistinctKeySatisfyDistribution(aggregate));
+ }
+
+ @Test
+ void testIsDistinctKeySatisfyDistributionWhenDistributionHasExtraColumns()
throws Exception {
+ LogicalAggregate<? extends Plan> aggregate = getLogicalAggregate(
+ "select d, count(distinct a, b, c) from
test.distinct_agg_hash_abcd group by d"
+ );
+
+
Assertions.assertFalse(invokeIsDistinctKeySatisfyDistribution(aggregate));
}
private LogicalAggregate<? extends Plan> getLogicalAggregate(String sql) {
@@ -300,6 +369,33 @@ public class DistinctAggregateRewriterTest extends
TestWithFeService implements
return Optional.empty();
}
+ private Object invokeResolveDistinctDistributionInfo(LogicalAggregate<?
extends Plan> aggregate) throws Exception {
+ Method method = DistinctAggregateRewriter.class.getDeclaredMethod(
+ "resolveDistinctDistributionInfo", LogicalAggregate.class);
+ method.setAccessible(true);
+ return method.invoke(DistinctAggregateRewriter.INSTANCE, aggregate);
+ }
+
+ private boolean invokeIsDistinctKeySatisfyDistribution(LogicalAggregate<?
extends Plan> aggregate) throws Exception {
+ Method method = DistinctAggregateRewriter.class.getDeclaredMethod(
+ "isDistinctKeySatisfyDistribution", LogicalAggregate.class);
+ method.setAccessible(true);
+ return (boolean) method.invoke(DistinctAggregateRewriter.INSTANCE,
aggregate);
+ }
+
+ @SuppressWarnings("unchecked")
+ private Set<SlotReference> getDistinctSlots(Object info) throws Exception {
+ Field field = info.getClass().getDeclaredField("distinctSlots");
+ field.setAccessible(true);
+ return (Set<SlotReference>) field.get(info);
+ }
+
+ private DistributionInfo getDistributionInfo(Object info) throws Exception
{
+ Field field = info.getClass().getDeclaredField("distributionInfo");
+ field.setAccessible(true);
+ return (DistributionInfo) field.get(info);
+ }
+
private ColumnStatistic unknownColumnStats() {
return buildColumnStats(0.0, true);
}
diff --git
a/regression-test/data/nereids_rules_p0/agg_strategy/agg_strategy.out
b/regression-test/data/nereids_rules_p0/agg_strategy/agg_strategy.out
index a718649a385..3deb2ff71d4 100644
--- a/regression-test/data/nereids_rules_p0/agg_strategy/agg_strategy.out
+++ b/regression-test/data/nereids_rules_p0/agg_strategy/agg_strategy.out
@@ -60,40 +60,40 @@
1
-- !agg_distinct_with_gby_key_with_other_func --
-1 1 27 27
-1 2 76 76
-1 3 42 42
-1 4 64 64
-1 5 18 18
-1 6 91 91
-1 7 13 13
-1 8 33 33
-1 9 55 55
-1 10 100 100
+1 1 27 27.0
+1 2 76 76.0
+1 3 42 42.0
+1 4 64 64.0
+1 5 18 18.0
+1 6 91 91.0
+1 7 13 13.0
+1 8 33 33.0
+1 9 55 55.0
+1 10 100 100.0
-- !agg_distinct_satisfy_gby_key_with_other_func --
-1 1 42 42
-1 2 18 18
-1 3 76 76
-1 4 33 33
-1 5 91 91
-1 6 27 27
-1 7 64 64
-1 8 55 55
-1 9 13 13
-1 10 100 100
+1 1 42 42.0
+1 2 18 18.0
+1 3 76 76.0
+1 4 33 33.0
+1 5 91 91.0
+1 6 27 27.0
+1 7 64 64.0
+1 8 55 55.0
+1 9 13 13.0
+1 10 100 100.0
-- !agg_distinct_satisfy_dst_key_with_other_func --
-1 13 13
-1 18 18
-1 27 27
-1 33 33
-1 42 42
-1 55 55
-1 64 64
-1 76 76
-1 91 91
-1 100 100
+1 13 13.0
+1 18 18.0
+1 27 27.0
+1 33 33.0
+1 42 42.0
+1 55 55.0
+1 64 64.0
+1 76 76.0
+1 91 91.0
+1 100 100.0
-- !agg_distinct_without_gby_key --
10
@@ -281,40 +281,40 @@ PhysicalResultSink
1
-- !agg_distinct_with_gby_key_with_other_func --
-1 1 27 27
-1 2 76 76
-1 3 42 42
-1 4 64 64
-1 5 18 18
-1 6 91 91
-1 7 13 13
-1 8 33 33
-1 9 55 55
-1 10 100 100
+1 1 27 27.0
+1 2 76 76.0
+1 3 42 42.0
+1 4 64 64.0
+1 5 18 18.0
+1 6 91 91.0
+1 7 13 13.0
+1 8 33 33.0
+1 9 55 55.0
+1 10 100 100.0
-- !agg_distinct_satisfy_gby_key_with_other_func --
-1 1 42 42
-1 2 18 18
-1 3 76 76
-1 4 33 33
-1 5 91 91
-1 6 27 27
-1 7 64 64
-1 8 55 55
-1 9 13 13
-1 10 100 100
+1 1 42 42.0
+1 2 18 18.0
+1 3 76 76.0
+1 4 33 33.0
+1 5 91 91.0
+1 6 27 27.0
+1 7 64 64.0
+1 8 55 55.0
+1 9 13 13.0
+1 10 100 100.0
-- !agg_distinct_satisfy_dst_key_with_other_func --
-1 13 13
-1 18 18
-1 27 27
-1 33 33
-1 42 42
-1 55 55
-1 64 64
-1 76 76
-1 91 91
-1 100 100
+1 13 13.0
+1 18 18.0
+1 27 27.0
+1 33 33.0
+1 42 42.0
+1 55 55.0
+1 64 64.0
+1 76 76.0
+1 91 91.0
+1 100 100.0
-- !agg_distinct_without_gby_key --
10
@@ -356,7 +356,10 @@ PhysicalResultSink
--------hashAgg[GLOBAL]
----------PhysicalDistribute[DistributionSpecHash]
------------hashAgg[LOCAL]
---------------PhysicalOlapScan[t_gbykey_10_dstkey_10_1000_id]
+--------------hashAgg[GLOBAL]
+----------------PhysicalDistribute[DistributionSpecHash]
+------------------hashAgg[LOCAL]
+--------------------PhysicalOlapScan[t_gbykey_10_dstkey_10_1000_id]
-- !agg_distinct_satisfy_gby_key --
PhysicalResultSink
@@ -364,7 +367,8 @@ PhysicalResultSink
----PhysicalDistribute[DistributionSpecGather]
------PhysicalQuickSort[LOCAL_SORT]
--------hashAgg[GLOBAL]
-----------PhysicalOlapScan[t_gbykey_10_dstkey_10_1000_id]
+----------hashAgg[GLOBAL]
+------------PhysicalOlapScan[t_gbykey_10_dstkey_10_1000_id]
-- !agg_distinct_satisfy_dst_key --
PhysicalResultSink
@@ -374,35 +378,41 @@ PhysicalResultSink
--------hashAgg[GLOBAL]
----------PhysicalDistribute[DistributionSpecHash]
------------hashAgg[LOCAL]
---------------PhysicalOlapScan[t_gbykey_10_dstkey_10_1000_id]
+--------------hashAgg[GLOBAL]
+----------------PhysicalOlapScan[t_gbykey_10_dstkey_10_1000_id]
-- !agg_distinct_with_gby_key_with_other_func --
PhysicalResultSink
--PhysicalQuickSort[MERGE_SORT]
----PhysicalDistribute[DistributionSpecGather]
------PhysicalQuickSort[LOCAL_SORT]
---------hashAgg[GLOBAL]
+--------hashAgg[DISTINCT_GLOBAL]
----------PhysicalDistribute[DistributionSpecHash]
-------------hashAgg[LOCAL]
---------------PhysicalOlapScan[t_gbykey_10_dstkey_10_1000_id]
+------------hashAgg[DISTINCT_LOCAL]
+--------------hashAgg[GLOBAL]
+----------------PhysicalDistribute[DistributionSpecHash]
+------------------hashAgg[LOCAL]
+--------------------PhysicalOlapScan[t_gbykey_10_dstkey_10_1000_id]
-- !agg_distinct_satisfy_gby_key_with_other_func --
PhysicalResultSink
--PhysicalQuickSort[MERGE_SORT]
----PhysicalDistribute[DistributionSpecGather]
------PhysicalQuickSort[LOCAL_SORT]
---------hashAgg[GLOBAL]
-----------PhysicalOlapScan[t_gbykey_10_dstkey_10_1000_id]
+--------hashAgg[DISTINCT_GLOBAL]
+----------hashAgg[GLOBAL]
+------------PhysicalOlapScan[t_gbykey_10_dstkey_10_1000_id]
-- !agg_distinct_satisfy_dst_key_with_other_func --
PhysicalResultSink
--PhysicalQuickSort[MERGE_SORT]
----PhysicalDistribute[DistributionSpecGather]
------PhysicalQuickSort[LOCAL_SORT]
---------hashAgg[GLOBAL]
+--------hashAgg[DISTINCT_GLOBAL]
----------PhysicalDistribute[DistributionSpecHash]
-------------hashAgg[LOCAL]
---------------PhysicalOlapScan[t_gbykey_10_dstkey_10_1000_id]
+------------hashAgg[DISTINCT_LOCAL]
+--------------hashAgg[GLOBAL]
+----------------PhysicalOlapScan[t_gbykey_10_dstkey_10_1000_id]
-- !agg_distinct_without_gby_key --
PhysicalResultSink
@@ -484,16 +494,16 @@ PhysicalResultSink
2
-- !agg_distinct_with_gby_key_with_other_func_low_ndv --
-2 0 60 4
-2 1 75 5
+2 0 60 4.0
+2 1 75 5.0
-- !agg_distinct_satisfy_gby_key_with_other_func_low_ndv --
2 0 69 4.3125
2 1 66 4.714285714285714
-- !agg_distinct_satisfy_dst_key_with_other_func_low_ndv --
-2 60 4
-2 75 5
+2 60 4.0
+2 75 5.0
-- !agg_distinct_without_gby_key_low_ndv --
2
@@ -580,9 +590,9 @@ PhysicalResultSink
----PhysicalDistribute[DistributionSpecGather]
------PhysicalQuickSort[LOCAL_SORT]
--------hashAgg[DISTINCT_GLOBAL]
-----------PhysicalDistribute[DistributionSpecHash]
-------------hashAgg[DISTINCT_LOCAL]
---------------hashAgg[GLOBAL]
+----------hashAgg[GLOBAL]
+------------PhysicalDistribute[DistributionSpecHash]
+--------------hashAgg[LOCAL]
----------------PhysicalOlapScan[t_gbykey_2_dstkey_10_30_id]
-- !agg_distinct_without_gby_key_low_ndv --
@@ -641,16 +651,16 @@ PhysicalResultSink
2
-- !agg_distinct_with_gby_key_with_other_func_low_ndv --
-2 0 60 4
-2 1 75 5
+2 0 60 4.0
+2 1 75 5.0
-- !agg_distinct_satisfy_gby_key_with_other_func_low_ndv --
2 0 69 4.3125
2 1 66 4.714285714285714
-- !agg_distinct_satisfy_dst_key_with_other_func_low_ndv --
-2 60 4
-2 75 5
+2 60 4.0
+2 75 5.0
-- !agg_distinct_without_gby_key_low_ndv --
2
@@ -692,7 +702,10 @@ PhysicalResultSink
--------hashAgg[GLOBAL]
----------PhysicalDistribute[DistributionSpecHash]
------------hashAgg[LOCAL]
---------------PhysicalOlapScan[t_gbykey_2_dstkey_10_30_id]
+--------------hashAgg[GLOBAL]
+----------------PhysicalDistribute[DistributionSpecHash]
+------------------hashAgg[LOCAL]
+--------------------PhysicalOlapScan[t_gbykey_2_dstkey_10_30_id]
-- !agg_distinct_satisfy_gby_key_low_ndv --
PhysicalResultSink
@@ -700,7 +713,8 @@ PhysicalResultSink
----PhysicalDistribute[DistributionSpecGather]
------PhysicalQuickSort[LOCAL_SORT]
--------hashAgg[GLOBAL]
-----------PhysicalOlapScan[t_gbykey_2_dstkey_10_30_id]
+----------hashAgg[GLOBAL]
+------------PhysicalOlapScan[t_gbykey_2_dstkey_10_30_id]
-- !agg_distinct_satisfy_dst_key_low_ndv --
PhysicalResultSink
@@ -710,35 +724,41 @@ PhysicalResultSink
--------hashAgg[GLOBAL]
----------PhysicalDistribute[DistributionSpecHash]
------------hashAgg[LOCAL]
---------------PhysicalOlapScan[t_gbykey_2_dstkey_10_30_id]
+--------------hashAgg[GLOBAL]
+----------------PhysicalOlapScan[t_gbykey_2_dstkey_10_30_id]
-- !agg_distinct_with_gby_key_with_other_func_low_ndv --
PhysicalResultSink
--PhysicalQuickSort[MERGE_SORT]
----PhysicalDistribute[DistributionSpecGather]
------PhysicalQuickSort[LOCAL_SORT]
---------hashAgg[GLOBAL]
+--------hashAgg[DISTINCT_GLOBAL]
----------PhysicalDistribute[DistributionSpecHash]
-------------hashAgg[LOCAL]
---------------PhysicalOlapScan[t_gbykey_2_dstkey_10_30_id]
+------------hashAgg[DISTINCT_LOCAL]
+--------------hashAgg[GLOBAL]
+----------------PhysicalDistribute[DistributionSpecHash]
+------------------hashAgg[LOCAL]
+--------------------PhysicalOlapScan[t_gbykey_2_dstkey_10_30_id]
-- !agg_distinct_satisfy_gby_key_with_other_func_low_ndv --
PhysicalResultSink
--PhysicalQuickSort[MERGE_SORT]
----PhysicalDistribute[DistributionSpecGather]
------PhysicalQuickSort[LOCAL_SORT]
---------hashAgg[GLOBAL]
-----------PhysicalOlapScan[t_gbykey_2_dstkey_10_30_id]
+--------hashAgg[DISTINCT_GLOBAL]
+----------hashAgg[GLOBAL]
+------------PhysicalOlapScan[t_gbykey_2_dstkey_10_30_id]
-- !agg_distinct_satisfy_dst_key_with_other_func_low_ndv --
PhysicalResultSink
--PhysicalQuickSort[MERGE_SORT]
----PhysicalDistribute[DistributionSpecGather]
------PhysicalQuickSort[LOCAL_SORT]
---------hashAgg[GLOBAL]
+--------hashAgg[DISTINCT_GLOBAL]
----------PhysicalDistribute[DistributionSpecHash]
-------------hashAgg[LOCAL]
---------------PhysicalOlapScan[t_gbykey_2_dstkey_10_30_id]
+------------hashAgg[DISTINCT_LOCAL]
+--------------hashAgg[GLOBAL]
+----------------PhysicalOlapScan[t_gbykey_2_dstkey_10_30_id]
-- !agg_distinct_without_gby_key_low_ndv --
PhysicalResultSink
@@ -788,16 +808,16 @@ PhysicalResultSink
1
-- !with_gby_split_in_cascades --
-1 13
-1 18
-1 27
-1 33
-1 42
-1 55
-1 64
-1 76
-1 91
-1 100
+1 13.0
+1 18.0
+1 27.0
+1 33.0
+1 42.0
+1 55.0
+1 64.0
+1 76.0
+1 91.0
+1 100.0
-- !without_gby --
10
diff --git
a/regression-test/data/nereids_rules_p0/agg_strategy/distinct_agg_rewriter.out
b/regression-test/data/nereids_rules_p0/agg_strategy/distinct_agg_rewriter.out
index b386b2f510a..18088719ce3 100644
---
a/regression-test/data/nereids_rules_p0/agg_strategy/distinct_agg_rewriter.out
+++
b/regression-test/data/nereids_rules_p0/agg_strategy/distinct_agg_rewriter.out
@@ -14,9 +14,10 @@ PhysicalResultSink
PhysicalResultSink
--PhysicalDistribute[DistributionSpecGather]
----hashAgg[GLOBAL]
-------PhysicalDistribute[DistributionSpecHash]
---------hashAgg[LOCAL]
-----------PhysicalOlapScan[t1000_2]
+------hashAgg[GLOBAL]
+--------PhysicalDistribute[DistributionSpecHash]
+----------hashAgg[LOCAL]
+------------PhysicalOlapScan[t1000_2]
-- !use_multi_phase3 --
PhysicalResultSink
@@ -24,7 +25,10 @@ PhysicalResultSink
----hashAgg[GLOBAL]
------PhysicalDistribute[DistributionSpecHash]
--------hashAgg[LOCAL]
-----------PhysicalOlapScan[t1000_2]
+----------hashAgg[GLOBAL]
+------------PhysicalDistribute[DistributionSpecHash]
+--------------hashAgg[LOCAL]
+----------------PhysicalOlapScan[t1000_2]
-- !use_multi_distinct --
PhysicalResultSink
diff --git
a/regression-test/data/nereids_rules_p0/agg_strategy/distinct_agg_strategy_selector.out
b/regression-test/data/nereids_rules_p0/agg_strategy/distinct_agg_strategy_selector.out
index 7c37f7f73b2..05ef347e89b 100644
---
a/regression-test/data/nereids_rules_p0/agg_strategy/distinct_agg_strategy_selector.out
+++
b/regression-test/data/nereids_rules_p0/agg_strategy/distinct_agg_strategy_selector.out
@@ -57,13 +57,19 @@ PhysicalCteAnchor ( cteId=CTEId#0 )
--------hashAgg[GLOBAL]
----------PhysicalDistribute[DistributionSpecHash]
------------hashAgg[LOCAL]
---------------PhysicalDistribute[DistributionSpecExecutionAny]
-----------------PhysicalCteConsumer ( cteId=CTEId#0 )
+--------------hashAgg[GLOBAL]
+----------------PhysicalDistribute[DistributionSpecHash]
+------------------hashAgg[LOCAL]
+--------------------PhysicalDistribute[DistributionSpecExecutionAny]
+----------------------PhysicalCteConsumer ( cteId=CTEId#0 )
--------hashAgg[GLOBAL]
----------PhysicalDistribute[DistributionSpecHash]
------------hashAgg[LOCAL]
---------------PhysicalDistribute[DistributionSpecExecutionAny]
-----------------PhysicalCteConsumer ( cteId=CTEId#0 )
+--------------hashAgg[GLOBAL]
+----------------PhysicalDistribute[DistributionSpecHash]
+------------------hashAgg[LOCAL]
+--------------------PhysicalDistribute[DistributionSpecExecutionAny]
+----------------------PhysicalCteConsumer ( cteId=CTEId#0 )
-- !should_use_multi_distinct_with_group_by --
PhysicalResultSink
diff --git
a/regression-test/data/nereids_rules_p0/agg_strategy/physical_agg_regulator.out
b/regression-test/data/nereids_rules_p0/agg_strategy/physical_agg_regulator.out
index 8a6d35b0094..a33a83f01e0 100644
---
a/regression-test/data/nereids_rules_p0/agg_strategy/physical_agg_regulator.out
+++
b/regression-test/data/nereids_rules_p0/agg_strategy/physical_agg_regulator.out
@@ -50,7 +50,10 @@ PhysicalResultSink
----hashAgg[GLOBAL]
------PhysicalDistribute[DistributionSpecHash]
--------hashAgg[LOCAL]
-----------PhysicalOlapScan[t1025]
+----------hashAgg[GLOBAL]
+------------PhysicalDistribute[DistributionSpecHash]
+--------------hashAgg[LOCAL]
+----------------PhysicalOlapScan[t1025]
-- !split_multi_agg_use_three_phase --
PhysicalResultSink
@@ -64,8 +67,11 @@ PhysicalResultSink
-- !split_multi_agg_use_four_phase --
PhysicalResultSink
--PhysicalDistribute[DistributionSpecGather]
-----hashAgg[GLOBAL]
+----hashAgg[DISTINCT_GLOBAL]
------PhysicalDistribute[DistributionSpecHash]
---------hashAgg[LOCAL]
-----------PhysicalOlapScan[t1025]
+--------hashAgg[DISTINCT_LOCAL]
+----------hashAgg[GLOBAL]
+------------PhysicalDistribute[DistributionSpecHash]
+--------------hashAgg[LOCAL]
+----------------PhysicalOlapScan[t1025]
diff --git
a/regression-test/data/nereids_rules_p0/distinct_split/disitinct_split.out
b/regression-test/data/nereids_rules_p0/distinct_split/disitinct_split.out
index 1975968f4de..735f924821a 100644
--- a/regression-test/data/nereids_rules_p0/distinct_split/disitinct_split.out
+++ b/regression-test/data/nereids_rules_p0/distinct_split/disitinct_split.out
@@ -200,18 +200,18 @@
2 2
-- !00_avg --
-2
+2.0
-- !10_avg --
-2 1.5
+2.0 1.5
-- !01_avg --
-2
-2
+2.0
+2.0
-- !11_avg --
-2 1
-2 2
+2.0 1.0
+2.0 2.0
-- !count_sum_avg_no_gby --
2 2 3.5
@@ -221,10 +221,10 @@
-- !count_sum_avg_with_gby --
2 1 3.5
-2 1 4
+2 1 4.0
-- !count_multi_sum_avg_with_gby --
-2 2 4
+2 2 4.0
2 3 3.5
-- !multi_sum_has_upper --
@@ -494,10 +494,14 @@ PhysicalCteAnchor ( cteId=CTEId#0 )
----hashJoin[INNER_JOIN] hashCondition=((c <=> .c)) otherCondition=()
------hashAgg[GLOBAL]
--------hashAgg[LOCAL]
-----------PhysicalCteConsumer ( cteId=CTEId#0 )
+----------hashAgg[GLOBAL]
+------------hashAgg[LOCAL]
+--------------PhysicalCteConsumer ( cteId=CTEId#0 )
------hashAgg[GLOBAL]
--------hashAgg[LOCAL]
-----------PhysicalCteConsumer ( cteId=CTEId#0 )
+----------hashAgg[GLOBAL]
+------------hashAgg[LOCAL]
+--------------PhysicalCteConsumer ( cteId=CTEId#0 )
-- !multi_sum_with_gby --
PhysicalCteAnchor ( cteId=CTEId#0 )
@@ -507,10 +511,14 @@ PhysicalCteAnchor ( cteId=CTEId#0 )
----hashJoin[INNER_JOIN] hashCondition=((c <=> .c)) otherCondition=()
------hashAgg[GLOBAL]
--------hashAgg[LOCAL]
-----------PhysicalCteConsumer ( cteId=CTEId#0 )
+----------hashAgg[GLOBAL]
+------------hashAgg[LOCAL]
+--------------PhysicalCteConsumer ( cteId=CTEId#0 )
------hashAgg[GLOBAL]
--------hashAgg[LOCAL]
-----------PhysicalCteConsumer ( cteId=CTEId#0 )
+----------hashAgg[GLOBAL]
+------------hashAgg[LOCAL]
+--------------PhysicalCteConsumer ( cteId=CTEId#0 )
-- !sum_count_with_gby --
PhysicalCteAnchor ( cteId=CTEId#0 )
@@ -520,10 +528,13 @@ PhysicalCteAnchor ( cteId=CTEId#0 )
----hashJoin[INNER_JOIN] hashCondition=((a <=> .a)) otherCondition=()
------hashAgg[GLOBAL]
--------hashAgg[LOCAL]
-----------PhysicalCteConsumer ( cteId=CTEId#0 )
+----------hashAgg[GLOBAL]
+------------hashAgg[LOCAL]
+--------------PhysicalCteConsumer ( cteId=CTEId#0 )
------hashAgg[GLOBAL]
---------hashAgg[LOCAL]
-----------PhysicalCteConsumer ( cteId=CTEId#0 )
+--------hashAgg[GLOBAL]
+----------hashAgg[LOCAL]
+------------PhysicalCteConsumer ( cteId=CTEId#0 )
-- !has_grouping --
PhysicalResultSink
@@ -533,7 +544,7 @@ PhysicalResultSink
--------PhysicalOlapScan[test_distinct_multi]
-- !null_hash --
-1 \N 0 0
+1 \N 0 0.0
-- !same_distinct_arg --
2 1
diff --git
a/regression-test/suites/nereids_rules_p0/agg_strategy/agg_strategy.groovy
b/regression-test/suites/nereids_rules_p0/agg_strategy/agg_strategy.groovy
index a5c136d074c..72ee1b92efb 100644
--- a/regression-test/suites/nereids_rules_p0/agg_strategy/agg_strategy.groovy
+++ b/regression-test/suites/nereids_rules_p0/agg_strategy/agg_strategy.groovy
@@ -21,7 +21,6 @@ suite("agg_strategy") {
sql "set global enable_auto_analyze=false"
sql "set runtime_filter_mode=OFF"
sql "set be_number_for_test=1;"
- sql "set parallel_pipeline_task_num=1;"
for (int i = 0; i < 2; i++) {
if (i == 0) {
diff --git
a/regression-test/suites/nereids_rules_p0/agg_strategy/distinct_agg_rewriter.groovy
b/regression-test/suites/nereids_rules_p0/agg_strategy/distinct_agg_rewriter.groovy
index 476fa559ba6..953c233c0d5 100644
---
a/regression-test/suites/nereids_rules_p0/agg_strategy/distinct_agg_rewriter.groovy
+++
b/regression-test/suites/nereids_rules_p0/agg_strategy/distinct_agg_rewriter.groovy
@@ -21,7 +21,6 @@ suite("distinct_agg_rewriter") {
set runtime_filter_mode=OFF;
set enable_parallel_result_sink=false;
set be_number_for_test=1;
- set parallel_pipeline_task_num=1;
"""
multi_sql """
analyze table t1000_2 with sync;
diff --git
a/regression-test/suites/nereids_rules_p0/distinct_split/disitinct_split.groovy
b/regression-test/suites/nereids_rules_p0/distinct_split/disitinct_split.groovy
index f9da6636064..883f0529132 100644
---
a/regression-test/suites/nereids_rules_p0/distinct_split/disitinct_split.groovy
+++
b/regression-test/suites/nereids_rules_p0/distinct_split/disitinct_split.groovy
@@ -20,6 +20,7 @@ suite("distinct_split") {
sql "set disable_join_reorder=true"
sql "set global enable_auto_analyze=false;"
sql "set be_number_for_test=1;"
+ sql "set parallel_pipeline_task_num=1;"
sql "drop table if exists test_distinct_multi"
sql "create table test_distinct_multi(a int, b int, c int, d varchar(10),
e date) distributed by hash(a) properties('replication_num'='1');"
sql "insert into test_distinct_multi
values(1,2,3,'abc','2024-01-02'),(1,2,4,'abc','2024-01-03'),(2,2,4,'abcd','2024-01-02'),(1,2,3,'abcd','2024-01-04'),(1,2,4,'eee','2024-02-02'),(2,2,4,'abc','2024-01-02');"
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]