This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-4.1 by this push:
     new 4af0b193bad branch-4.1:[fix](agg)Adjust agg strategy when table 
satisfy distinct key distribution #61248 (#63453)
4af0b193bad is described below

commit 4af0b193bada879af5ce4bb3f038ae20810755ca
Author: feiniaofeiafei <[email protected]>
AuthorDate: Thu May 21 16:20:15 2026 +0800

    branch-4.1:[fix](agg)Adjust agg strategy when table satisfy distinct key 
distribution #61248 (#63453)
    
    picked from #61248
---
 .../LogicalOlapScanToPhysicalOlapScan.java         |  12 +-
 .../rules/rewrite/DistinctAggregateRewriter.java   |  61 +++---
 .../java/org/apache/doris/nereids/util/Utils.java  |  16 ++
 .../rewrite/DistinctAggregateRewriterTest.java     | 130 ++++++++++--
 .../nereids_rules_p0/agg_strategy/agg_strategy.out | 226 +++++++++++----------
 .../agg_strategy/distinct_agg_rewriter.out         |  12 +-
 .../distinct_agg_strategy_selector.out             |  14 +-
 .../agg_strategy/physical_agg_regulator.out        |  14 +-
 .../distinct_split/disitinct_split.out             |  43 ++--
 .../agg_strategy/agg_strategy.groovy               |   1 -
 .../agg_strategy/distinct_agg_rewriter.groovy      |   1 -
 .../distinct_split/disitinct_split.groovy          |   1 +
 12 files changed, 348 insertions(+), 183 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalOlapScanToPhysicalOlapScan.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalOlapScanToPhysicalOlapScan.java
index 4a7132c3a85..d871b9e3535 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalOlapScanToPhysicalOlapScan.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalOlapScanToPhysicalOlapScan.java
@@ -17,13 +17,10 @@
 
 package org.apache.doris.nereids.rules.implementation;
 
-import org.apache.doris.catalog.ColocateTableIndex;
 import org.apache.doris.catalog.Column;
 import org.apache.doris.catalog.DistributionInfo;
-import org.apache.doris.catalog.Env;
 import org.apache.doris.catalog.HashDistributionInfo;
 import org.apache.doris.catalog.OlapTable;
-import org.apache.doris.catalog.PartitionType;
 import org.apache.doris.nereids.properties.DistributionSpec;
 import org.apache.doris.nereids.properties.DistributionSpecHash;
 import org.apache.doris.nereids.properties.DistributionSpecHash.ShuffleType;
@@ -35,6 +32,7 @@ import org.apache.doris.nereids.trees.expressions.Slot;
 import org.apache.doris.nereids.trees.expressions.SlotReference;
 import org.apache.doris.nereids.trees.plans.logical.LogicalOlapScan;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapScan;
+import org.apache.doris.nereids.util.Utils;
 
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
@@ -76,15 +74,11 @@ public class LogicalOlapScanToPhysicalOlapScan extends 
OneImplementationRuleFact
     public static DistributionSpec convertDistribution(LogicalOlapScan 
olapScan) {
         OlapTable olapTable = olapScan.getTable();
         DistributionInfo distributionInfo = 
olapTable.getDefaultDistributionInfo();
-        ColocateTableIndex colocateTableIndex = Env.getCurrentColocateIndex();
         // When there are multiple partitions, olapScan tasks of different 
buckets are dispatched in
         // rounded robin algorithm. Therefore, the hashDistributedSpec can be 
broken except they are in
         // the same stable colocateGroup(CG)
-        boolean isBelongStableCG = 
colocateTableIndex.isColocateTable(olapTable.getId())
-                && 
!colocateTableIndex.isGroupUnstable(colocateTableIndex.getGroup(olapTable.getId()))
-                && olapTable.getCatalogId() == 
Env.getCurrentInternalCatalog().getId();
-        boolean isSelectUnpartition = olapTable.getPartitionInfo().getType() 
== PartitionType.UNPARTITIONED
-                || olapScan.getSelectedPartitionIds().size() == 1;
+        boolean isBelongStableCG = Utils.isBelongStableCG(olapTable);
+        boolean isSelectUnpartition = Utils.isSelectUnpartition(olapTable, 
olapScan.getSelectedPartitionIds());
         // TODO: find a better way to handle both tablet num == 1 and colocate 
table together in future
         if (distributionInfo instanceof HashDistributionInfo && 
(isBelongStableCG || isSelectUnpartition)) {
             if (olapScan.getSelectedIndexId() != 
olapScan.getTable().getBaseIndexId()) {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/DistinctAggregateRewriter.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/DistinctAggregateRewriter.java
index c2668751dae..832641add2d 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/DistinctAggregateRewriter.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/DistinctAggregateRewriter.java
@@ -24,6 +24,7 @@ import org.apache.doris.catalog.OlapTable;
 import org.apache.doris.nereids.rules.Rule;
 import org.apache.doris.nereids.rules.RuleType;
 import org.apache.doris.nereids.rules.rewrite.StatsDerive.DeriveContext;
+import org.apache.doris.nereids.stats.ExpressionEstimation;
 import org.apache.doris.nereids.trees.expressions.Alias;
 import org.apache.doris.nereids.trees.expressions.Expression;
 import org.apache.doris.nereids.trees.expressions.NamedExpression;
@@ -46,6 +47,7 @@ import org.apache.doris.nereids.util.AggregateUtils;
 import org.apache.doris.nereids.util.ExpressionUtils;
 import org.apache.doris.nereids.util.Utils;
 import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.statistics.ColumnStatistic;
 import org.apache.doris.statistics.Statistics;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -77,7 +79,6 @@ import java.util.Set;
  */
 public class DistinctAggregateRewriter implements RewriteRuleFactory {
     public static final DistinctAggregateRewriter INSTANCE = new 
DistinctAggregateRewriter();
-    private static final double MULTI_DISTINCT_GBY_PER_INSTANCE_THRESHOLD = 
30.0;
     // TODO: add other functions
     private static final Set<Class<? extends AggregateFunction>> 
supportSplitOtherFunctions = ImmutableSet.of(
             Sum.class, Min.class, Max.class, Count.class, Sum0.class, 
AnyValue.class);
@@ -118,26 +119,27 @@ public class DistinctAggregateRewriter implements 
RewriteRuleFactory {
         Statistics aggStats = aggregate.getStats();
         Statistics aggChildStats = aggregate.child().getStats();
         Set<Expression> dstArgs = aggregate.getDistinctArguments();
+        if (isDistinctKeySatisfyDistribution(aggregate)) {
+            return false;
+        }
         // has unknown statistics, split to bottom and top agg
         if 
(AggregateUtils.hasUnknownStatistics(aggregate.getGroupByExpressions(), 
aggChildStats)
                 || AggregateUtils.hasUnknownStatistics(dstArgs, 
aggChildStats)) {
-            return !isDistinctKeySatisfyDistribution(aggregate);
+            return true;
         }
 
         double gbyNdv = aggStats.getRowCount();
-        int instanceNum = getParallelExecInstanceNum(ConnectContext.get());
-        if (instanceNum <= 0) {
-            instanceNum = 1;
-        }
-        return gbyNdv / instanceNum <= 
MULTI_DISTINCT_GBY_PER_INSTANCE_THRESHOLD;
-    }
-
-    private int getParallelExecInstanceNum(ConnectContext ctx) {
-        if (ctx == null) {
-            return 1;
+        Expression dstKey = dstArgs.iterator().next();
+        ColumnStatistic dstKeyStats = 
aggChildStats.findColumnStatistics(dstKey);
+        if (dstKeyStats == null) {
+            dstKeyStats = ExpressionEstimation.estimate(dstKey, aggChildStats);
         }
-        return ctx.getSessionVariable()
-                
.getParallelExecInstanceNum(ctx.getSessionVariable().resolveCloudClusterName(ctx));
+        double dstNdv = dstKeyStats.ndv;
+        double inputRows = aggChildStats.getRowCount();
+        // group by key ndv is low, distinct key ndv is high, multi_distinct 
is better
+        // otherwise split to bottom and top agg
+        return gbyNdv < inputRows * AggregateUtils.LOW_CARDINALITY_THRESHOLD
+                && dstNdv > inputRows * 
AggregateUtils.HIGH_CARDINALITY_THRESHOLD;
     }
 
     private boolean isDistinctKeySatisfyDistribution(LogicalAggregate<? 
extends Plan> aggregate) {
@@ -147,12 +149,9 @@ public class DistinctAggregateRewriter implements 
RewriteRuleFactory {
         }
         Set<String> distinctColumnNames = new HashSet<>();
         for (SlotReference slot : info.distinctSlots) {
-            if (!slot.getOriginalColumn().isPresent()) {
-                return false;
-            }
-            
distinctColumnNames.add(slot.getOriginalColumn().get().getName().toLowerCase());
+            distinctColumnNames.add(slot.getName().toLowerCase());
         }
-        DistributionInfo distributionInfo = 
info.table.getDefaultDistributionInfo();
+        DistributionInfo distributionInfo = info.distributionInfo;
         if (!(distributionInfo instanceof HashDistributionInfo)) {
             return false;
         }
@@ -168,6 +167,15 @@ public class DistinctAggregateRewriter implements 
RewriteRuleFactory {
         return true;
     }
 
+    /** This function gets the DistinctDistributionInfo from the aggregate,
+     * and handles these scenarios: aggregate's child is a filter or a project, 
or both of them:
+     * agg(count(distinct a), group by b)
+     *   ->project()
+     *     ->filter()
+     *       ->scan(distributed by hash(a))
+     * @return Table DistributionInfo and the slots of the base table
+     *         referenced by the distinct column of the aggregate function.
+     */
     private DistinctDistributionInfo 
resolveDistinctDistributionInfo(LogicalAggregate<? extends Plan> aggregate) {
         Set<Expression> distinctArgs = aggregate.getDistinctArguments();
         if (distinctArgs.isEmpty()) {
@@ -209,25 +217,30 @@ public class DistinctAggregateRewriter implements 
RewriteRuleFactory {
         if (!(child instanceof LogicalOlapScan)) {
             return null;
         }
-        OlapTable olapTable = ((LogicalOlapScan) child).getTable();
+        LogicalOlapScan scan = (LogicalOlapScan) child;
+        OlapTable olapTable = scan.getTable();
         if (olapTable == null) {
             return null;
         }
+        if (!Utils.isSelectUnpartition(olapTable, 
scan.getSelectedPartitionIds())
+                && !Utils.isBelongStableCG(olapTable)) {
+            return null;
+        }
         for (SlotReference slot : distinctSlots) {
             if (!slot.getOriginalTable().isPresent()
                     || slot.getOriginalTable().get() != olapTable) {
                 return null;
             }
         }
-        return new DistinctDistributionInfo(olapTable, distinctSlots);
+        return new 
DistinctDistributionInfo(olapTable.getDefaultDistributionInfo(), distinctSlots);
     }
 
     private static class DistinctDistributionInfo {
-        private final OlapTable table;
+        private final DistributionInfo distributionInfo;
         private final Set<SlotReference> distinctSlots;
 
-        private DistinctDistributionInfo(OlapTable table, Set<SlotReference> 
distinctSlots) {
-            this.table = table;
+        private DistinctDistributionInfo(DistributionInfo distributionInfo, 
Set<SlotReference> distinctSlots) {
+            this.distributionInfo = distributionInfo;
             this.distinctSlots = distinctSlots;
         }
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/util/Utils.java 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/util/Utils.java
index 34e91af8375..66eb03cbaa4 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/util/Utils.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/util/Utils.java
@@ -17,6 +17,10 @@
 
 package org.apache.doris.nereids.util;
 
+import org.apache.doris.catalog.ColocateTableIndex;
+import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.catalog.PartitionType;
 import org.apache.doris.info.TableNameInfo;
 import org.apache.doris.nereids.exceptions.AnalysisException;
 import org.apache.doris.nereids.glue.LogicalPlanAdapter;
@@ -176,6 +180,18 @@ public class Utils {
         return StringUtils.join(qualifierWithBackquote, ".");
     }
 
+    public static boolean isBelongStableCG(OlapTable olapTable) {
+        ColocateTableIndex colocateTableIndex = Env.getCurrentColocateIndex();
+        return colocateTableIndex.isColocateTable(olapTable.getId())
+                && 
!colocateTableIndex.isGroupUnstable(colocateTableIndex.getGroup(olapTable.getId()))
+                && olapTable.getCatalogId() == 
Env.getCurrentInternalCatalog().getId();
+    }
+
+    public static boolean isSelectUnpartition(OlapTable olapTable, 
Collection<Long> selectedPartitionIds) {
+        return olapTable.getPartitionInfo().getType() == 
PartitionType.UNPARTITIONED
+                || selectedPartitionIds.size() == 1;
+    }
+
     /**
      * Get sql string for plan.
      *
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/rewrite/DistinctAggregateRewriterTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/rewrite/DistinctAggregateRewriterTest.java
index 05c2180840c..7b9e957b742 100644
--- 
a/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/rewrite/DistinctAggregateRewriterTest.java
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/rewrite/DistinctAggregateRewriterTest.java
@@ -17,7 +17,10 @@
 
 package org.apache.doris.nereids.rules.rewrite;
 
+import org.apache.doris.catalog.DistributionInfo;
+import org.apache.doris.catalog.HashDistributionInfo;
 import 
org.apache.doris.nereids.rules.analysis.LogicalSubQueryAliasToLogicalProject;
+import org.apache.doris.nereids.trees.expressions.SlotReference;
 import 
org.apache.doris.nereids.trees.expressions.functions.agg.AggregateFunction;
 import org.apache.doris.nereids.trees.expressions.functions.agg.Count;
 import 
org.apache.doris.nereids.trees.expressions.functions.agg.MultiDistinctCount;
@@ -35,15 +38,22 @@ import org.apache.doris.statistics.ColumnStatisticBuilder;
 import org.apache.doris.statistics.Statistics;
 import org.apache.doris.utframe.TestWithFeService;
 
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import mockit.Mock;
 import mockit.MockUp;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
 
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
 
 public class DistinctAggregateRewriterTest extends TestWithFeService 
implements MemoPatternMatchSupported {
     @Override
@@ -52,16 +62,33 @@ public class DistinctAggregateRewriterTest extends 
TestWithFeService implements
         createTable("create table test.distinct_agg_split_t(a int null, b int 
not null,"
                 + "c varchar(10) null, d date, dt datetime)\n"
                 + "distributed by hash(a) properties('replication_num' = 
'1');");
+        createTable("CREATE TABLE IF NOT EXISTS test.sales_records\n"
+                + "(\n"
+                + "    record_id BIGINT,\n"
+                + "    seller_id BIGINT,\n"
+                + "    sale_date DATE,\n"
+                + "    amount DECIMAL(18,2)\n"
+                + ")\n"
+                + "DUPLICATE KEY(record_id, seller_id)\n"
+                + "PARTITION BY RANGE(sale_date)\n"
+                + "(\n"
+                + "    PARTITION p202301 VALUES LESS THAN ('2023-02-01'),\n"
+                + "    PARTITION p202302 VALUES LESS THAN ('2023-03-01'),\n"
+                + "    PARTITION p202303 VALUES LESS THAN ('2023-04-01')\n"
+                + ")\n"
+                + "DISTRIBUTED BY HASH(record_id) BUCKETS 10\n"
+                + "PROPERTIES (\n"
+                + "    \"replication_num\" = \"1\"\n"
+                + ");");
+        createTable("create table test.distinct_agg_hash_ab(a int null, b int 
not null, c int null, d int null)\n"
+                + "distributed by hash(a, b) properties('replication_num' = 
'1');");
+        createTable("create table test.distinct_agg_hash_abcd(a int null, b 
int not null, c int null, d int null)\n"
+                + "distributed by hash(a, b, c, d) 
properties('replication_num' = '1');");
         connectContext.setDatabase("test");
+        SessionVariable spySessionVariable = 
Mockito.spy(connectContext.getSessionVariable());
+        
Mockito.doReturn(24).when(spySessionVariable).getParallelExecInstanceNum(Mockito.anyString());
+        connectContext.setSessionVariable(spySessionVariable);
         
connectContext.getSessionVariable().setDisableNereidsRules("PRUNE_EMPTY_PARTITION");
-        new SessionVariableMockUp();
-    }
-
-    private static class SessionVariableMockUp extends MockUp<SessionVariable> 
{
-        @Mock
-        private int getParallelExecInstanceNum(String clusterName) {
-            return 24;
-        }
     }
 
     private void applyMock() {
@@ -256,25 +283,67 @@ public class DistinctAggregateRewriterTest extends 
TestWithFeService implements
         ((AbstractPlan) child).setStatistics(new Statistics(100000, colStats));
         aggregate.setStatistics(new Statistics(240, ImmutableMap.of()));
 
-        Assertions.assertTrue(rewriter.shouldUseMultiDistinct(aggregate));
+        Assertions.assertFalse(rewriter.shouldUseMultiDistinct(aggregate));
     }
 
     @Test
-    void testShouldUseMultiDistinctWithStatsNotSelected() throws Exception {
-        DistinctAggregateRewriter rewriter = new DistinctAggregateRewriter();
+    void testShouldUseMultiDistinctWithPartitionTable() {
+        DistinctAggregateRewriter rewriter = 
DistinctAggregateRewriter.INSTANCE;
         LogicalAggregate<? extends Plan> aggregate = getLogicalAggregate(
-                "select b, count(distinct a) from test.distinct_agg_split_t 
group by b"
+                "select count(distinct record_id) from sales_records group by 
sale_date;"
         );
         Plan child = aggregate.child();
         Map<org.apache.doris.nereids.trees.expressions.Expression, 
ColumnStatistic> colStats = new HashMap<>();
         aggregate.getGroupByExpressions().forEach(expr ->
-                colStats.put(expr, buildColumnStats(1000.0, false)));
+                colStats.put(expr, unknownColumnStats()));
         aggregate.getDistinctArguments().forEach(expr ->
-                colStats.put(expr, buildColumnStats(10000.0, false)));
-        ((AbstractPlan) child).setStatistics(new Statistics(100000, colStats));
-        aggregate.setStatistics(new Statistics(1000.0, ImmutableMap.of()));
+                colStats.put(expr, unknownColumnStats()));
+        ((AbstractPlan) child).setStatistics(new Statistics(10000, colStats));
+        aggregate.setStatistics(new Statistics(100, ImmutableMap.of()));
 
-        Assertions.assertFalse(rewriter.shouldUseMultiDistinct(aggregate));
+        Assertions.assertTrue(rewriter.shouldUseMultiDistinct(aggregate));
+    }
+
+    @Test
+    void testResolveDistinctDistributionInfoWithProjectAndFilter() throws 
Exception {
+        LogicalAggregate<? extends Plan> aggregate = getLogicalAggregate(
+                "select bb, count(distinct aa) from "
+                        + "(select a as aa, b as bb from 
test.distinct_agg_hash_ab where c > 1) t "
+                        + "group by bb"
+        );
+
+        Object info = invokeResolveDistinctDistributionInfo(aggregate);
+        Assertions.assertNotNull(info);
+
+        List<String> distinctSlotNames = getDistinctSlots(info).stream()
+                .map(SlotReference::getName)
+                .collect(Collectors.toList());
+        Assertions.assertEquals(ImmutableList.of("a"), distinctSlotNames);
+
+        DistributionInfo distributionInfo = getDistributionInfo(info);
+        Assertions.assertTrue(distributionInfo instanceof 
HashDistributionInfo);
+        List<String> distributionColumnNames = ((HashDistributionInfo) 
distributionInfo).getDistributionColumns().stream()
+                .map(column -> column.getName().toLowerCase())
+                .collect(Collectors.toList());
+        Assertions.assertEquals(ImmutableList.of("a", "b"), 
distributionColumnNames);
+    }
+
+    @Test
+    void 
testIsDistinctKeySatisfyDistributionWhenDistinctContainsDistributionColumns() 
throws Exception {
+        LogicalAggregate<? extends Plan> aggregate = getLogicalAggregate(
+                "select d, count(distinct a, b, c) from 
test.distinct_agg_hash_ab group by d"
+        );
+
+        
Assertions.assertTrue(invokeIsDistinctKeySatisfyDistribution(aggregate));
+    }
+
+    @Test
+    void testIsDistinctKeySatisfyDistributionWhenDistributionHasExtraColumns() 
throws Exception {
+        LogicalAggregate<? extends Plan> aggregate = getLogicalAggregate(
+                "select d, count(distinct a, b, c) from 
test.distinct_agg_hash_abcd group by d"
+        );
+
+        
Assertions.assertFalse(invokeIsDistinctKeySatisfyDistribution(aggregate));
     }
 
     private LogicalAggregate<? extends Plan> getLogicalAggregate(String sql) {
@@ -300,6 +369,33 @@ public class DistinctAggregateRewriterTest extends 
TestWithFeService implements
         return Optional.empty();
     }
 
+    private Object invokeResolveDistinctDistributionInfo(LogicalAggregate<? 
extends Plan> aggregate) throws Exception {
+        Method method = DistinctAggregateRewriter.class.getDeclaredMethod(
+                "resolveDistinctDistributionInfo", LogicalAggregate.class);
+        method.setAccessible(true);
+        return method.invoke(DistinctAggregateRewriter.INSTANCE, aggregate);
+    }
+
+    private boolean invokeIsDistinctKeySatisfyDistribution(LogicalAggregate<? 
extends Plan> aggregate) throws Exception {
+        Method method = DistinctAggregateRewriter.class.getDeclaredMethod(
+                "isDistinctKeySatisfyDistribution", LogicalAggregate.class);
+        method.setAccessible(true);
+        return (boolean) method.invoke(DistinctAggregateRewriter.INSTANCE, 
aggregate);
+    }
+
+    @SuppressWarnings("unchecked")
+    private Set<SlotReference> getDistinctSlots(Object info) throws Exception {
+        Field field = info.getClass().getDeclaredField("distinctSlots");
+        field.setAccessible(true);
+        return (Set<SlotReference>) field.get(info);
+    }
+
+    private DistributionInfo getDistributionInfo(Object info) throws Exception 
{
+        Field field = info.getClass().getDeclaredField("distributionInfo");
+        field.setAccessible(true);
+        return (DistributionInfo) field.get(info);
+    }
+
     private ColumnStatistic unknownColumnStats() {
         return buildColumnStats(0.0, true);
     }
diff --git 
a/regression-test/data/nereids_rules_p0/agg_strategy/agg_strategy.out 
b/regression-test/data/nereids_rules_p0/agg_strategy/agg_strategy.out
index a718649a385..3deb2ff71d4 100644
--- a/regression-test/data/nereids_rules_p0/agg_strategy/agg_strategy.out
+++ b/regression-test/data/nereids_rules_p0/agg_strategy/agg_strategy.out
@@ -60,40 +60,40 @@
 1
 
 -- !agg_distinct_with_gby_key_with_other_func --
-1      1       27      27
-1      2       76      76
-1      3       42      42
-1      4       64      64
-1      5       18      18
-1      6       91      91
-1      7       13      13
-1      8       33      33
-1      9       55      55
-1      10      100     100
+1      1       27      27.0
+1      2       76      76.0
+1      3       42      42.0
+1      4       64      64.0
+1      5       18      18.0
+1      6       91      91.0
+1      7       13      13.0
+1      8       33      33.0
+1      9       55      55.0
+1      10      100     100.0
 
 -- !agg_distinct_satisfy_gby_key_with_other_func --
-1      1       42      42
-1      2       18      18
-1      3       76      76
-1      4       33      33
-1      5       91      91
-1      6       27      27
-1      7       64      64
-1      8       55      55
-1      9       13      13
-1      10      100     100
+1      1       42      42.0
+1      2       18      18.0
+1      3       76      76.0
+1      4       33      33.0
+1      5       91      91.0
+1      6       27      27.0
+1      7       64      64.0
+1      8       55      55.0
+1      9       13      13.0
+1      10      100     100.0
 
 -- !agg_distinct_satisfy_dst_key_with_other_func --
-1      13      13
-1      18      18
-1      27      27
-1      33      33
-1      42      42
-1      55      55
-1      64      64
-1      76      76
-1      91      91
-1      100     100
+1      13      13.0
+1      18      18.0
+1      27      27.0
+1      33      33.0
+1      42      42.0
+1      55      55.0
+1      64      64.0
+1      76      76.0
+1      91      91.0
+1      100     100.0
 
 -- !agg_distinct_without_gby_key --
 10
@@ -281,40 +281,40 @@ PhysicalResultSink
 1
 
 -- !agg_distinct_with_gby_key_with_other_func --
-1      1       27      27
-1      2       76      76
-1      3       42      42
-1      4       64      64
-1      5       18      18
-1      6       91      91
-1      7       13      13
-1      8       33      33
-1      9       55      55
-1      10      100     100
+1      1       27      27.0
+1      2       76      76.0
+1      3       42      42.0
+1      4       64      64.0
+1      5       18      18.0
+1      6       91      91.0
+1      7       13      13.0
+1      8       33      33.0
+1      9       55      55.0
+1      10      100     100.0
 
 -- !agg_distinct_satisfy_gby_key_with_other_func --
-1      1       42      42
-1      2       18      18
-1      3       76      76
-1      4       33      33
-1      5       91      91
-1      6       27      27
-1      7       64      64
-1      8       55      55
-1      9       13      13
-1      10      100     100
+1      1       42      42.0
+1      2       18      18.0
+1      3       76      76.0
+1      4       33      33.0
+1      5       91      91.0
+1      6       27      27.0
+1      7       64      64.0
+1      8       55      55.0
+1      9       13      13.0
+1      10      100     100.0
 
 -- !agg_distinct_satisfy_dst_key_with_other_func --
-1      13      13
-1      18      18
-1      27      27
-1      33      33
-1      42      42
-1      55      55
-1      64      64
-1      76      76
-1      91      91
-1      100     100
+1      13      13.0
+1      18      18.0
+1      27      27.0
+1      33      33.0
+1      42      42.0
+1      55      55.0
+1      64      64.0
+1      76      76.0
+1      91      91.0
+1      100     100.0
 
 -- !agg_distinct_without_gby_key --
 10
@@ -356,7 +356,10 @@ PhysicalResultSink
 --------hashAgg[GLOBAL]
 ----------PhysicalDistribute[DistributionSpecHash]
 ------------hashAgg[LOCAL]
---------------PhysicalOlapScan[t_gbykey_10_dstkey_10_1000_id]
+--------------hashAgg[GLOBAL]
+----------------PhysicalDistribute[DistributionSpecHash]
+------------------hashAgg[LOCAL]
+--------------------PhysicalOlapScan[t_gbykey_10_dstkey_10_1000_id]
 
 -- !agg_distinct_satisfy_gby_key --
 PhysicalResultSink
@@ -364,7 +367,8 @@ PhysicalResultSink
 ----PhysicalDistribute[DistributionSpecGather]
 ------PhysicalQuickSort[LOCAL_SORT]
 --------hashAgg[GLOBAL]
-----------PhysicalOlapScan[t_gbykey_10_dstkey_10_1000_id]
+----------hashAgg[GLOBAL]
+------------PhysicalOlapScan[t_gbykey_10_dstkey_10_1000_id]
 
 -- !agg_distinct_satisfy_dst_key --
 PhysicalResultSink
@@ -374,35 +378,41 @@ PhysicalResultSink
 --------hashAgg[GLOBAL]
 ----------PhysicalDistribute[DistributionSpecHash]
 ------------hashAgg[LOCAL]
---------------PhysicalOlapScan[t_gbykey_10_dstkey_10_1000_id]
+--------------hashAgg[GLOBAL]
+----------------PhysicalOlapScan[t_gbykey_10_dstkey_10_1000_id]
 
 -- !agg_distinct_with_gby_key_with_other_func --
 PhysicalResultSink
 --PhysicalQuickSort[MERGE_SORT]
 ----PhysicalDistribute[DistributionSpecGather]
 ------PhysicalQuickSort[LOCAL_SORT]
---------hashAgg[GLOBAL]
+--------hashAgg[DISTINCT_GLOBAL]
 ----------PhysicalDistribute[DistributionSpecHash]
-------------hashAgg[LOCAL]
---------------PhysicalOlapScan[t_gbykey_10_dstkey_10_1000_id]
+------------hashAgg[DISTINCT_LOCAL]
+--------------hashAgg[GLOBAL]
+----------------PhysicalDistribute[DistributionSpecHash]
+------------------hashAgg[LOCAL]
+--------------------PhysicalOlapScan[t_gbykey_10_dstkey_10_1000_id]
 
 -- !agg_distinct_satisfy_gby_key_with_other_func --
 PhysicalResultSink
 --PhysicalQuickSort[MERGE_SORT]
 ----PhysicalDistribute[DistributionSpecGather]
 ------PhysicalQuickSort[LOCAL_SORT]
---------hashAgg[GLOBAL]
-----------PhysicalOlapScan[t_gbykey_10_dstkey_10_1000_id]
+--------hashAgg[DISTINCT_GLOBAL]
+----------hashAgg[GLOBAL]
+------------PhysicalOlapScan[t_gbykey_10_dstkey_10_1000_id]
 
 -- !agg_distinct_satisfy_dst_key_with_other_func --
 PhysicalResultSink
 --PhysicalQuickSort[MERGE_SORT]
 ----PhysicalDistribute[DistributionSpecGather]
 ------PhysicalQuickSort[LOCAL_SORT]
---------hashAgg[GLOBAL]
+--------hashAgg[DISTINCT_GLOBAL]
 ----------PhysicalDistribute[DistributionSpecHash]
-------------hashAgg[LOCAL]
---------------PhysicalOlapScan[t_gbykey_10_dstkey_10_1000_id]
+------------hashAgg[DISTINCT_LOCAL]
+--------------hashAgg[GLOBAL]
+----------------PhysicalOlapScan[t_gbykey_10_dstkey_10_1000_id]
 
 -- !agg_distinct_without_gby_key --
 PhysicalResultSink
@@ -484,16 +494,16 @@ PhysicalResultSink
 2
 
 -- !agg_distinct_with_gby_key_with_other_func_low_ndv --
-2      0       60      4
-2      1       75      5
+2      0       60      4.0
+2      1       75      5.0
 
 -- !agg_distinct_satisfy_gby_key_with_other_func_low_ndv --
 2      0       69      4.3125
 2      1       66      4.714285714285714
 
 -- !agg_distinct_satisfy_dst_key_with_other_func_low_ndv --
-2      60      4
-2      75      5
+2      60      4.0
+2      75      5.0
 
 -- !agg_distinct_without_gby_key_low_ndv --
 2
@@ -580,9 +590,9 @@ PhysicalResultSink
 ----PhysicalDistribute[DistributionSpecGather]
 ------PhysicalQuickSort[LOCAL_SORT]
 --------hashAgg[DISTINCT_GLOBAL]
-----------PhysicalDistribute[DistributionSpecHash]
-------------hashAgg[DISTINCT_LOCAL]
---------------hashAgg[GLOBAL]
+----------hashAgg[GLOBAL]
+------------PhysicalDistribute[DistributionSpecHash]
+--------------hashAgg[LOCAL]
 ----------------PhysicalOlapScan[t_gbykey_2_dstkey_10_30_id]
 
 -- !agg_distinct_without_gby_key_low_ndv --
@@ -641,16 +651,16 @@ PhysicalResultSink
 2
 
 -- !agg_distinct_with_gby_key_with_other_func_low_ndv --
-2      0       60      4
-2      1       75      5
+2      0       60      4.0
+2      1       75      5.0
 
 -- !agg_distinct_satisfy_gby_key_with_other_func_low_ndv --
 2      0       69      4.3125
 2      1       66      4.714285714285714
 
 -- !agg_distinct_satisfy_dst_key_with_other_func_low_ndv --
-2      60      4
-2      75      5
+2      60      4.0
+2      75      5.0
 
 -- !agg_distinct_without_gby_key_low_ndv --
 2
@@ -692,7 +702,10 @@ PhysicalResultSink
 --------hashAgg[GLOBAL]
 ----------PhysicalDistribute[DistributionSpecHash]
 ------------hashAgg[LOCAL]
---------------PhysicalOlapScan[t_gbykey_2_dstkey_10_30_id]
+--------------hashAgg[GLOBAL]
+----------------PhysicalDistribute[DistributionSpecHash]
+------------------hashAgg[LOCAL]
+--------------------PhysicalOlapScan[t_gbykey_2_dstkey_10_30_id]
 
 -- !agg_distinct_satisfy_gby_key_low_ndv --
 PhysicalResultSink
@@ -700,7 +713,8 @@ PhysicalResultSink
 ----PhysicalDistribute[DistributionSpecGather]
 ------PhysicalQuickSort[LOCAL_SORT]
 --------hashAgg[GLOBAL]
-----------PhysicalOlapScan[t_gbykey_2_dstkey_10_30_id]
+----------hashAgg[GLOBAL]
+------------PhysicalOlapScan[t_gbykey_2_dstkey_10_30_id]
 
 -- !agg_distinct_satisfy_dst_key_low_ndv --
 PhysicalResultSink
@@ -710,35 +724,41 @@ PhysicalResultSink
 --------hashAgg[GLOBAL]
 ----------PhysicalDistribute[DistributionSpecHash]
 ------------hashAgg[LOCAL]
---------------PhysicalOlapScan[t_gbykey_2_dstkey_10_30_id]
+--------------hashAgg[GLOBAL]
+----------------PhysicalOlapScan[t_gbykey_2_dstkey_10_30_id]
 
 -- !agg_distinct_with_gby_key_with_other_func_low_ndv --
 PhysicalResultSink
 --PhysicalQuickSort[MERGE_SORT]
 ----PhysicalDistribute[DistributionSpecGather]
 ------PhysicalQuickSort[LOCAL_SORT]
---------hashAgg[GLOBAL]
+--------hashAgg[DISTINCT_GLOBAL]
 ----------PhysicalDistribute[DistributionSpecHash]
-------------hashAgg[LOCAL]
---------------PhysicalOlapScan[t_gbykey_2_dstkey_10_30_id]
+------------hashAgg[DISTINCT_LOCAL]
+--------------hashAgg[GLOBAL]
+----------------PhysicalDistribute[DistributionSpecHash]
+------------------hashAgg[LOCAL]
+--------------------PhysicalOlapScan[t_gbykey_2_dstkey_10_30_id]
 
 -- !agg_distinct_satisfy_gby_key_with_other_func_low_ndv --
 PhysicalResultSink
 --PhysicalQuickSort[MERGE_SORT]
 ----PhysicalDistribute[DistributionSpecGather]
 ------PhysicalQuickSort[LOCAL_SORT]
---------hashAgg[GLOBAL]
-----------PhysicalOlapScan[t_gbykey_2_dstkey_10_30_id]
+--------hashAgg[DISTINCT_GLOBAL]
+----------hashAgg[GLOBAL]
+------------PhysicalOlapScan[t_gbykey_2_dstkey_10_30_id]
 
 -- !agg_distinct_satisfy_dst_key_with_other_func_low_ndv --
 PhysicalResultSink
 --PhysicalQuickSort[MERGE_SORT]
 ----PhysicalDistribute[DistributionSpecGather]
 ------PhysicalQuickSort[LOCAL_SORT]
---------hashAgg[GLOBAL]
+--------hashAgg[DISTINCT_GLOBAL]
 ----------PhysicalDistribute[DistributionSpecHash]
-------------hashAgg[LOCAL]
---------------PhysicalOlapScan[t_gbykey_2_dstkey_10_30_id]
+------------hashAgg[DISTINCT_LOCAL]
+--------------hashAgg[GLOBAL]
+----------------PhysicalOlapScan[t_gbykey_2_dstkey_10_30_id]
 
 -- !agg_distinct_without_gby_key_low_ndv --
 PhysicalResultSink
@@ -788,16 +808,16 @@ PhysicalResultSink
 1
 
 -- !with_gby_split_in_cascades --
-1      13
-1      18
-1      27
-1      33
-1      42
-1      55
-1      64
-1      76
-1      91
-1      100
+1      13.0
+1      18.0
+1      27.0
+1      33.0
+1      42.0
+1      55.0
+1      64.0
+1      76.0
+1      91.0
+1      100.0
 
 -- !without_gby --
 10
diff --git 
a/regression-test/data/nereids_rules_p0/agg_strategy/distinct_agg_rewriter.out 
b/regression-test/data/nereids_rules_p0/agg_strategy/distinct_agg_rewriter.out
index b386b2f510a..18088719ce3 100644
--- 
a/regression-test/data/nereids_rules_p0/agg_strategy/distinct_agg_rewriter.out
+++ 
b/regression-test/data/nereids_rules_p0/agg_strategy/distinct_agg_rewriter.out
@@ -14,9 +14,10 @@ PhysicalResultSink
 PhysicalResultSink
 --PhysicalDistribute[DistributionSpecGather]
 ----hashAgg[GLOBAL]
-------PhysicalDistribute[DistributionSpecHash]
---------hashAgg[LOCAL]
-----------PhysicalOlapScan[t1000_2]
+------hashAgg[GLOBAL]
+--------PhysicalDistribute[DistributionSpecHash]
+----------hashAgg[LOCAL]
+------------PhysicalOlapScan[t1000_2]
 
 -- !use_multi_phase3 --
 PhysicalResultSink
@@ -24,7 +25,10 @@ PhysicalResultSink
 ----hashAgg[GLOBAL]
 ------PhysicalDistribute[DistributionSpecHash]
 --------hashAgg[LOCAL]
-----------PhysicalOlapScan[t1000_2]
+----------hashAgg[GLOBAL]
+------------PhysicalDistribute[DistributionSpecHash]
+--------------hashAgg[LOCAL]
+----------------PhysicalOlapScan[t1000_2]
 
 -- !use_multi_distinct --
 PhysicalResultSink
diff --git 
a/regression-test/data/nereids_rules_p0/agg_strategy/distinct_agg_strategy_selector.out
 
b/regression-test/data/nereids_rules_p0/agg_strategy/distinct_agg_strategy_selector.out
index 7c37f7f73b2..05ef347e89b 100644
--- 
a/regression-test/data/nereids_rules_p0/agg_strategy/distinct_agg_strategy_selector.out
+++ 
b/regression-test/data/nereids_rules_p0/agg_strategy/distinct_agg_strategy_selector.out
@@ -57,13 +57,19 @@ PhysicalCteAnchor ( cteId=CTEId#0 )
 --------hashAgg[GLOBAL]
 ----------PhysicalDistribute[DistributionSpecHash]
 ------------hashAgg[LOCAL]
---------------PhysicalDistribute[DistributionSpecExecutionAny]
-----------------PhysicalCteConsumer ( cteId=CTEId#0 )
+--------------hashAgg[GLOBAL]
+----------------PhysicalDistribute[DistributionSpecHash]
+------------------hashAgg[LOCAL]
+--------------------PhysicalDistribute[DistributionSpecExecutionAny]
+----------------------PhysicalCteConsumer ( cteId=CTEId#0 )
 --------hashAgg[GLOBAL]
 ----------PhysicalDistribute[DistributionSpecHash]
 ------------hashAgg[LOCAL]
---------------PhysicalDistribute[DistributionSpecExecutionAny]
-----------------PhysicalCteConsumer ( cteId=CTEId#0 )
+--------------hashAgg[GLOBAL]
+----------------PhysicalDistribute[DistributionSpecHash]
+------------------hashAgg[LOCAL]
+--------------------PhysicalDistribute[DistributionSpecExecutionAny]
+----------------------PhysicalCteConsumer ( cteId=CTEId#0 )
 
 -- !should_use_multi_distinct_with_group_by --
 PhysicalResultSink
diff --git 
a/regression-test/data/nereids_rules_p0/agg_strategy/physical_agg_regulator.out 
b/regression-test/data/nereids_rules_p0/agg_strategy/physical_agg_regulator.out
index 8a6d35b0094..a33a83f01e0 100644
--- 
a/regression-test/data/nereids_rules_p0/agg_strategy/physical_agg_regulator.out
+++ 
b/regression-test/data/nereids_rules_p0/agg_strategy/physical_agg_regulator.out
@@ -50,7 +50,10 @@ PhysicalResultSink
 ----hashAgg[GLOBAL]
 ------PhysicalDistribute[DistributionSpecHash]
 --------hashAgg[LOCAL]
-----------PhysicalOlapScan[t1025]
+----------hashAgg[GLOBAL]
+------------PhysicalDistribute[DistributionSpecHash]
+--------------hashAgg[LOCAL]
+----------------PhysicalOlapScan[t1025]
 
 -- !split_multi_agg_use_three_phase --
 PhysicalResultSink
@@ -64,8 +67,11 @@ PhysicalResultSink
 -- !split_multi_agg_use_four_phase --
 PhysicalResultSink
 --PhysicalDistribute[DistributionSpecGather]
-----hashAgg[GLOBAL]
+----hashAgg[DISTINCT_GLOBAL]
 ------PhysicalDistribute[DistributionSpecHash]
---------hashAgg[LOCAL]
-----------PhysicalOlapScan[t1025]
+--------hashAgg[DISTINCT_LOCAL]
+----------hashAgg[GLOBAL]
+------------PhysicalDistribute[DistributionSpecHash]
+--------------hashAgg[LOCAL]
+----------------PhysicalOlapScan[t1025]
 
diff --git 
a/regression-test/data/nereids_rules_p0/distinct_split/disitinct_split.out 
b/regression-test/data/nereids_rules_p0/distinct_split/disitinct_split.out
index 1975968f4de..735f924821a 100644
--- a/regression-test/data/nereids_rules_p0/distinct_split/disitinct_split.out
+++ b/regression-test/data/nereids_rules_p0/distinct_split/disitinct_split.out
@@ -200,18 +200,18 @@
 2      2
 
 -- !00_avg --
-2
+2.0
 
 -- !10_avg --
-2      1.5
+2.0    1.5
 
 -- !01_avg --
-2
-2
+2.0
+2.0
 
 -- !11_avg --
-2      1
-2      2
+2.0    1.0
+2.0    2.0
 
 -- !count_sum_avg_no_gby --
 2      2       3.5
@@ -221,10 +221,10 @@
 
 -- !count_sum_avg_with_gby --
 2      1       3.5
-2      1       4
+2      1       4.0
 
 -- !count_multi_sum_avg_with_gby --
-2      2       4
+2      2       4.0
 2      3       3.5
 
 -- !multi_sum_has_upper --
@@ -494,10 +494,14 @@ PhysicalCteAnchor ( cteId=CTEId#0 )
 ----hashJoin[INNER_JOIN] hashCondition=((c <=> .c)) otherCondition=()
 ------hashAgg[GLOBAL]
 --------hashAgg[LOCAL]
-----------PhysicalCteConsumer ( cteId=CTEId#0 )
+----------hashAgg[GLOBAL]
+------------hashAgg[LOCAL]
+--------------PhysicalCteConsumer ( cteId=CTEId#0 )
 ------hashAgg[GLOBAL]
 --------hashAgg[LOCAL]
-----------PhysicalCteConsumer ( cteId=CTEId#0 )
+----------hashAgg[GLOBAL]
+------------hashAgg[LOCAL]
+--------------PhysicalCteConsumer ( cteId=CTEId#0 )
 
 -- !multi_sum_with_gby --
 PhysicalCteAnchor ( cteId=CTEId#0 )
@@ -507,10 +511,14 @@ PhysicalCteAnchor ( cteId=CTEId#0 )
 ----hashJoin[INNER_JOIN] hashCondition=((c <=> .c)) otherCondition=()
 ------hashAgg[GLOBAL]
 --------hashAgg[LOCAL]
-----------PhysicalCteConsumer ( cteId=CTEId#0 )
+----------hashAgg[GLOBAL]
+------------hashAgg[LOCAL]
+--------------PhysicalCteConsumer ( cteId=CTEId#0 )
 ------hashAgg[GLOBAL]
 --------hashAgg[LOCAL]
-----------PhysicalCteConsumer ( cteId=CTEId#0 )
+----------hashAgg[GLOBAL]
+------------hashAgg[LOCAL]
+--------------PhysicalCteConsumer ( cteId=CTEId#0 )
 
 -- !sum_count_with_gby --
 PhysicalCteAnchor ( cteId=CTEId#0 )
@@ -520,10 +528,13 @@ PhysicalCteAnchor ( cteId=CTEId#0 )
 ----hashJoin[INNER_JOIN] hashCondition=((a <=> .a)) otherCondition=()
 ------hashAgg[GLOBAL]
 --------hashAgg[LOCAL]
-----------PhysicalCteConsumer ( cteId=CTEId#0 )
+----------hashAgg[GLOBAL]
+------------hashAgg[LOCAL]
+--------------PhysicalCteConsumer ( cteId=CTEId#0 )
 ------hashAgg[GLOBAL]
---------hashAgg[LOCAL]
-----------PhysicalCteConsumer ( cteId=CTEId#0 )
+--------hashAgg[GLOBAL]
+----------hashAgg[LOCAL]
+------------PhysicalCteConsumer ( cteId=CTEId#0 )
 
 -- !has_grouping --
 PhysicalResultSink
@@ -533,7 +544,7 @@ PhysicalResultSink
 --------PhysicalOlapScan[test_distinct_multi]
 
 -- !null_hash --
-1      \N      0       0
+1      \N      0       0.0
 
 -- !same_distinct_arg --
 2      1
diff --git 
a/regression-test/suites/nereids_rules_p0/agg_strategy/agg_strategy.groovy 
b/regression-test/suites/nereids_rules_p0/agg_strategy/agg_strategy.groovy
index a5c136d074c..72ee1b92efb 100644
--- a/regression-test/suites/nereids_rules_p0/agg_strategy/agg_strategy.groovy
+++ b/regression-test/suites/nereids_rules_p0/agg_strategy/agg_strategy.groovy
@@ -21,7 +21,6 @@ suite("agg_strategy") {
     sql "set global enable_auto_analyze=false"
     sql "set runtime_filter_mode=OFF"
     sql "set be_number_for_test=1;"
-    sql "set parallel_pipeline_task_num=1;"
 
     for (int i = 0; i < 2; i++) {
         if (i == 0) {
diff --git 
a/regression-test/suites/nereids_rules_p0/agg_strategy/distinct_agg_rewriter.groovy
 
b/regression-test/suites/nereids_rules_p0/agg_strategy/distinct_agg_rewriter.groovy
index 476fa559ba6..953c233c0d5 100644
--- 
a/regression-test/suites/nereids_rules_p0/agg_strategy/distinct_agg_rewriter.groovy
+++ 
b/regression-test/suites/nereids_rules_p0/agg_strategy/distinct_agg_rewriter.groovy
@@ -21,7 +21,6 @@ suite("distinct_agg_rewriter") {
     set runtime_filter_mode=OFF;
     set enable_parallel_result_sink=false;
     set be_number_for_test=1;
-    set parallel_pipeline_task_num=1;
     """
     multi_sql """
     analyze table t1000_2 with sync;
diff --git 
a/regression-test/suites/nereids_rules_p0/distinct_split/disitinct_split.groovy 
b/regression-test/suites/nereids_rules_p0/distinct_split/disitinct_split.groovy
index f9da6636064..883f0529132 100644
--- 
a/regression-test/suites/nereids_rules_p0/distinct_split/disitinct_split.groovy
+++ 
b/regression-test/suites/nereids_rules_p0/distinct_split/disitinct_split.groovy
@@ -20,6 +20,7 @@ suite("distinct_split") {
     sql "set disable_join_reorder=true"
     sql "set global enable_auto_analyze=false;"
     sql "set be_number_for_test=1;"
+    sql "set parallel_pipeline_task_num=1;"
     sql "drop table if exists test_distinct_multi"
     sql "create table test_distinct_multi(a int, b int, c int, d varchar(10), 
e date) distributed by hash(a) properties('replication_num'='1');"
     sql "insert into test_distinct_multi 
values(1,2,3,'abc','2024-01-02'),(1,2,4,'abc','2024-01-03'),(2,2,4,'abcd','2024-01-02'),(1,2,3,'abcd','2024-01-04'),(1,2,4,'eee','2024-02-02'),(2,2,4,'abc','2024-01-02');"


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to