This is an automated email from the ASF dual-hosted git repository.

924060929 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new e5bbebb4905 [opt](local shuffle) bucket-shuffle dest spreading + bucket-to-hash parallelism upgrade (#64793)
e5bbebb4905 is described below

commit e5bbebb49058a998f8e3a5a380d49508c1bc4a2c
Author: 924060929 <[email protected]>
AuthorDate: Tue Jun 30 10:33:27 2026 +0800

    [opt](local shuffle) bucket-shuffle dest spreading + bucket-to-hash parallelism upgrade (#64793)
    
    Related PR: #63366 (FE local-shuffle planner, merged)
    
    Problem Summary:
    
    Two optimizations on top of the FE local-shuffle planner (#63366): removing
    the bucket-shuffle serial bottleneck and upgrading join parallelism.
    
    ---
    
    #### Part 1: ExchangeNode serial decoupling + bucket-shuffle dest spreading
    
    When the FE planner is enabled, `ExchangeNode.isSerialOperatorOnBe` no
    longer inherits `fragment.hasSerialScanNode()` — serial status comes
    from the node itself only. This allows `DistributePlanner` to spread
    bucket-shuffle destinations to all bucket-owning instances (via
    `assignedJoinBucketIndexes`) instead of funneling to the first instance
    per worker.
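
    A minimal Java sketch of the spreading rule, assuming each pooled instance
    exposes the set of join buckets it owns (a hypothetical `Instance` record
    stands in for `LocalShuffleBucketJoinAssignedJob`): destination slot `b`
    goes to the instance owning bucket `b`, a bucket claimed twice is an error,
    and unowned buckets get a placeholder (the patch uses a dummy
    `StaticAssignedJob` there).

    ```java
    import java.util.Arrays;
    import java.util.List;
    import java.util.Set;

    // Hypothetical stand-in for a pooled instance and the join buckets it owns.
    record Instance(String id, Set<Integer> joinBuckets) {}

    final class DestSpread {
        // Placeholder destination for buckets that no instance owns.
        static final Instance PLACEHOLDER = new Instance("placeholder", Set.of());

        // Returns a bucket-indexed destination list: slot b -> instance owning bucket b.
        static List<Instance> destinationsByJoinBuckets(List<Instance> instances, int bucketNum) {
            Instance[] dests = new Instance[bucketNum];
            for (Instance inst : instances) {
                for (int bucket : inst.joinBuckets()) {
                    if (dests[bucket] != null) {
                        throw new IllegalStateException("bucket " + bucket + " assigned to both "
                                + dests[bucket].id() + " and " + inst.id());
                    }
                    dests[bucket] = inst;
                }
            }
            // Every bucket still needs a slot so that bucket index == destination index.
            for (int b = 0; b < bucketNum; b++) {
                if (dests[b] == null) {
                    dests[b] = PLACEHOLDER;
                }
            }
            return Arrays.asList(dests);
        }
    }
    ```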
    
    Includes a BE-side orphan instance fix: instances that own no bucket get
    receivers with `num_senders=0` (ready immediately, at EOS), and orphan
    detection uses the per-BE local task index rather than the global sender
    index.
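
    The orphan check amounts to the following, sketched in Java for consistency
    with the other examples (the patch implements it in C++ as
    `ExchangeSourceOperatorX::is_bucket_shuffle_orphan_instance`; the
    `OrphanCheck` class is hypothetical). The map is assumed to be the
    per-worker bucket → local-instance map FE sends to each BE, so the index
    tested must be the local task index: the global sender index only
    coincides with it on the first worker.

    ```java
    import java.util.HashSet;
    import java.util.Map;
    import java.util.Set;

    final class OrphanCheck {
        private final Set<Integer> bucketDestInstances = new HashSet<>();
        private final boolean serialExchange;
        private final boolean bucketShuffle;

        OrphanCheck(Map<Integer, Integer> bucketSeqToLocalInstance,
                boolean serialExchange, boolean bucketShuffle) {
            // Only the values matter: the set of local instances some bucket routes to.
            bucketDestInstances.addAll(bucketSeqToLocalInstance.values());
            this.serialExchange = serialExchange;
            this.bucketShuffle = bucketShuffle;
        }

        // An orphan owns no bucket: no sender opens a channel to it, so it never sees EOS.
        // Serial exchanges funnel to one instance per worker, so ownership does not apply.
        boolean isOrphan(int localTaskIdx) {
            return !serialExchange && bucketShuffle && !bucketDestInstances.isEmpty()
                    && !bucketDestInstances.contains(localTaskIdx);
        }

        // Orphans start with zero senders, so their receiver is ready (at EOS) at once.
        int receiverSenders(int localTaskIdx, int numSenders) {
            return isOrphan(localTaskIdx) ? 0 : numSenders;
        }
    }
    ```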
    
    ```sql
    -- orders: DISTRIBUTED BY HASH(o_custkey) BUCKETS 4
    -- customer: DISTRIBUTED BY HASH(c_custkey) BUCKETS 4
    SELECT c_name, SUM(o_totalprice) total
    FROM orders JOIN customer ON o_custkey = c_custkey
    GROUP BY c_name ORDER BY total DESC;
    ```
    
    Baseline (planner=false, BE native) — no LocalExchangeNode:
    
    ```
      4:AGG(update serialize, STREAMING)
      3:HASH JOIN (BUCKET_SHUFFLE)
      |----1:EXCHANGE                     ← customer, bucket-shuffle partitioned
      2:OlapScan(orders)                  ← pooling scan, 16 instances but only 4 have buckets
    ```
    
    FE planner with dest spreading (ratio=0, no upgrade) — bucket
    distribution preserved, dests spread to all 4 bucket-owning instances:
    
    ```
      4:AGG(update serialize, STREAMING)
      12:LOCAL-EXCHANGE(PASSTHROUGH)       ← fan serial→parallel for agg
      3:HASH JOIN (BUCKET_SHUFFLE)
      |----1:EXCHANGE                      ← customer, dests spread to 4 bucket-owning instances
      11:LOCAL-EXCHANGE(BUCKET_HASH)       ← ★ bucket LE: join capped at bucket count (4)
      10:LOCAL-EXCHANGE(PASSTHROUGH)       ← fan serial scan → parallel
      2:OlapScan(orders, POOLING-SCAN)
    ```
    
    ---
    
    #### Part 2: Bucket-to-hash parallelism upgrade + RF force_local_merge
    
    When per-BE instances significantly exceed the bucket count, upgrade
    `BUCKET_HASH_SHUFFLE` local exchanges to `LOCAL_EXECUTION_HASH_SHUFFLE`
    so the join runs at full instance parallelism. Gate:
    `min(instances, executor_threads) / min(buckets, executor_threads) > ratio`
    (session var `local_shuffle_bucket_upgrade_ratio`, default 1.5; values
    <= 1 disable it). With the default `bucket_shuffle_downgrade_ratio` (0.8),
    a bucket join is downgraded to a shuffle join whenever
    buckets < 0.8 × instances, so a single bucket join only survives when
    instances/buckets <= 1.25. That is below the default 1.5, so this upgrade
    stays off for single joins by default and engages only for stacked / wide
    bucket-domain shapes.
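
    The two checks can be sketched in Java as below, for one worker and using
    hypothetical method names (the patch's versions are
    `AddLocalExchange.shouldUpgradeBucketParallelism` and the downgrade check in
    `ChildrenPropertiesRegulator`). For example, 10 instances over 8 buckets
    (10/8 = 1.25) keep the bucket join but stay below 1.5, while 16 instances
    over 8 buckets are downgraded by default; with the downgrade disabled they
    upgrade unless the executor threads cap both sides at the same value.

    ```java
    final class BucketUpgradeGate {
        // Upgrade gate: effective instances / effective buckets must exceed ratio.
        // ratio <= 1 disables the upgrade (no parallelism gain is possible).
        static boolean shouldUpgrade(double ratio, int instances, int buckets, int executorThreads) {
            if (ratio <= 1.0 || buckets <= 0) {
                return false;
            }
            long effInstances = Math.min(instances, executorThreads);
            long effBuckets = Math.min(buckets, executorThreads);
            return effInstances > effBuckets * ratio;
        }

        // Downgrade check: a bucket join becomes a shuffle join when
        // buckets < instances * downgradeRatio; downgradeRatio <= 0 disables it.
        static boolean downgradesToShuffleJoin(double downgradeRatio, int totalBuckets, int totalInstances) {
            return downgradeRatio > 0 && totalBuckets < totalInstances * downgradeRatio;
        }
    }
    ```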
    
    RF `force_local_merge` fix: the bucket upgrade flips the scan from serial
    to parallel, which breaks the implicit signal that tells a runtime filter
    (RF) target to merge partial filters. Added
    `TRuntimeFilterDesc.force_local_merge`: after `AddLocalExchange`, FE walks
    the builder→target path, and if a `LocalExchangeNode` sits on that path,
    the target must merge partial RFs (BE then registers the consumer with the
    global RF manager).
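
    A sketch of that path check, walking parent links from the RF target up to
    the join that builds the filter and flagging any local exchange on the
    way. The `Node` type is a hypothetical simplification, not the Doris
    `PlanNode` API, and the real walk runs after `AddLocalExchange` has placed
    its exchanges.

    ```java
    final class RfMergeCheck {
        // Hypothetical plan node: a name, a parent link, and a local-exchange flag.
        static final class Node {
            final String name;
            final boolean localExchange;
            Node parent;

            Node(String name, boolean localExchange) {
                this.name = name;
                this.localExchange = localExchange;
            }

            // Links this node below the given parent and returns this node.
            Node under(Node parentNode) {
                this.parent = parentNode;
                return this;
            }
        }

        // True when a local exchange sits between the RF target and the builder join,
        // i.e. the target must merge partial RFs instead of using one directly.
        static boolean forceLocalMerge(Node builder, Node target) {
            boolean sawLocalExchange = false;
            for (Node n = target.parent; n != null; n = n.parent) {
                if (n == builder) {
                    return sawLocalExchange;
                }
                sawLocalExchange |= n.localExchange;
            }
            throw new IllegalArgumentException(target.name + " is not below " + builder.name);
        }
    }
    ```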
    
    **Single bucket join** — same SQL as above, ratio=1.1:
    
    ```
      4:AGG(update serialize, STREAMING)
      13:LOCAL-EXCHANGE(PASSTHROUGH)
      3:HASH JOIN (BUCKET_SHUFFLE)
      |----12:LOCAL-EXCHANGE(LOCAL_EXECUTION_HASH)  ← ★ UPGRADED: hash by join key, all 16 instances
      |    1:EXCHANGE
      11:LOCAL-EXCHANGE(LOCAL_EXECUTION_HASH)        ← ★ UPGRADED: was BUCKET_HASH, now LOCAL hash
      10:LOCAL-EXCHANGE(PASSTHROUGH)
      2:OlapScan(orders, POOLING-SCAN)
    ```
    
    **Stacked bucket joins (whole-chain upgrade)**:
    
    ```sql
    -- orders BUCKETS 4, dim1 BUCKETS 4, dim2 BUCKETS 3
    SELECT d1_val, d2_val, SUM(o_totalprice) total
    FROM orders
    JOIN dim1 ON o_custkey = d1_key
    JOIN dim2 ON o_custkey = d2_key
    GROUP BY d1_val, d2_val ORDER BY total DESC;
    ```
    
    Without upgrade (ratio=0) — both joins capped at bucket count:
    
    ```
      7:AGG(update serialize, STREAMING)
      16:LOCAL-EXCHANGE(PASSTHROUGH)
      6:HASH JOIN(BUCKET_SHUFFLE)            ← join2, capped at 4 instances
      |----1:EXCHANGE
      5:HASH JOIN(BUCKET_SHUFFLE)            ← join1, capped at 4 instances
      |----3:EXCHANGE
      15:LOCAL-EXCHANGE(BUCKET_HASH)
      14:LOCAL-EXCHANGE(PASSTHROUGH)
      4:OlapScan(orders, POOLING-SCAN)
    ```
    
    With upgrade (ratio=1.1) — whole chain upgraded, all 16 instances join:
    
    ```
      7:AGG(update serialize, STREAMING)
      19:LOCAL-EXCHANGE(PASSTHROUGH)
      6:HASH JOIN(BUCKET_SHUFFLE)
      |----18:LOCAL-EXCHANGE(LOCAL_EXECUTION_HASH)  ← ★ UPGRADED
      |    1:EXCHANGE
      17:LOCAL-EXCHANGE(LOCAL_EXECUTION_HASH)        ← ★ upper join re-aligns
      5:HASH JOIN(BUCKET_SHUFFLE)
      |----16:LOCAL-EXCHANGE(LOCAL_EXECUTION_HASH)  ← ★ UPGRADED
      |    3:EXCHANGE
      15:LOCAL-EXCHANGE(LOCAL_EXECUTION_HASH)        ← ★ lower join also upgraded
      14:LOCAL-EXCHANGE(PASSTHROUGH)
      4:OlapScan(orders, POOLING-SCAN)
    ```
    
    The lower join upgrades and reports NOOP output, so the upper join
    inserts its own LE to re-align data to its own join keys.
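
    A toy model of why NOOP is reported, assuming (as the patch's
    `PlanTranslatorContext` comment describes) that a requirement is matched on
    the exchange type alone: if the lower join advertised
    `LOCAL_EXECUTION_HASH` keyed by its own join keys, it would satisfy the
    upper join's requirement and suppress the re-aligning LE. The enum and
    method names are hypothetical.

    ```java
    final class ReAlign {
        // Hypothetical labels mirroring the local-exchange types in the plans above.
        enum LocalDist { NOOP, PASSTHROUGH, BUCKET_HASH, LOCAL_EXECUTION_HASH }

        // Type-only requirement check: the keys the child hashed on are not compared.
        static boolean satisfies(LocalDist required, LocalDist childOutput) {
            return required == childOutput;
        }

        // What an upgraded bucket join reports to its parent.
        static LocalDist upgradedJoinOutput() {
            return LocalDist.NOOP;
        }

        // The upper join inserts its own hash LE (on its join keys) when the child
        // output does not satisfy LOCAL_EXECUTION_HASH.
        static boolean upperJoinInsertsExchange(LocalDist childOutput) {
            return !satisfies(LocalDist.LOCAL_EXECUTION_HASH, childOutput);
        }
    }
    ```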
    
    ---
    
    ### Release note
    
    Add session variable `local_shuffle_bucket_upgrade_ratio` (default 1.5;
    values <= 1 disable it) to control the bucket-to-hash parallelism
    upgrade in pooled-scan fragments: when per-BE instances exceed
    buckets-with-data by this ratio, bucket-shuffle local exchanges are
    upgraded to hash exchanges for higher join parallelism. With the default
    `bucket_shuffle_downgrade_ratio` (0.8) the upgrade only engages for
    stacked / wide bucket-domain shapes.
    
    ### Check List (For Author)
    
    - Test
        - [x] Regression test
        - [x] Unit Test
        - [ ] Manual test
        - [ ] No need to test
    
    - Behavior changed:
    - [x] Yes. Bucket-shuffle join fragments may now use
    `LOCAL_EXECUTION_HASH_SHUFFLE` instead of `BUCKET_HASH_SHUFFLE` local
    exchanges when the upgrade ratio is met. Setting
    `local_shuffle_bucket_upgrade_ratio=0` (or <= 1) restores the previous
    behavior.
---
 be/src/exec/exchange/vdata_stream_recvr.h          |  15 +-
 be/src/exec/operator/exchange_source_operator.cpp  |  13 +-
 be/src/exec/operator/exchange_source_operator.h    |  36 ++++
 be/src/exec/pipeline/pipeline_fragment_context.cpp |  10 +-
 be/src/runtime/runtime_state.cpp                   |   3 +-
 .../glue/translator/PlanTranslatorContext.java     |  31 ++++
 .../properties/ChildrenPropertiesRegulator.java    |  10 +-
 .../trees/plans/distribute/DistributePlanner.java  |  57 +++++-
 .../org/apache/doris/planner/AddLocalExchange.java | 105 +++++++++++
 .../org/apache/doris/planner/ExchangeNode.java     |   7 +
 .../org/apache/doris/planner/HashJoinNode.java     |  96 ++++++++--
 .../org/apache/doris/planner/RuntimeFilter.java    |  37 ++++
 .../java/org/apache/doris/qe/SessionVariable.java  |  41 +++++
 .../planner/LocalShuffleNodeCoverageTest.java      | 160 ++++++++++++++++-
 gensrc/thrift/PlanNodes.thrift                     |   6 +
 .../test_local_shuffle_bucket_upgrade.groovy       | 193 +++++++++++++++++++++
 .../test_local_shuffle_rqg_bugs.groovy             |  75 ++++++--
 17 files changed, 852 insertions(+), 43 deletions(-)

diff --git a/be/src/exec/exchange/vdata_stream_recvr.h b/be/src/exec/exchange/vdata_stream_recvr.h
index 2b395ee61f4..12650e780a1 100644
--- a/be/src/exec/exchange/vdata_stream_recvr.h
+++ b/be/src/exec/exchange/vdata_stream_recvr.h
@@ -198,7 +198,20 @@ public:
 
     void close();
 
-    void set_dependency(std::shared_ptr<Dependency> dependency) { _source_dependency = dependency; }
+    void set_dependency(std::shared_ptr<Dependency> dependency) {
+        // Assign under _lock: set_source_ready() (reached via decrement_senders/cancel/close
+        // on other threads) reads _source_dependency while holding _lock, so a lock-free
+        // shared_ptr assignment here would race with that read.
+        std::lock_guard<std::mutex> l(_lock);
+        _source_dependency = dependency;
+        // A queue created with zero senders (bucket-shuffle orphan instance, see
+        // ExchangeLocalState::create_stream_recvr) never goes through decrement_senders,
+        // so the usual reached-zero set_ready never fires — mark it ready at wiring time
+        // or its task blocks forever on SHUFFLE_DATA_DEPENDENCY.
+        if (_num_remaining_senders == 0) {
+            set_source_ready(l);
+        }
+    }
 
 protected:
     struct BlockItem;
diff --git a/be/src/exec/operator/exchange_source_operator.cpp b/be/src/exec/operator/exchange_source_operator.cpp
index 1e4d24de8e8..acdfe02d048 100644
--- a/be/src/exec/operator/exchange_source_operator.cpp
+++ b/be/src/exec/operator/exchange_source_operator.cpp
@@ -64,9 +64,17 @@ std::string ExchangeSourceOperatorX::debug_string(int indentation_level) const {
 
 void ExchangeLocalState::create_stream_recvr(RuntimeState* state) {
     auto& p = _parent->cast<ExchangeSourceOperatorX>();
+    int num_senders = p.num_senders();
+    if (p.is_bucket_shuffle_orphan_instance(local_task_idx)) {
+        // Bucket-routed senders open one channel per destination entry (one per bucket),
+        // so an instance owning no bucket never gets a channel — and never gets EOS.
+        // Start its receiver with zero senders so it reports EOS immediately instead of
+        // blocking forever (K-of-N destination spread).
+        num_senders = 0;
+    }
     stream_recvr = state->exec_env()->vstream_mgr()->create_recvr(
-            state, _memory_used_counter, state->fragment_instance_id(), p.node_id(),
-            p.num_senders(), custom_profile(), p.is_merging(),
+            state, _memory_used_counter, state->fragment_instance_id(), p.node_id(), num_senders,
+            custom_profile(), p.is_merging(),
             std::max(20480, config::exchg_node_buffer_size_bytes /
                                     (p.is_merging() ? p.num_senders() : 1)));
 }
@@ -75,6 +83,7 @@ Status ExchangeLocalState::init(RuntimeState* state, LocalStateInfo& info) {
     RETURN_IF_ERROR(Base::init(state, info));
     SCOPED_TIMER(exec_time_counter());
     SCOPED_TIMER(_init_timer);
+    local_task_idx = info.task_idx;
     create_stream_recvr(state);
     const auto& queues = stream_recvr->sender_queues();
     deps.resize(queues.size());
diff --git a/be/src/exec/operator/exchange_source_operator.h b/be/src/exec/operator/exchange_source_operator.h
index c6f651aaffd..446be0559c8 100644
--- a/be/src/exec/operator/exchange_source_operator.h
+++ b/be/src/exec/operator/exchange_source_operator.h
@@ -19,6 +19,9 @@
 
 #include <stdint.h>
 
+#include <map>
+#include <set>
+
 #include "exec/operator/operator.h"
 #include "exprs/vexpr_fwd.h"
 
@@ -75,6 +78,9 @@ public:
     doris::VExprContextSPtrs ordering_expr_ctxs;
     int64_t num_rows_skipped;
     bool is_ready;
+    // per-BE local instance index (LocalStateInfo::task_idx), used for bucket-shuffle
+    // orphan detection in create_stream_recvr — see is_bucket_shuffle_orphan_instance.
+    int local_task_idx = 0;
 
     std::vector<std::shared_ptr<Dependency>> deps;
 
@@ -110,6 +116,34 @@ public:
     [[nodiscard]] int num_senders() const { return _num_senders; }
     [[nodiscard]] bool is_merging() const { return _is_merging; }
 
+    // Instances that bucket-routed senders can address: values of the fragment's
+    // bucket_seq_to_instance_idx map. Senders open one channel per destination entry
+    // (one per bucket), so an instance owning no bucket never gets a channel — and
+    // never gets EOS. Such orphan instances must start their receiver with zero
+    // senders or they block forever (K-of-N destination spread).
+    void set_bucket_dest_instances(const std::map<int, int>& bucket_seq_to_instance_idx) {
+        for (const auto& [bucket_seq, instance_idx] : bucket_seq_to_instance_idx) {
+            _bucket_dest_instances.insert(instance_idx);
+        }
+        _has_bucket_dest_instances = true;
+    }
+
+    // local_task_idx is the per-BE local instance index (LocalStateInfo::task_idx) — the
+    // same numbering as bucket_seq_to_instance_idx values (built per worker on FE). Do NOT
+    // pass per_fragment_instance_idx here: that is sender_id = the GLOBAL index across all
+    // workers, which only coincides with the local index on the first worker (single-BE
+    // tests pass, multi-BE silently drops every later worker's buckets).
+    //
+    // Ownership-based orphan detection is only valid when destinations follow bucket
+    // ownership, i.e. the non-serial (FE planner dest spread) mode. A serial exchange's
+    // destinations funnel to the first instance per worker regardless of bucket ownership,
+    // and BE's serial-exchange mechanics already close the other receivers.
+    [[nodiscard]] bool is_bucket_shuffle_orphan_instance(int local_task_idx) const {
+        return !is_serial_operator() &&
+               _partition_type == TPartitionType::BUCKET_SHFFULE_HASH_PARTITIONED &&
+               _has_bucket_dest_instances && !_bucket_dest_instances.contains(local_task_idx);
+    }
+
     DataDistribution required_data_distribution(RuntimeState* /*state*/) const override {
         if (OperatorX<ExchangeLocalState>::is_serial_operator()) {
             return {TLocalPartitionType::NOOP};
@@ -126,6 +160,8 @@ private:
     const int _num_senders;
     const bool _is_merging;
     const TPartitionType::type _partition_type;
+    std::set<int> _bucket_dest_instances;
+    bool _has_bucket_dest_instances = false;
 
     // use in merge sort
     size_t _offset;
diff --git a/be/src/exec/pipeline/pipeline_fragment_context.cpp b/be/src/exec/pipeline/pipeline_fragment_context.cpp
index d4f74843179..125c6e18fdd 100644
--- a/be/src/exec/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/exec/pipeline/pipeline_fragment_context.cpp
@@ -1563,8 +1563,14 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
                                   ? _params.per_exch_num_senders.find(tnode.node_id)->second
                                   : 0;
         DCHECK_GT(num_senders, 0);
-        op = std::make_shared<ExchangeSourceOperatorX>(pool, tnode, next_operator_id(), descs,
-                                                       num_senders);
+        auto exchange_op = std::make_shared<ExchangeSourceOperatorX>(
+                pool, tnode, next_operator_id(), descs, num_senders);
+        if (!_params.bucket_seq_to_instance_idx.empty()) {
+            // Lets bucket-routed exchanges detect orphan instances (owning no bucket) that
+            // no sender channel will ever address — their receivers must start at EOS.
+            exchange_op->set_bucket_dest_instances(_params.bucket_seq_to_instance_idx);
+        }
+        op = exchange_op;
         RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
         fe_with_old_version = !tnode.__isset.is_serial_operator;
         break;
diff --git a/be/src/runtime/runtime_state.cpp b/be/src/runtime/runtime_state.cpp
index 9e173acab7e..54989511eb8 100644
--- a/be/src/runtime/runtime_state.cpp
+++ b/be/src/runtime/runtime_state.cpp
@@ -508,7 +508,8 @@ Status RuntimeState::register_consumer_runtime_filter(
         const TRuntimeFilterDesc& desc, bool need_local_merge, int node_id,
         std::shared_ptr<RuntimeFilterConsumer>* consumer_filter) {
     _registered_runtime_filter_ids.insert(desc.filter_id);
-    bool need_merge = desc.has_remote_targets || need_local_merge;
+    bool need_merge = desc.has_remote_targets || need_local_merge ||
+                      (desc.__isset.force_local_merge && desc.force_local_merge);
     RuntimeFilterMgr* mgr = need_merge ? global_runtime_filter_mgr() : local_runtime_filter_mgr();
     RETURN_IF_ERROR(mgr->register_consumer_filter(this, desc, node_id, consumer_filter));
     // Stamp the consumer with the current recursive CTE stage so that incoming publish RPCs
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PlanTranslatorContext.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PlanTranslatorContext.java
index c4ffab8b5dc..f680936b5e5 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PlanTranslatorContext.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PlanTranslatorContext.java
@@ -127,6 +127,21 @@ public class PlanTranslatorContext {
     // needs shuffle for correctness, not just for performance like StreamingAgg pre-agg).
     private final Map<PlanNodeId, Boolean> shuffledAncestorMap = Maps.newHashMap();
 
+    // Whether the fragment currently being processed by AddLocalExchange is eligible for the
+    // bucket → local-hash parallelism upgrade: a pooled bucket-join fragment whose per-BE
+    // instance count exceeds (buckets-with-data per BE) × local_shuffle_bucket_upgrade_ratio.
+    // Computed once per fragment in AddLocalExchange.addLocalExchange from the distributed
+    // plan's LocalShuffleBucketJoinAssignedJob assignments; read by
+    // HashJoinNode.enforceAndDeriveLocalExchange.
+    private boolean currentFragmentBucketUpgradeEligible = false;
+
+    // Per-node "a bucket join above me in this fragment already upgraded to local hash" flag.
+    // An upgraded join marks its direct children so a stacked bucket join below keeps its
+    // BUCKET_HASH_SHUFFLE requires: if it also upgraded, its LOCAL hash output (keyed by ITS
+    // join keys) would type-satisfy the upper join's requireSpecific(LOCAL_EXECUTION_HASH)
+    // and suppress the LE that re-aligns data to the upper join's keys → wrong results.
+    private final Map<PlanNodeId, Boolean> bucketUpgradedAncestorMap = Maps.newHashMap();
+
     // Whether the current fragment uses LocalShuffleAssignedJob (pooling scan with
     // ignoreDataDistribution → _parallel_instances=1 in BE). When true, serial operators
     // indicate real pipeline bottlenecks needing PASSTHROUGH fan-out (heavy_ops).
@@ -271,6 +286,22 @@ public class PlanTranslatorContext {
         return shuffledAncestorMap.getOrDefault(node.getId(), false);
     }
 
+    public void setCurrentFragmentBucketUpgradeEligible(boolean eligible) {
+        this.currentFragmentBucketUpgradeEligible = eligible;
+    }
+
+    public boolean isCurrentFragmentBucketUpgradeEligible() {
+        return currentFragmentBucketUpgradeEligible;
+    }
+
+    public void setHasBucketUpgradedAncestor(PlanNode node, boolean value) {
+        bucketUpgradedAncestorMap.put(node.getId(), value);
+    }
+
+    public boolean hasBucketUpgradedAncestor(PlanNode node) {
+        return bucketUpgradedAncestorMap.getOrDefault(node.getId(), false);
+    }
+
     public SlotDescriptor addSlotDesc(TupleDescriptor t) {
         return descTable.addSlotDescriptor(t);
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildrenPropertiesRegulator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildrenPropertiesRegulator.java
index 845c87eea9c..e5e0d9d1bd0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildrenPropertiesRegulator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildrenPropertiesRegulator.java
@@ -311,7 +311,15 @@ public class ChildrenPropertiesRegulator extends PlanVisitor<List<List<PhysicalP
                 int bucketNum = candidate.getTable().getDefaultDistributionInfo().getBucketNum();
                 int totalBucketNum = prunedPartNum * bucketNum;
                 ConnectContext connectContext = ConnectContext.get();
-                return totalBucketNum < connectContext.getTotalInstanceNum() * 0.8;
+                // <= 0 disables the downgrade entirely: with the FE local shuffle planner's
+                // bucket -> local-hash upgrade (local_shuffle_bucket_upgrade_ratio), few-bucket
+                // bucket shuffle no longer funnels, so keeping bucket shuffle (anchored side
+                // needs no re-shuffle) can beat downgrading to shuffle join.
+                double downgradeRatio = connectContext.getSessionVariable().getBucketShuffleDowngradeRatio();
+                if (downgradeRatio <= 0) {
+                    return false;
+                }
+                return totalBucketNum < connectContext.getTotalInstanceNum() * downgradeRatio;
             }
         }
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/DistributePlanner.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/DistributePlanner.java
index 0069ddd37db..fd1ff52f5f9 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/DistributePlanner.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/DistributePlanner.java
@@ -31,6 +31,7 @@ import org.apache.doris.nereids.trees.plans.distribute.worker.job.AssignedJobBui
 import org.apache.doris.nereids.trees.plans.distribute.worker.job.BucketScanSource;
 import org.apache.doris.nereids.trees.plans.distribute.worker.job.DefaultScanSource;
 import org.apache.doris.nereids.trees.plans.distribute.worker.job.LocalShuffleAssignedJob;
+import org.apache.doris.nereids.trees.plans.distribute.worker.job.LocalShuffleBucketJoinAssignedJob;
 import org.apache.doris.nereids.trees.plans.distribute.worker.job.StaticAssignedJob;
 import org.apache.doris.nereids.trees.plans.distribute.worker.job.UnassignedJob;
 import org.apache.doris.nereids.trees.plans.distribute.worker.job.UnassignedJobBuilder;
@@ -211,7 +212,7 @@ public class DistributePlanner {
         List<AssignedJob> receiverInstances = filterInstancesWhichCanReceiveDataFromRemote(
                 receiverPlan, enableShareHashTableForBroadcastJoin, linkNode);
         if (linkNode.getPartitionType() == TPartitionType.BUCKET_SHFFULE_HASH_PARTITIONED) {
-            receiverInstances = getDestinationsByBuckets(receiverPlan, receiverInstances);
+            receiverInstances = getDestinationsByBuckets(receiverPlan, receiverInstances, linkNode);
         }
 
         DataSink sink = senderPlan.getFragmentJob().getFragment().getSink();
@@ -231,12 +232,34 @@ public class DistributePlanner {
 
     private List<AssignedJob> getDestinationsByBuckets(
             PipelineDistributedPlan joinSide,
-            List<AssignedJob> receiverInstances) {
+            List<AssignedJob> receiverInstances,
+            ExchangeNode linkNode) {
         UnassignedScanBucketOlapTableJob bucketJob = (UnassignedScanBucketOlapTableJob) joinSide.getFragmentJob();
         int bucketNum = bucketJob.getOlapScanNodes().get(0).getBucketNum();
+        // The spread is only valid for a NON-serial exchange: a serial exchange
+        // (use_serial_exchange / UNPARTITIONED) receives through one task per worker and
+        // expects funnel destinations; spreading them loses every row addressed to a
+        // non-first instance. Mirrors the !is_serial_operator() gate on the BE orphan
+        // receiver fix.
+        if (isEnableLocalShufflePlanner()
+                && !linkNode.isSerialOperatorOnBe(statementContext.getConnectContext())
+                && !joinSide.getInstanceJobs().isEmpty()
+                && joinSide.getInstanceJobs().stream()
+                        .allMatch(LocalShuffleBucketJoinAssignedJob.class::isInstance)) {
+            // When FE local shuffle planner is on, spread bucket destinations across all pooled
+            // instances by their assigned join buckets — the same bucket -> instance mapping as
+            // bucket_seq_to_instance_id sent to BE — instead of funneling every bucket of a worker
+            // into its first instance and relying on BE local exchange to fan out.
+            return sortDestinationInstancesByJoinBuckets(joinSide, bucketNum);
+        }
         return sortDestinationInstancesByBuckets(joinSide, receiverInstances, bucketNum);
     }
 
+    private boolean isEnableLocalShufflePlanner() {
+        ConnectContext connectContext = statementContext.getConnectContext();
+        return connectContext != null && connectContext.getSessionVariable().isEnableLocalShufflePlanner();
+    }
+
     private List<AssignedJob> filterInstancesWhichCanReceiveDataFromRemote(
             PipelineDistributedPlan receiverPlan,
             boolean enableShareHashTableForBroadcastJoin,
@@ -252,6 +275,36 @@ public class DistributePlanner {
         }
     }
 
+    private List<AssignedJob> sortDestinationInstancesByJoinBuckets(
+            PipelineDistributedPlan plan, int bucketNum) {
+        AssignedJob[] instances = new AssignedJob[bucketNum];
+        for (AssignedJob instanceJob : plan.getInstanceJobs()) {
+            LocalShuffleBucketJoinAssignedJob localShuffleJob = (LocalShuffleBucketJoinAssignedJob) instanceJob;
+            for (Integer bucketIndex : localShuffleJob.getAssignedJoinBucketIndexes()) {
+                if (instances[bucketIndex] != null) {
+                    throw new IllegalStateException(
+                            "Multi instances assigned same join bucket: " + instances[bucketIndex]
+                                    + " and " + instanceJob
+                    );
+                }
+                instances[bucketIndex] = instanceJob;
+            }
+        }
+
+        for (int i = 0; i < instances.length; i++) {
+            if (instances[i] == null) {
+                instances[i] = new StaticAssignedJob(
+                        i,
+                        new TUniqueId(-1, -1),
+                        plan.getFragmentJob(),
+                        DummyWorker.INSTANCE,
+                        new DefaultScanSource(ImmutableMap.of())
+                );
+            }
+        }
+        return Arrays.asList(instances);
+    }
+
     private List<AssignedJob> sortDestinationInstancesByBuckets(
             PipelineDistributedPlan plan, List<AssignedJob> unsorted, int bucketNum) {
         AssignedJob[] instances = new AssignedJob[bucketNum];
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/AddLocalExchange.java b/fe/fe-core/src/main/java/org/apache/doris/planner/AddLocalExchange.java
index e1d607ea615..1adea56c13e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/AddLocalExchange.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/AddLocalExchange.java
@@ -22,9 +22,18 @@ import org.apache.doris.nereids.glue.translator.PlanTranslatorContext;
 import org.apache.doris.nereids.trees.plans.distribute.DistributedPlan;
 import org.apache.doris.nereids.trees.plans.distribute.FragmentIdMapping;
 import org.apache.doris.nereids.trees.plans.distribute.PipelineDistributedPlan;
+import org.apache.doris.nereids.trees.plans.distribute.worker.job.AssignedJob;
+import org.apache.doris.nereids.trees.plans.distribute.worker.job.LocalShuffleBucketJoinAssignedJob;
 import org.apache.doris.planner.LocalExchangeNode.LocalExchangeType;
 import org.apache.doris.planner.LocalExchangeNode.LocalExchangeTypeRequire;
 import org.apache.doris.planner.LocalExchangeNode.RequireHash;
+import org.apache.doris.qe.ConnectContext;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
 
 /**
  * FE-side local exchange planner — inserts {@link LocalExchangeNode} into each fragment's
@@ -81,11 +90,107 @@ public class AddLocalExchange {
             if (maxPerBeInstances <= 1) {
                 continue;
             }
+            context.setCurrentFragmentBucketUpgradeEligible(
+                    isBucketUpgradeEligible(pipePlan, maxPerBeInstances, context));
             PlanFragment fragment = pipePlan.getFragmentJob().getFragment();
             addLocalExchangeForFragment(fragment, context);
         }
     }
 
+    /**
+     * Bucket → local-hash parallelism upgrade eligibility .
+     *
+     * A pooled bucket-join fragment runs its bucket joins at bucket-count parallelism:
+     * each LocalShuffleBucketJoinAssignedJob owns a disjoint set of join buckets and only
+     * instances with buckets do join work (e.g. 8 buckets/BE but 16 instances/BE → 8 idle).
+     * When nothing above the join needs bucket alignment, HashJoinNode can re-distribute
+     * both sides with LOCAL_EXECUTION_HASH_SHUFFLE to use all instances — see
+     * {@link HashJoinNode#enforceAndDeriveLocalExchange}.
+     *
+     * This method computes the per-fragment numeric condition from the actual instance
+     * assignment: maxPerBeInstances > maxBucketsWithDataPerWorker × ratio.  The ratio comes
+     * from session variable {@code local_shuffle_bucket_upgrade_ratio}; values <= 1 disable
+     * the upgrade entirely (a required parallelism gain of at most 1x means no gain).
+     */
+    private boolean isBucketUpgradeEligible(PipelineDistributedPlan pipePlan,
+            long maxPerBeInstances, PlanTranslatorContext context) {
+        ConnectContext connectContext = context.getConnectContext();
+        if (connectContext == null || connectContext.getSessionVariable() == null) {
+            return false;
+        }
+        double ratio = connectContext.getSessionVariable().getLocalShuffleBucketUpgradeRatio();
+        List<AssignedJob> instanceJobs = pipePlan.getInstanceJobs();
+        if (instanceJobs.isEmpty()
+                || !instanceJobs.stream().allMatch(LocalShuffleBucketJoinAssignedJob.class::isInstance)) {
+            // Only pooled bucket-join fragments have the bucket-count parallelism cap.
+            return false;
+        }
+        Map<Long, Set<Integer>> bucketsPerWorker = new HashMap<>();
+        Map<Long, Integer> instancesPerWorker = new HashMap<>();
+        Map<Long, Integer> coresPerWorker = new HashMap<>();
+        for (AssignedJob job : instanceJobs) {
+            long workerId = job.getAssignedWorker().id();
+            bucketsPerWorker.computeIfAbsent(workerId, k -> new HashSet<>())
+                    .addAll(((LocalShuffleBucketJoinAssignedJob) job).getAssignedJoinBucketIndexes());
+            instancesPerWorker.merge(workerId, 1, Integer::sum);
+            coresPerWorker.computeIfAbsent(workerId, k -> resolveWorkerCores(job.getAssignedWorker()));
+        }
+        // Conservative: every worker that owns buckets must clear the gain bar. The gain is
+        // computed on EFFECTIVE parallelism (capped by the BE's executor threads): when the
+        // bucket count already saturates the cores, adding instances cannot speed the join
+        // up and the extra local exchange is a pure cost.
+        boolean anyBuckets = false;
+        for (Map.Entry<Long, Set<Integer>> entry : bucketsPerWorker.entrySet()) {
+            int buckets = entry.getValue().size();
+            if (buckets == 0) {
+                continue;
+            }
+            anyBuckets = true;
+            int instances = instancesPerWorker.getOrDefault(entry.getKey(), 0);
+            int cores = coresPerWorker.getOrDefault(entry.getKey(), Integer.MAX_VALUE);
+            if (!shouldUpgradeBucketParallelism(ratio,
+                    Math.min(instances, cores), Math.min(buckets, cores))) {
+                return false;
+            }
+        }
+        return anyBuckets;
+    }
+
+    /**
+     * Effective execution threads of the worker's backend (pipelineExecutorSize, falling
+     * back to cpuCores). Values <= 1 mean the heartbeat has not reported yet — treat the
+     * capacity as unknown/uncapped rather than blocking the upgrade.
+     */
+    private static int resolveWorkerCores(
+            org.apache.doris.nereids.trees.plans.distribute.worker.DistributedPlanWorker worker) {
+        if (worker instanceof org.apache.doris.nereids.trees.plans.distribute.worker.BackendWorker) {
+            org.apache.doris.system.Backend backend =
+                    ((org.apache.doris.nereids.trees.plans.distribute.worker.BackendWorker) worker).getBackend();
+            int size = backend.getPipelineExecutorSize();
+            if (size <= 1) {
+                size = backend.getCputCores();
+            }
+            if (size > 1) {
+                return size;
+            }
+        }
+        return Integer.MAX_VALUE;
+    }
+
+    /**
+     * Pure numeric gate for the bucket → local-hash upgrade.
+     * ratio <= 1 (including 0 and negatives) always disables; otherwise upgrade when the
+     * per-BE instance count exceeds buckets-with-data × ratio (i.e. the parallelism gain
+     * is at least the configured multiple).
+     */
+    static boolean shouldUpgradeBucketParallelism(double ratio, long maxPerBeInstances,
+            long maxBucketsPerWorker) {
+        if (ratio <= 1.0) {
+            return false;
+        }
+        return maxBucketsPerWorker > 0 && maxPerBeInstances > maxBucketsPerWorker * ratio;
+    }
+
     private void addLocalExchangeForFragment(PlanFragment fragment, PlanTranslatorContext context) {
         DataSink sink = fragment.getSink();
         LocalExchangeTypeRequire require = sink == null
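A minimal stand-alone sketch of the upgrade gate above, for readers checking the arithmetic. `BucketUpgradeGateSketch` and `WorkerStats` are hypothetical stand-ins, not Doris classes. The sketch assumes the per-worker bucket, instance and core counts have already been aggregated from the assigned jobs, with `Integer.MAX_VALUE` meaning the BE has not reported its executor size.

```java
import java.util.List;

/**
 * Hypothetical, stand-alone model of the bucket -> local-hash upgrade gate.
 * Mirrors the arithmetic of shouldUpgradeBucketParallelism and the per-worker
 * loop in isBucketUpgradeEligible; none of these types exist in Doris.
 */
final class BucketUpgradeGateSketch {

    /** Per-worker counts, assumed to be already aggregated from the instance jobs. */
    static final class WorkerStats {
        final int bucketsWithData;
        final int instances;
        final int cores; // Integer.MAX_VALUE when the BE has not reported its executor size

        WorkerStats(int bucketsWithData, int instances, int cores) {
            this.bucketsWithData = bucketsWithData;
            this.instances = instances;
            this.cores = cores;
        }
    }

    /** Pure numeric gate: ratio <= 1 disables; otherwise instances must exceed buckets * ratio. */
    static boolean shouldUpgrade(double ratio, long instances, long buckets) {
        if (ratio <= 1.0) {
            return false;
        }
        return buckets > 0 && instances > buckets * ratio;
    }

    /** Every worker that owns buckets must pass the gate on core-capped parallelism. */
    static boolean eligible(double ratio, List<WorkerStats> workers) {
        boolean anyBuckets = false;
        for (WorkerStats w : workers) {
            if (w.bucketsWithData == 0) {
                continue; // workers without bucket data do not vote
            }
            anyBuckets = true;
            int effectiveInstances = Math.min(w.instances, w.cores);
            int effectiveBuckets = Math.min(w.bucketsWithData, w.cores);
            if (!shouldUpgrade(ratio, effectiveInstances, effectiveBuckets)) {
                return false;
            }
        }
        return anyBuckets;
    }
}
```

For example, at the default ratio of 1.5 a BE running 16 instances over 8 buckets with data passes the gate when it has 32 executor threads (16 > 8 × 1.5 = 12), but not when it has only 8, because both sides are then capped at 8.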
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/ExchangeNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/ExchangeNode.java
index 256f89843eb..8898987d9a7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/ExchangeNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/ExchangeNode.java
@@ -165,6 +165,13 @@ public class ExchangeNode extends PlanNode {
 
     @Override
     public boolean isSerialOperatorOnBe(ConnectContext context) {
+        if (context != null && context.getSessionVariable().isEnableLocalShufflePlanner()) {
+            // When FE local shuffle planner is on, decouple exchange from scan's serial flag.
+            // Scan pooling is handled by LE(PT) after scan; exchange keeps its own parallelism.
+            return fragment != null
+                    && isSerialNode()
+                    && fragment.useSerialSource(context);
+        }
         return fragment != null
                 && (isSerialNode() || fragment.hasSerialScanNode())
                 && fragment.useSerialSource(context);
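The serial-flag change in `ExchangeNode.isSerialOperatorOnBe` can be read as a small truth table. The sketch below models it with plain booleans standing in for `isSerialNode()`, `fragment.hasSerialScanNode()` and `fragment.useSerialSource(context)`; the class and parameter names are hypothetical.

```java
/**
 * Hypothetical truth-table model of ExchangeNode.isSerialOperatorOnBe with the
 * FE local shuffle planner on and off. Booleans replace the real node/fragment calls.
 */
final class ExchangeSerialSketch {

    static boolean isSerialOnBe(boolean plannerEnabled, boolean hasFragment,
            boolean serialNode, boolean fragmentHasSerialScan, boolean useSerialSource) {
        if (!hasFragment || !useSerialSource) {
            return false; // both branches require a fragment that uses a serial source
        }
        if (plannerEnabled) {
            // FE planner on: only the exchange's own serial flag counts.
            return serialNode;
        }
        // FE planner off (BE-planned): a pooled serial scan also serializes the exchange.
        return serialNode || fragmentHasSerialScan;
    }
}
```

With the planner enabled, a fragment whose only serial element is a pooled scan no longer forces its exchange to be serial, which is what allows bucket-shuffle destinations to be spread across all bucket-owning instances.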
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/HashJoinNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/HashJoinNode.java
index 567c99866fd..e9d8ac63041 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/HashJoinNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/HashJoinNode.java
@@ -344,31 +344,65 @@ public class HashJoinNode extends JoinNodeBase {
             // For a non-serial probe without the flag: propagate the probe's distribution.
             outputType = probePassthrough ? LocalExchangeType.PASSTHROUGH : null;
         } else if (isColocate() || isBucketShuffle()) {
-            // Both probe and build sides require BUCKET_HASH_SHUFFLE: the bucket distribution
-            // must be preserved on both inputs. A serial child on either side is handled the
-            // same way (serial exchange returns NOOP → enforceRequire() inserts the LE).
-            probeSideRequire = LocalExchangeTypeRequire.requireBucketHash();
-            // For BUCKET_SHUFFLE with serial build child: use requireBucketHash() (not
-            // requirePassToOne()). Unlike BROADCAST joins, BUCKET_SHUFFLE has no shared
-            // hash table mechanism — PASS_TO_ONE routes all data to task 0 while tasks 1..N-1
-            // build empty hash tables, losing rows. BUCKET_HASH_SHUFFLE correctly distributes
-            // build data by bucket to match the probe side's bucket distribution.
-            // The serial exchange returns NOOP, so enforceRequire() will insert a
-            // BUCKET_HASH_SHUFFLE local exchange (with PASSTHROUGH fan-out for heavy-ops
-            // bottleneck avoidance).
-            buildSideRequire = LocalExchangeTypeRequire.requireBucketHash();
-            outputType = AddLocalExchange.resolveExchangeType(
-                    LocalExchangeTypeRequire.requireBucketHash());
+            if (canUpgradeBucketToLocalHash(translatorContext, parentRequire)) {
+                // Bucket → local-hash parallelism upgrade (bucket-to-hash upgrade): the fragment
+                // has noticeably more instances than buckets-with-data (see
+                // AddLocalExchange.isBucketUpgradeEligible) and nothing above this join needs
+                // bucket alignment — re-distribute both sides by their distribute keys with
+                // LOCAL_EXECUTION_HASH_SHUFFLE so the join runs at full instance parallelism
+                // instead of being capped at bucket count. The LE keys come from
+                // childrenDistributeExprLists (pairwise-aligned per side, a subset of the
+                // equi-join keys), so both sides keep hashing the same values and the
+                // per-instance build/probe pairing stays correct.
+                //
+                // requireSpecific (not requireHash) on purpose: the children's
+                // BUCKET_HASH_SHUFFLE output must NOT satisfy this require, otherwise no LE
+                // is inserted and the join stays bucket-capped.
+                //
+                // Mark direct children so a stacked bucket join below (which also upgrades,
+                // see the whole-chain note) reports NOOP output: its LOCAL hash output (keyed
+                // by ITS join keys) would otherwise type-satisfy our requireSpecific(LOCAL_EXECUTION_HASH)
+                // and suppress the LE that re-aligns data to OUR keys → wrong results.
+                translatorContext.setHasBucketUpgradedAncestor(children.get(0), true);
+                translatorContext.setHasBucketUpgradedAncestor(children.get(1), true);
+                probeSideRequire = LocalExchangeTypeRequire.requireSpecific(
+                        LocalExchangeType.LOCAL_EXECUTION_HASH_SHUFFLE);
+                buildSideRequire = LocalExchangeTypeRequire.requireSpecific(
+                        LocalExchangeType.LOCAL_EXECUTION_HASH_SHUFFLE);
+                // Whole-chain upgrade: a stacked bucket join below an upgraded one also
+                // upgrades (16-way instead of bucket-capped), but must NOT let its LOCAL
+                // hash claim type-satisfy the upper join's requireSpecific(LOCAL) — the
+                // keys may differ (each level hashes its own distribute exprs). Claim NOOP
+                // so the upper join always inserts its own re-align LE; that LE existed in
+                // the bucket world too (bucket claim never satisfied LOCAL require), so
+                // the chain upgrade is pure parallelism gain.
+                outputType = translatorContext.hasBucketUpgradedAncestor(this)
+                        ? LocalExchangeType.NOOP
+                        : null; // null: derived from probeResult.second below
+            } else {
+                probeSideRequire = LocalExchangeTypeRequire.requireBucketHash();
+                // For BUCKET_SHUFFLE with serial build child: use requireBucketHash() (not
+                // requirePassToOne()). Unlike BROADCAST joins, BUCKET_SHUFFLE has no shared
+                // hash table mechanism — PASS_TO_ONE routes all data to task 0 while tasks 1..N-1
+                // build empty hash tables, losing rows. BUCKET_HASH_SHUFFLE correctly distributes
+                // build data by bucket to match the probe side's bucket distribution.
+                // The serial exchange returns NOOP, so enforceRequire() will insert a
+                // BUCKET_HASH_SHUFFLE local exchange (with PASSTHROUGH fan-out for heavy-ops
+                // bottleneck avoidance).
+                buildSideRequire = LocalExchangeTypeRequire.requireBucketHash();
+                outputType = AddLocalExchange.resolveExchangeType(
+                        LocalExchangeTypeRequire.requireBucketHash());
+            }
         } else {
             // PARTITIONED (shuffle) join: both sides enter via global hash exchange.
             // Require GLOBAL specifically so that any inserted exchange uses the same
             // instance mapping as the cross-fragment exchange. LOCAL hash has a different
             // modulus (per-BE instance count vs total instance count) and would cause
-            // join mismatches (DORIS-26101).
+            // join mismatches (cross-fragment exchange key mismatch).
             //
             // Exception: serial source (use_serial_exchange=true + pooling). The serial
             // exchange sends to a single BE so shuffle_idx_to_instance_idx has only one
-            // entry — GLOBAL hash would route data to non-existent indices (DORIS-26120).
+            // entry — GLOBAL hash would route data to non-existent indices (serial source global hash fallback).
             // Fall back to generic requireHash() which resolves to LOCAL, matching BE's
             // _use_serial_source behavior.
             boolean serialSource = fragment != null
@@ -394,4 +428,32 @@ public class HashJoinNode extends JoinNodeBase {
     protected boolean shouldResetSerialFlagForChild(int childIndex) {
         return childIndex == 1;
     }
+
+    /**
+     * Whether this bucket-shuffle / colocate join may upgrade its children's requires from
+     * BUCKET_HASH_SHUFFLE to LOCAL_EXECUTION_HASH_SHUFFLE for higher parallelism:
+     * <ul>
+     *   <li>the fragment passed the numeric gate (instances vs buckets-with-data × ratio),
+     *       computed once per fragment in {@code AddLocalExchange};</li>
+     *   <li>stacked bucket joins below an upgraded one also upgrade, but report NOOP
+     *       output so the upper join's re-align LE is always inserted — see the
+     *       whole-chain note in {@code enforceAndDeriveLocalExchange};</li>
+     *   <li>the parent does not require bucket distribution of our output (an upper
+     *       bucket join's probe/build require — upgrading here would break the bucket
+     *       alignment it depends on);</li>
+     *   <li>both sides have non-empty distribute exprs — they become the LOCAL hash LE
+     *       keys; a hash exchange without exprs would be meaningless.</li>
+     * </ul>
+     */
+    private boolean canUpgradeBucketToLocalHash(PlanTranslatorContext translatorContext,
+            LocalExchangeTypeRequire parentRequire) {
+        if (!translatorContext.isCurrentFragmentBucketUpgradeEligible()
+                || parentRequire.preferType() == LocalExchangeType.BUCKET_HASH_SHUFFLE) {
+            return false;
+        }
+        List<Expr> probeExprs = getChildDistributeExprList(0);
+        List<Expr> buildExprs = getChildDistributeExprList(1);
+        return probeExprs != null && !probeExprs.isEmpty()
+                && buildExprs != null && !buildExprs.isEmpty();
+    }
 }
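Why re-hashing both sides on their pairwise-aligned distribute keys keeps the per-instance build/probe pairing correct can be checked with a small simulation. This sketch assumes a LOCAL_EXECUTION_HASH_SHUFFLE exchange routes each row to `floorMod(hash(key), instances)` and uses `Long.hashCode` as a stand-in for the BE's real hash function; `LocalHashShuffleSketch` is hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical model of a local hash shuffle feeding a partitioned hash join.
 * Long.hashCode stands in for the BE's hash function; only the routing idea matters.
 */
final class LocalHashShuffleSketch {

    static int route(long key, int instances) {
        return Math.floorMod(Long.hashCode(key), instances);
    }

    /** Split keys into per-instance slices, as each side's local exchange would. */
    static List<List<Long>> partition(long[] keys, int instances) {
        List<List<Long>> parts = new ArrayList<>();
        for (int i = 0; i < instances; i++) {
            parts.add(new ArrayList<>());
        }
        for (long k : keys) {
            parts.get(route(k, instances)).add(k);
        }
        return parts;
    }

    /** Count join matches when each instance only joins its own probe and build slices. */
    static int perInstanceMatches(long[] probe, long[] build, int instances) {
        List<List<Long>> probeParts = partition(probe, instances);
        List<List<Long>> buildParts = partition(build, instances);
        int matches = 0;
        for (int i = 0; i < instances; i++) {
            Map<Long, Integer> hashTable = new HashMap<>(); // per-instance build side
            for (long k : buildParts.get(i)) {
                hashTable.merge(k, 1, Integer::sum);
            }
            for (long k : probeParts.get(i)) {
                matches += hashTable.getOrDefault(k, 0);
            }
        }
        return matches;
    }
}
```

Because both sides use the same key, hash and modulus, equal keys always land on the same instance, so the per-instance join finds every match for any instance count, including counts larger than the bucket count.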
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/RuntimeFilter.java b/fe/fe-core/src/main/java/org/apache/doris/planner/RuntimeFilter.java
index 28062b97b7a..77bb4ffcdc1 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/RuntimeFilter.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/RuntimeFilter.java
@@ -257,6 +257,29 @@ public final class RuntimeFilter {
         return finalized;
     }
 
+    /**
+     * DFS from {@code node} down to {@code target} within the fragment (stopping at
+     * ExchangeNode boundaries). Returns null if target is not under node, otherwise
+     * whether the path crosses a LocalExchangeNode.
+     */
+    private static Boolean pathCrossesLocalExchange(PlanNode node, PlanNode target) {
+        if (node == target) {
+            return false;
+        }
+        for (PlanNode child : node.getChildren()) {
+            if (child instanceof ExchangeNode) {
+                // fragment boundary: a target behind it is a remote target, handled by
+                // has_remote_targets
+                continue;
+            }
+            Boolean sub = pathCrossesLocalExchange(child, target);
+            if (sub != null) {
+                return sub || child instanceof LocalExchangeNode;
+            }
+        }
+        return null;
+    }
+
     /**
      * Serializes a runtime filter to Thrift.
      */
@@ -270,11 +293,25 @@ public final class RuntimeFilter {
         tFilter.setHasRemoteTargets(hasRemoteTargets);
 
         boolean hasSerialTargets = false;
+        boolean forceLocalMerge = false;
         for (RuntimeFilterTarget target : targets) {
             tFilter.putToPlanIdToTargetExpr(target.node.getId().asInt(), ExprToThriftVisitor.treeToThrift(target.expr));
             hasSerialTargets = hasSerialTargets
                     || target.node.isSerialOperatorOnBe(ConnectContext.get());
+            // Truthful merge signal: if a LocalExchangeNode sits between the builder join
+            // and a same-fragment target scan, per-instance partial filters are not aligned
+            // with the scan's data slice and must be merged before being applied. BE used to
+            // infer this from the target scan's is_serial_operator (scan pooled => LE
+            // in between), which silently breaks once the scan is parallelized; this bit is
+            // computed from the actual plan after FE local exchange planning. In BE-planned
+            // mode (planner off) the FE tree has no LocalExchangeNodes and the bit stays
+            // false — the serial-flag inference still covers that world.
+            if (!forceLocalMerge && target.isLocalTarget) {
+                Boolean crossed = pathCrossesLocalExchange(builderNode, target.node);
+                forceLocalMerge = crossed != null && crossed;
+            }
         }
+        tFilter.setForceLocalMerge(forceLocalMerge);
 
         boolean enableSyncFilterSize = ConnectContext.get() != null
                 && ConnectContext.get().getSessionVariable().enableSyncRuntimeFilterSize();
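A self-contained version of the `pathCrossesLocalExchange` DFS, runnable on a toy plan tree. `Node` and `Kind` are hypothetical replacements for `PlanNode`, `ExchangeNode` and `LocalExchangeNode`; the traversal follows the method above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/**
 * Hypothetical stand-alone version of RuntimeFilter.pathCrossesLocalExchange.
 * Node and Kind replace Doris' PlanNode / ExchangeNode / LocalExchangeNode.
 */
final class RfPathSketch {

    enum Kind { EXCHANGE, LOCAL_EXCHANGE, OTHER }

    static final class Node {
        final String name;
        final Kind kind;
        final List<Node> children;

        Node(String name, Kind kind, Node... children) {
            this.name = name;
            this.kind = kind;
            this.children = new ArrayList<>(Arrays.asList(children));
        }
    }

    /** null: target not reachable inside the fragment; otherwise whether a local exchange was crossed. */
    static Boolean pathCrossesLocalExchange(Node node, Node target) {
        if (node == target) {
            return false;
        }
        for (Node child : node.children) {
            if (child.kind == Kind.EXCHANGE) {
                continue; // fragment boundary: targets behind it are remote
            }
            Boolean sub = pathCrossesLocalExchange(child, target);
            if (sub != null) {
                return sub || child.kind == Kind.LOCAL_EXCHANGE;
            }
        }
        return null;
    }
}
```

For a toy root with children `LocalExchange -> scanA`, `scanB` and `Exchange -> scanC`, the DFS returns `true` for `scanA` (so `forceLocalMerge` would be set), `false` for `scanB`, and `null` for `scanC`, which sits behind a fragment boundary and is covered by `has_remote_targets`.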
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index 2870c3443fe..01967590dfb 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -361,6 +361,10 @@ public class SessionVariable implements Serializable, Writable {
 
     public static final String ENABLE_LOCAL_SHUFFLE_PLANNER = "enable_local_shuffle_planner";
 
+    public static final String LOCAL_SHUFFLE_BUCKET_UPGRADE_RATIO = "local_shuffle_bucket_upgrade_ratio";
+
+    public static final String BUCKET_SHUFFLE_DOWNGRADE_RATIO = "bucket_shuffle_downgrade_ratio";
+
     public static final String FORCE_TO_LOCAL_SHUFFLE = "force_to_local_shuffle";
 
     public static final String ENABLE_LOCAL_MERGE_SORT = "enable_local_merge_sort";
@@ -1639,6 +1643,27 @@ public class SessionVariable implements Serializable, Writable {
                         "Whether to force to local shuffle on pipelineX engine."})
     private boolean forceToLocalShuffle = false;
 
+    @VarAttrDef.VarAttr(
+            name = LOCAL_SHUFFLE_BUCKET_UPGRADE_RATIO, fuzzy = false, varType = VariableAnnotation.EXPERIMENTAL,
+            description = {"FE规划Local Shuffle时, 当池化bucket join所在fragment的每BE实例数大于"
+                    + "每BE有数据分桶数的该倍数时, 将join两侧的桶分布本地重分发为hash分布以突破桶数并发上限。"
+                    + "必须大于1才生效; 小于等于1(含0和负数)时关闭该优化",
+                    "When FE plans local shuffle and a pooled bucket join fragment has more instances"
+                    + " per BE than (buckets-with-data per BE) * this ratio, re-distribute both join"
+                    + " sides with local hash instead of bucket hash so join parallelism is no longer"
+                    + " capped at bucket count. Only takes effect when > 1; values <= 1 (including 0"
+                    + " and negatives) disable the upgrade."}, needForward = true)
+    private double localShuffleBucketUpgradeRatio = 1.5;
+
+    @VarAttrDef.VarAttr(
+            name = BUCKET_SHUFFLE_DOWNGRADE_RATIO, fuzzy = false, varType = VariableAnnotation.EXPERIMENTAL,
+            description = {"当一侧基表总桶数小于总实例数的该倍数时, 放弃bucket shuffle join降级为shuffle join。"
+                    + "小于等于0时永不降级。默认0.8保持原有行为",
+                    "Downgrade bucket shuffle join to shuffle join when the base table side's total"
+                    + " bucket count is less than total instance count times this ratio. Values <= 0"
+                    + " never downgrade. Default 0.8 keeps the original behavior."}, needForward = true)
+    private double bucketShuffleDowngradeRatio = 0.8;
+
     @VarAttrDef.VarAttr(name = ENABLE_LOCAL_MERGE_SORT)
     private boolean enableLocalMergeSort = true;
 
@@ -4768,6 +4793,22 @@ public class SessionVariable implements Serializable, Writable {
         this.enableLocalShufflePlanner = enableLocalShufflePlanner;
     }
 
+    public double getLocalShuffleBucketUpgradeRatio() {
+        return localShuffleBucketUpgradeRatio;
+    }
+
+    public void setLocalShuffleBucketUpgradeRatio(double localShuffleBucketUpgradeRatio) {
+        this.localShuffleBucketUpgradeRatio = localShuffleBucketUpgradeRatio;
+    }
+
+    public double getBucketShuffleDowngradeRatio() {
+        return bucketShuffleDowngradeRatio;
+    }
+
+    public void setBucketShuffleDowngradeRatio(double bucketShuffleDowngradeRatio) {
+        this.bucketShuffleDowngradeRatio = bucketShuffleDowngradeRatio;
+    }
+
     public boolean enablePushDownNoGroupAgg() {
         return enablePushDownNoGroupAgg;
     }
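The new `bucket_shuffle_downgrade_ratio` variable is a plain threshold knob. The sketch below restates the downgrade rule exactly as the variable's description words it. It illustrates that description only, since the planner code that consumes the variable is not part of this excerpt, and `ShuffleRatioSketch` is a hypothetical name.

```java
/**
 * Hypothetical restatement of the bucket_shuffle_downgrade_ratio description:
 * downgrade to a shuffle join when the base table side's total bucket count is
 * below total instances * ratio; ratio <= 0 never downgrades.
 */
final class ShuffleRatioSketch {

    static final double DEFAULT_DOWNGRADE_RATIO = 0.8; // default declared in SessionVariable

    static boolean shouldDowngradeToShuffle(double ratio, long totalBuckets, long totalInstances) {
        if (ratio <= 0) {
            return false; // knob disabled: always keep bucket shuffle
        }
        return totalBuckets < totalInstances * ratio;
    }
}
```

With the default 0.8 and 16 total instances, a base table side with 12 buckets would be downgraded (12 < 12.8), while one with 13 buckets would keep bucket shuffle.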
diff --git a/fe/fe-core/src/test/java/org/apache/doris/planner/LocalShuffleNodeCoverageTest.java b/fe/fe-core/src/test/java/org/apache/doris/planner/LocalShuffleNodeCoverageTest.java
index 7288ba49ca2..5ffe61ce8f5 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/planner/LocalShuffleNodeCoverageTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/planner/LocalShuffleNodeCoverageTest.java
@@ -24,6 +24,7 @@ import org.apache.doris.analysis.GroupingInfo;
 import org.apache.doris.analysis.JoinOperator;
 import org.apache.doris.analysis.OrderByElement;
 import org.apache.doris.analysis.SlotDescriptor;
+import org.apache.doris.analysis.SlotRef;
 import org.apache.doris.analysis.SortInfo;
 import org.apache.doris.analysis.TupleDescriptor;
 import org.apache.doris.analysis.TupleId;
@@ -229,12 +230,12 @@ public class LocalShuffleNodeCoverageTest {
         hashJoin.setDistributionMode(DistributionMode.PARTITIONED);
         Pair<PlanNode, LocalExchangeType> hashOutput = hashJoin.enforceAndDeriveLocalExchange(
                 ctx, null, LocalExchangeTypeRequire.requireHash());
-        // PARTITIONED join requires GLOBAL hash to match cross-fragment exchange (DORIS-26101)
+        // PARTITIONED join requires GLOBAL hash to match cross-fragment exchange
         Assertions.assertEquals(LocalExchangeType.GLOBAL_EXECUTION_HASH_SHUFFLE, hashOutput.second);
         assertChildLocalExchangeType(hashJoin, 0, LocalExchangeType.GLOBAL_EXECUTION_HASH_SHUFFLE);
         assertChildLocalExchangeType(hashJoin, 1, LocalExchangeType.GLOBAL_EXECUTION_HASH_SHUFFLE);
 
-        // DORIS-26101: PARTITIONED join with probe child already providing GLOBAL hash
+        // PARTITIONED join with probe child already providing GLOBAL hash
         // (e.g. upstream ExchangeNode) should satisfy requireGlobalExecutionHash without
         // inserting a new exchange.
         TrackingPlanNode probeGlobal = new TrackingPlanNode(nextPlanNodeId(),
@@ -251,7 +252,7 @@ public class LocalShuffleNodeCoverageTest {
                 "no exchange should be inserted when child already provides GLOBAL hash");
         Assertions.assertSame(buildGlobal, partitionedSatisfied.getChild(1));
 
-        // DORIS-26120: PARTITIONED join with serial source falls back to LOCAL hash
+        // PARTITIONED join with serial source falls back to LOCAL hash
         // because GLOBAL shuffle_idx_to_instance_idx is incomplete for serial exchange.
         TrackingScanNode probeSerial = new TrackingScanNode(nextPlanNodeId(), LocalExchangeType.NOOP);
         TrackingPlanNode buildSerial = new TrackingPlanNode(nextPlanNodeId(), LocalExchangeType.NOOP);
@@ -312,6 +313,157 @@ public class LocalShuffleNodeCoverageTest {
         assertChildLocalExchangeType(serialBuildBroadcast, 1, LocalExchangeType.PASS_TO_ONE);
     }
 
+    private static List<List<Expr>> mockDistributeExprLists() {
+        return Lists.newArrayList(
+                Collections.singletonList(Mockito.mock(SlotRef.class)),
+                Collections.singletonList(Mockito.mock(SlotRef.class)));
+    }
+
+    @Test
+    public void testHashJoinBucketUpgradeToLocalHash() {
+        List<Expr> eqConjuncts = Collections.singletonList(Mockito.mock(BinaryPredicate.class));
+
+        // 1. Eligible fragment + parent doesn't need bucket → both sides re-distributed
+        //    with LOCAL_EXECUTION_HASH_SHUFFLE, output reports LOCAL hash.
+        PlanTranslatorContext upgradeCtx = new PlanTranslatorContext();
+        upgradeCtx.setCurrentFragmentBucketUpgradeEligible(true);
+        TrackingPlanNode probeBucket = new TrackingPlanNode(nextPlanNodeId(), LocalExchangeType.BUCKET_HASH_SHUFFLE);
+        TrackingPlanNode buildNoop = new TrackingPlanNode(nextPlanNodeId(), LocalExchangeType.NOOP);
+        HashJoinNode upgradedJoin = new HashJoinNode(nextPlanNodeId(), probeBucket, buildNoop,
+                JoinOperator.INNER_JOIN, eqConjuncts, Collections.emptyList(), null, null, false);
+        upgradedJoin.setChildrenDistributeExprLists(mockDistributeExprLists());
+        upgradedJoin.setDistributionMode(DistributionMode.BUCKET_SHUFFLE);
+        Pair<PlanNode, LocalExchangeType> upgradedOutput = upgradedJoin.enforceAndDeriveLocalExchange(
+                upgradeCtx, null, LocalExchangeTypeRequire.requireHash());
+        // BUCKET claim must NOT satisfy the upgrade's requireSpecific(LOCAL_EXECUTION_HASH):
+        // an LE is inserted on both sides.
+        Assertions.assertEquals(LocalExchangeType.LOCAL_EXECUTION_HASH_SHUFFLE, upgradedOutput.second);
+        assertChildLocalExchangeType(upgradedJoin, 0, LocalExchangeType.LOCAL_EXECUTION_HASH_SHUFFLE);
+        assertChildLocalExchangeType(upgradedJoin, 1, LocalExchangeType.LOCAL_EXECUTION_HASH_SHUFFLE);
+
+        // 2. Child already providing LOCAL hash satisfies the upgraded require — no extra LE.
+        PlanTranslatorContext satisfiedCtx = new PlanTranslatorContext();
+        satisfiedCtx.setCurrentFragmentBucketUpgradeEligible(true);
+        TrackingPlanNode probeLocal = new TrackingPlanNode(nextPlanNodeId(),
+                LocalExchangeType.LOCAL_EXECUTION_HASH_SHUFFLE);
+        TrackingPlanNode buildLocal = new TrackingPlanNode(nextPlanNodeId(),
+                LocalExchangeType.LOCAL_EXECUTION_HASH_SHUFFLE);
+        HashJoinNode satisfiedJoin = new HashJoinNode(nextPlanNodeId(), probeLocal, buildLocal,
+                JoinOperator.INNER_JOIN, eqConjuncts, Collections.emptyList(), null, null, false);
+        satisfiedJoin.setChildrenDistributeExprLists(mockDistributeExprLists());
+        satisfiedJoin.setDistributionMode(DistributionMode.BUCKET_SHUFFLE);
+        Pair<PlanNode, LocalExchangeType> satisfiedUpgrade = satisfiedJoin.enforceAndDeriveLocalExchange(
+                satisfiedCtx, null, LocalExchangeTypeRequire.requireHash());
+        Assertions.assertEquals(LocalExchangeType.LOCAL_EXECUTION_HASH_SHUFFLE, satisfiedUpgrade.second);
+        Assertions.assertSame(probeLocal, satisfiedJoin.getChild(0));
+        Assertions.assertSame(buildLocal, satisfiedJoin.getChild(1));
+
+        // 3. Parent requires bucket distribution (upper bucket join) → no upgrade even when
+        //    the fragment is eligible: children keep BUCKET_HASH_SHUFFLE.
+        PlanTranslatorContext parentBucketCtx = new PlanTranslatorContext();
+        parentBucketCtx.setCurrentFragmentBucketUpgradeEligible(true);
+        TrackingPlanNode probeForBucketParent = new TrackingPlanNode(nextPlanNodeId(), LocalExchangeType.NOOP);
+        TrackingPlanNode buildForBucketParent = new TrackingPlanNode(nextPlanNodeId(), LocalExchangeType.NOOP);
+        HashJoinNode bucketParentJoin = new HashJoinNode(nextPlanNodeId(), probeForBucketParent,
+                buildForBucketParent, JoinOperator.INNER_JOIN, eqConjuncts, Collections.emptyList(),
+                null, null, false);
+        bucketParentJoin.setChildrenDistributeExprLists(mockDistributeExprLists());
+        bucketParentJoin.setDistributionMode(DistributionMode.BUCKET_SHUFFLE);
+        Pair<PlanNode, LocalExchangeType> bucketParentOutput = bucketParentJoin.enforceAndDeriveLocalExchange(
+                parentBucketCtx, null, LocalExchangeTypeRequire.requireBucketHash());
+        Assertions.assertEquals(LocalExchangeType.BUCKET_HASH_SHUFFLE, bucketParentOutput.second);
+        assertChildLocalExchangeType(bucketParentJoin, 0, LocalExchangeType.BUCKET_HASH_SHUFFLE);
+        assertChildLocalExchangeType(bucketParentJoin, 1, LocalExchangeType.BUCKET_HASH_SHUFFLE);
+
+        // 4. Fragment not eligible (ratio gate failed / not a pooled bucket fragment) →
+        //    existing behavior untouched.
+        PlanTranslatorContext ineligibleCtx = new PlanTranslatorContext();
+        TrackingPlanNode probeIneligible = new TrackingPlanNode(nextPlanNodeId(), LocalExchangeType.NOOP);
+        TrackingPlanNode buildIneligible = new TrackingPlanNode(nextPlanNodeId(), LocalExchangeType.NOOP);
+        HashJoinNode ineligibleJoin = new HashJoinNode(nextPlanNodeId(), probeIneligible, buildIneligible,
+                JoinOperator.INNER_JOIN, eqConjuncts, Collections.emptyList(), null, null, false);
+        ineligibleJoin.setChildrenDistributeExprLists(mockDistributeExprLists());
+        ineligibleJoin.setDistributionMode(DistributionMode.BUCKET_SHUFFLE);
+        Pair<PlanNode, LocalExchangeType> ineligibleOutput = ineligibleJoin.enforceAndDeriveLocalExchange(
+                ineligibleCtx, null, LocalExchangeTypeRequire.requireHash());
+        Assertions.assertEquals(LocalExchangeType.BUCKET_HASH_SHUFFLE, ineligibleOutput.second);
+        assertChildLocalExchangeType(ineligibleJoin, 0, LocalExchangeType.BUCKET_HASH_SHUFFLE);
+        assertChildLocalExchangeType(ineligibleJoin, 1, LocalExchangeType.BUCKET_HASH_SHUFFLE);
+
+        // 5. Stacked bucket joins: the whole chain upgrades. The inner join (direct probe
+        //    child of the upgraded one) also upgrades its children to LOCAL hash, but
+        //    reports NOOP output so the outer join always inserts its own re-align LE
+        //    (keys may differ between levels).
+        PlanTranslatorContext stackedCtx = new PlanTranslatorContext();
+        stackedCtx.setCurrentFragmentBucketUpgradeEligible(true);
+        TrackingPlanNode innerProbe = new TrackingPlanNode(nextPlanNodeId(), LocalExchangeType.NOOP);
+        TrackingPlanNode innerBuild = new TrackingPlanNode(nextPlanNodeId(), LocalExchangeType.NOOP);
+        HashJoinNode innerJoin = new HashJoinNode(nextPlanNodeId(), innerProbe, innerBuild,
+                JoinOperator.INNER_JOIN, eqConjuncts, Collections.emptyList(), null, null, false);
+        innerJoin.setChildrenDistributeExprLists(mockDistributeExprLists());
+        innerJoin.setDistributionMode(DistributionMode.BUCKET_SHUFFLE);
+        TrackingPlanNode outerBuild = new TrackingPlanNode(nextPlanNodeId(), LocalExchangeType.NOOP);
+        HashJoinNode outerJoin = new HashJoinNode(nextPlanNodeId(), innerJoin, outerBuild,
+                JoinOperator.INNER_JOIN, eqConjuncts, Collections.emptyList(), null, null, false);
+        outerJoin.setChildrenDistributeExprLists(mockDistributeExprLists());
+        outerJoin.setDistributionMode(DistributionMode.BUCKET_SHUFFLE);
+        Pair<PlanNode, LocalExchangeType> stackedOutput = outerJoin.enforceAndDeriveLocalExchange(
+                stackedCtx, null, LocalExchangeTypeRequire.requireHash());
+        Assertions.assertEquals(LocalExchangeType.LOCAL_EXECUTION_HASH_SHUFFLE, stackedOutput.second);
+        // outer upgraded: probe side wrapped with LOCAL hash LE (re-aligning inner's output)
+        assertChildLocalExchangeType(outerJoin, 0, LocalExchangeType.LOCAL_EXECUTION_HASH_SHUFFLE);
+        assertChildLocalExchangeType(outerJoin, 1, LocalExchangeType.LOCAL_EXECUTION_HASH_SHUFFLE);
+        // inner upgraded too (whole-chain): its children get LOCAL hash LEs
+        assertChildLocalExchangeType(innerJoin, 0, LocalExchangeType.LOCAL_EXECUTION_HASH_SHUFFLE);
+        assertChildLocalExchangeType(innerJoin, 1, LocalExchangeType.LOCAL_EXECUTION_HASH_SHUFFLE);
+
+        // 6. Colocate join takes the same upgrade path.
+        PlanTranslatorContext colocateCtx = new PlanTranslatorContext();
+        colocateCtx.setCurrentFragmentBucketUpgradeEligible(true);
+        TrackingPlanNode colocateProbe = new TrackingPlanNode(nextPlanNodeId(), LocalExchangeType.NOOP);
+        TrackingPlanNode colocateBuild = new TrackingPlanNode(nextPlanNodeId(), LocalExchangeType.NOOP);
+        HashJoinNode colocateJoin = new HashJoinNode(nextPlanNodeId(), colocateProbe, colocateBuild,
+                JoinOperator.INNER_JOIN, eqConjuncts, Collections.emptyList(), null, null, false);
+        colocateJoin.setChildrenDistributeExprLists(mockDistributeExprLists());
+        colocateJoin.setColocate(true, "test");
+        Pair<PlanNode, LocalExchangeType> colocateOutput = colocateJoin.enforceAndDeriveLocalExchange(
+                colocateCtx, null, LocalExchangeTypeRequire.requireHash());
+        Assertions.assertEquals(LocalExchangeType.LOCAL_EXECUTION_HASH_SHUFFLE, colocateOutput.second);
+        assertChildLocalExchangeType(colocateJoin, 0, LocalExchangeType.LOCAL_EXECUTION_HASH_SHUFFLE);
+        assertChildLocalExchangeType(colocateJoin, 1, LocalExchangeType.LOCAL_EXECUTION_HASH_SHUFFLE);
+
+        // 7. Missing distribute exprs → no upgrade (the LOCAL hash LE would have no keys).
+        PlanTranslatorContext noExprCtx = new PlanTranslatorContext();
+        noExprCtx.setCurrentFragmentBucketUpgradeEligible(true);
+        TrackingPlanNode probeNoExpr = new TrackingPlanNode(nextPlanNodeId(), LocalExchangeType.NOOP);
+        TrackingPlanNode buildNoExpr = new TrackingPlanNode(nextPlanNodeId(), LocalExchangeType.NOOP);
+        HashJoinNode noExprJoin = new HashJoinNode(nextPlanNodeId(), probeNoExpr, buildNoExpr,
+                JoinOperator.INNER_JOIN, eqConjuncts, Collections.emptyList(), null, null, false);
+        noExprJoin.setDistributionMode(DistributionMode.BUCKET_SHUFFLE);
+        Pair<PlanNode, LocalExchangeType> noExprOutput = noExprJoin.enforceAndDeriveLocalExchange(
+                noExprCtx, null, LocalExchangeTypeRequire.requireHash());
+        Assertions.assertEquals(LocalExchangeType.BUCKET_HASH_SHUFFLE, noExprOutput.second);
+        assertChildLocalExchangeType(noExprJoin, 0, LocalExchangeType.BUCKET_HASH_SHUFFLE);
+        assertChildLocalExchangeType(noExprJoin, 1, LocalExchangeType.BUCKET_HASH_SHUFFLE);
+    }
+
+
+    @Test
+    public void testShouldUpgradeBucketParallelismGate() {
+        // ratio <= 1 (including 0 and negatives) always disables — the knob doubles as the
+        // off switch: requiring at most 1x parallelism gain means no gain.
+        Assertions.assertFalse(AddLocalExchange.shouldUpgradeBucketParallelism(0, 16, 8));
+        Assertions.assertFalse(AddLocalExchange.shouldUpgradeBucketParallelism(-1, 16, 8));
+        Assertions.assertFalse(AddLocalExchange.shouldUpgradeBucketParallelism(1.0, 16, 8));
+        // active threshold: instances must exceed buckets-with-data × ratio
+        Assertions.assertTrue(AddLocalExchange.shouldUpgradeBucketParallelism(1.5, 16, 8));
+        Assertions.assertFalse(AddLocalExchange.shouldUpgradeBucketParallelism(1.5, 12, 8));
+        Assertions.assertFalse(AddLocalExchange.shouldUpgradeBucketParallelism(2.0, 16, 8));
+        Assertions.assertTrue(AddLocalExchange.shouldUpgradeBucketParallelism(1.5, 256, 8));
+        // no buckets with data → nothing to upgrade
+        Assertions.assertFalse(AddLocalExchange.shouldUpgradeBucketParallelism(1.5, 16, 0));
+    }
+
     @Test
     public void testLocalExchangeNodeIsNotSerializedAsSerialOperator() {
         SerialTrackingScanNode serialScan = new SerialTrackingScanNode(nextPlanNodeId(), LocalExchangeType.NOOP);
@@ -441,7 +593,7 @@ public class LocalShuffleNodeCoverageTest {
         intersectNode.addChild(right);
         Pair<PlanNode, LocalExchangeType> intersectOutput = intersectNode.enforceAndDeriveLocalExchange(
                 ctx, null, LocalExchangeTypeRequire.requireHash());
-        // PARTITIONED intersect requires GLOBAL hash (DORIS-26100)
+        // PARTITIONED intersect requires GLOBAL hash
         Assertions.assertEquals(LocalExchangeType.GLOBAL_EXECUTION_HASH_SHUFFLE, intersectOutput.second);
         assertChildLocalExchangeType(intersectNode, 0, LocalExchangeType.GLOBAL_EXECUTION_HASH_SHUFFLE);
         assertChildLocalExchangeType(intersectNode, 1, LocalExchangeType.GLOBAL_EXECUTION_HASH_SHUFFLE);
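testShouldUpgradeBucketParallelismGate pins the upgrade gate only through its inputs and outputs. The sketch below restates that contract as a standalone Java function so the boundary cases are easy to see. The class and parameter names are hypothetical. The argument order (ratio, per-BE instances, buckets with data) is inferred from the test calls. The real AddLocalExchange.shouldUpgradeBucketParallelism may be written differently.

```java
// Hypothetical restatement of the behavior asserted by
// testShouldUpgradeBucketParallelismGate; not the actual AddLocalExchange code.
public class BucketUpgradeGate {

    /**
     * @param ratio              local_shuffle_bucket_upgrade_ratio (<= 1 disables)
     * @param instanceNum        per-BE pipeline instances (assumed meaning)
     * @param bucketsWithDataNum buckets on this BE that hold data (assumed meaning)
     */
    static boolean shouldUpgrade(double ratio, int instanceNum, int bucketsWithDataNum) {
        if (ratio <= 1.0) {
            return false; // the ratio knob doubles as the off switch
        }
        if (bucketsWithDataNum <= 0) {
            return false; // no buckets with data: nothing to upgrade
        }
        // strict inequality: instances must EXCEED buckets-with-data * ratio
        return instanceNum > bucketsWithDataNum * ratio;
    }

    public static void main(String[] args) {
        System.out.println(shouldUpgrade(1.5, 16, 8)); // true:  16 > 12
        System.out.println(shouldUpgrade(1.5, 12, 8)); // false: 12 > 12 fails
        System.out.println(shouldUpgrade(2.0, 16, 8)); // false: 16 > 16 fails
        System.out.println(shouldUpgrade(1.0, 16, 8)); // false: ratio <= 1
    }
}
```

The strict `>` is why (1.5, 12, 8) and (2.0, 16, 8) stay at bucket parallelism. In both cases the instance count only equals buckets-with-data × ratio and does not exceed it.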
diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift
index 4f8826bdb3f..fb8ef30150e 100644
--- a/gensrc/thrift/PlanNodes.thrift
+++ b/gensrc/thrift/PlanNodes.thrift
@@ -1609,6 +1609,12 @@ struct TRuntimeFilterDesc {
   // the listed partitions with the listed direction; absent partitions are
   // unsafe for this RF target and must not be pruned by it.
   20: optional map<Types.TPlanNodeId, list<TPartitionTargetExprMonotonicity>> planId_to_partition_target_monotonicity;
+
+  // True when a local exchange sits between the filter builder (join) and a same-fragment
+  // target scan: per-instance partial filters are then NOT aligned with the scan's data
+  // slice and must be merged before being applied. Computed truthfully by FE after local
+  // exchange planning; replaces inferring this from the target scan's is_serial_operator.
+  21: optional bool force_local_merge;
 }
 
 
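The force_local_merge comment says that once a local exchange sits between the join and a same-fragment scan, per-instance partial filters no longer line up with the scan's data slice. The toy Java model below shows why the merge is needed. It assumes four instances, an IN filter, and `k % n` as a stand-in for the real local-exchange hash. None of the names are Doris APIs. A scan instance that applies only one instance's partial filter keeps 2 of 8 matching rows, while the merged filter keeps all 8.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Illustrative model only; routing, filter type and names are assumptions.
public class PartialInFilterDemo {

    // Route a build key to a local instance; floorMod stands in for the real hash.
    static int route(long key, int instanceNum) {
        return (int) Math.floorMod(key, (long) instanceNum);
    }

    // Each join instance builds a partial IN filter from the keys routed to it.
    static List<Set<Long>> buildPartials(List<Long> buildKeys, int instanceNum) {
        List<Set<Long>> partials = new ArrayList<>();
        for (int i = 0; i < instanceNum; i++) {
            partials.add(new HashSet<>());
        }
        for (long k : buildKeys) {
            partials.get(route(k, instanceNum)).add(k);
        }
        return partials;
    }

    // Merged filter = union of all partial IN sets.
    static Set<Long> merge(List<Set<Long>> partials) {
        Set<Long> merged = new HashSet<>();
        for (Set<Long> p : partials) {
            merged.addAll(p);
        }
        return merged;
    }

    // Number of probe rows a scan would keep after applying the filter.
    static long countPassing(List<Long> probeKeys, Set<Long> filter) {
        return probeKeys.stream().filter(filter::contains).count();
    }

    public static void main(String[] args) {
        List<Long> buildKeys = new ArrayList<>();
        for (long k = 0; k < 8; k++) {
            buildKeys.add(k);
        }
        List<Long> probeKeys = new ArrayList<>(buildKeys); // every probe row has a match

        List<Set<Long>> partials = buildPartials(buildKeys, 4);
        // Wrong: a scan instance applies just one partial filter.
        System.out.println(countPassing(probeKeys, partials.get(0))); // 2
        // Right: merge the partials first, then apply.
        System.out.println(countPassing(probeKeys, merge(partials))); // 8
    }
}
```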
diff --git a/regression-test/suites/nereids_p0/local_shuffle/test_local_shuffle_bucket_upgrade.groovy b/regression-test/suites/nereids_p0/local_shuffle/test_local_shuffle_bucket_upgrade.groovy
new file mode 100644
index 00000000000..8d493c54e87
--- /dev/null
+++ b/regression-test/suites/nereids_p0/local_shuffle/test_local_shuffle_bucket_upgrade.groovy
@@ -0,0 +1,193 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+/**
+ * Bucket -> local-hash parallelism upgrade.
+ *
+ * A pooled bucket-join fragment runs its bucket joins at bucket-count parallelism
+ * (only instances owning buckets do join work). When nothing above the join needs
+ * bucket alignment and per-BE instances > buckets-with-data x ratio
+ * (session var local_shuffle_bucket_upgrade_ratio, > 1 enables, <= 1 disables),
+ * the FE planner re-distributes both join sides with LOCAL_EXECUTION_HASH_SHUFFLE
+ * so the join uses all instances.
+ *
+ * Shape notes (verified against a live cluster):
+ *  - LocalExchangeNodes only appear in EXPLAIN DISTRIBUTED PLAN (plain EXPLAIN
+ *    renders the tree before AddLocalExchange runs).
+ *  - Whether a bucket-shuffle join forms is cluster-dependent (Nereids downgrades it
+ *    when totalBucketNum < totalInstanceNum * bucket_shuffle_downgrade_ratio). The suite
+ *    pins bucket_shuffle_downgrade_ratio=0 to keep it forming, but the plan-shape checks
+ *    still gate on "did a BUCKET_HASH_SHUFFLE local exchange actually appear?" and skip
+ *    if not, so the test never hard-fails on an environment where it didn't form.
+ *  - The upgrade fires when min(task_num=16, cores) / min(buckets=4, cores) > 1.1, i.e.
+ *    on any machine with >= 5 cores (5/4 = 1.25 > 1.1); CI and dev machines comfortably
+ *    exceed this. Bucket counts are kept low (4/3/3) so a modest core count is enough.
+ *  - The aggregation above must NOT group by the bucket key: a colocate agg
+ *    requires bucket distribution of the join output and correctly blocks the
+ *    upgrade via the parentRequire gate.
+ */
+suite("test_local_shuffle_bucket_upgrade") {
+
+    def hints = { ls_on, ratio ->
+        """/*+SET_VAR(
+            enable_sql_cache=false, disable_join_reorder=true,
+            disable_colocate_plan=true,
+            auto_broadcast_join_threshold=-1, broadcast_row_count_limit=0,
+            experimental_force_to_local_shuffle=true,
+            experimental_enable_parallel_scan=false,
+            enable_runtime_filter_prune=false,
+            enable_runtime_filter_partition_prune=false,
+            runtime_filter_type='IN,MIN_MAX',
+            parallel_pipeline_task_num=16,
+            parallel_exchange_instance_num=8,
+            query_timeout=600,
+            bucket_shuffle_downgrade_ratio=0,
+            local_shuffle_bucket_upgrade_ratio=${ratio},
+            enable_local_shuffle=${ls_on},
+            enable_local_shuffle_planner=${ls_on}
+        )*/"""
+    }
+
+    sql "DROP TABLE IF EXISTS lsbu_fact"
+    sql "DROP TABLE IF EXISTS lsbu_probe"
+    sql "DROP TABLE IF EXISTS lsbu_probe2"
+    sql """CREATE TABLE lsbu_fact (k INT, v BIGINT)
+           ENGINE=OLAP DUPLICATE KEY(k) DISTRIBUTED BY HASH(k) BUCKETS 4
+           PROPERTIES ("replication_num"="1")"""
+    sql """CREATE TABLE lsbu_probe (pk INT, k INT, w BIGINT)
+           ENGINE=OLAP DUPLICATE KEY(pk) DISTRIBUTED BY HASH(pk) BUCKETS 3
+           PROPERTIES ("replication_num"="1")"""
+    sql """CREATE TABLE lsbu_probe2 (pk INT, k INT, w BIGINT)
+           ENGINE=OLAP DUPLICATE KEY(pk) DISTRIBUTED BY HASH(pk) BUCKETS 3
+           PROPERTIES ("replication_num"="1")"""
+    sql """INSERT INTO lsbu_fact
+           SELECT CAST(number%50 AS INT), number*10+1
+           FROM numbers("number"="200")"""
+    sql """INSERT INTO lsbu_probe
+           SELECT CAST(number AS INT), CAST(number%50 AS INT), 1000+number
+           FROM numbers("number"="300")"""
+    sql """INSERT INTO lsbu_probe2
+           SELECT CAST(number AS INT), CAST(number%50 AS INT), 2000+number
+           FROM numbers("number"="170")"""
+
+    // group key pk%10 is NOT the bucket key, so the agg above does not require
+    // bucket distribution and the upgrade is allowed.
+    def singleJoin = { h ->
+        """SELECT ${h} p.pk % 10 AS g, COUNT(*) c, SUM(f.v) sv, SUM(p.w) sw
+           FROM lsbu_fact f JOIN lsbu_probe p ON p.k = f.k
+           GROUP BY g ORDER BY g"""
+    }
+
+    // ---------- 1. plan shape (EXPLAIN DISTRIBUTED PLAN: post-AddLocalExchange) ----------
+    // The upgrade replaces the bucket join's BUCKET_HASH_SHUFFLE local exchange with
+    // LOCAL_EXECUTION_HASH_SHUFFLE. "BUCKET_HASH_SHUFFLE" names ONLY a local-exchange type
+    // (the join op prints "BUCKET_SHUFFLE", the network sink "BUCKET_SHFFULE_HASH_PARTITIONED"),
+    // so it is an unambiguous, fragment-local signal of the thing being upgraded — unlike
+    // LOCAL_EXECUTION_HASH, which an agg-finalize fragment may also carry on a multi-BE
+    // cluster regardless of the gate.
+    //
+    // First confirm a bucket-shuffle local exchange actually formed; if it did not (cluster
+    // shaped the join differently), there is nothing to upgrade, so skip rather than fail.
+    def countBucketHashLe = { String planText -> planText.split("BUCKET_HASH_SHUFFLE").length - 1 }
+
+    def bucketText = (sql "EXPLAIN DISTRIBUTED PLAN ${singleJoin(hints('true', '0'))}").toString()
+    int bucketLeCount = countBucketHashLe(bucketText)
+    if (bucketLeCount == 0) {
+        logger.warn("bucket-shuffle join did not form in this environment; "
+            + "skipping single-join upgrade plan-shape checks")
+    } else {
+        // ratio=1.1 upgrades the bucket join → all BUCKET_HASH local exchanges are gone
+        def upgradedText = (sql "EXPLAIN DISTRIBUTED PLAN ${singleJoin(hints('true', '1.1'))}").toString()
+        assertEquals(0, countBucketHashLe(upgradedText),
+            "ratio=1.1 must upgrade away the bucket join's BUCKET_HASH_SHUFFLE local exchanges")
+
+        // ratio <= 1 disables the upgrade → bucket-hash local exchanges unchanged
+        def ratioOneText = (sql "EXPLAIN DISTRIBUTED PLAN ${singleJoin(hints('true', '1'))}").toString()
+        assertEquals(bucketLeCount, countBucketHashLe(ratioOneText),
+            "ratio=1 must keep the upgrade off (<=1 disables)")
+    }
+
+    // Note: whether a group-by-bucket-key agg blocks the upgrade depends on the agg
+    // shape the optimizer picks (a colocate one-phase agg requires bucket distribution
+    // and blocks it; a two-phase agg does not). That parentRequire gate is covered
+    // deterministically by LocalShuffleNodeCoverageTest; here we only pin correctness.
+    def bucketKeyAgg = { h ->
+        """SELECT ${h} f.k AS g, COUNT(*) c, SUM(p.w) sw
+           FROM lsbu_fact f JOIN lsbu_probe p ON p.k = f.k
+           GROUP BY g ORDER BY g"""
+    }
+    def bka_baseline = sql bucketKeyAgg(hints('false', '0'))
+    def bka_upgraded = sql bucketKeyAgg(hints('true', '1.1'))
+    assertEquals(50, bka_baseline.size())
+    assertEquals(bka_baseline, bka_upgraded,
+        "group-by-bucket-key agg over (possibly upgraded) bucket join must stay correct")
+
+    // ---------- 2. correctness: single bucket join ----------
+    def single_baseline = sql singleJoin(hints('false', '0'))
+    def single_bucket = sql singleJoin(hints('true', '0'))
+    def single_upgraded = sql singleJoin(hints('true', '1.1'))
+
+    assertEquals(10, single_baseline.size())
+    assertEquals(single_baseline, single_bucket,
+        "bucket join (upgrade off) must match local-shuffle-off baseline")
+    assertEquals(single_baseline, single_upgraded,
+        "upgraded bucket join must match local-shuffle-off baseline")
+
+    // ---------- 3. correctness: stacked bucket joins ----------
+    def stackedJoin = { h ->
+        """SELECT ${h} p1.pk % 10 AS g, COUNT(*) c, SUM(f.v) sv, SUM(p1.w) s1, SUM(p2.w) s2
+           FROM lsbu_fact f
+           JOIN lsbu_probe p1 ON p1.k = f.k
+           JOIN lsbu_probe2 p2 ON p2.k = f.k
+           GROUP BY g ORDER BY g"""
+    }
+
+    // whole-chain shape: at an eligible ratio every level of the stacked bucket chain
+    // upgrades (the lower join reports NOOP so the upper re-align LE is kept), so all
+    // BUCKET_HASH local exchanges are upgraded away. Skip if the chain didn't form here.
+    def stackedBucketText = (sql "EXPLAIN DISTRIBUTED PLAN ${stackedJoin(hints('true', '0'))}").toString()
+    if (countBucketHashLe(stackedBucketText) == 0) {
+        logger.warn("stacked bucket-shuffle chain did not form in this environment; "
+            + "skipping stacked upgrade plan-shape check")
+    } else {
+        def stackedUpgradedText = (sql "EXPLAIN DISTRIBUTED PLAN ${stackedJoin(hints('true', '1.1'))}").toString()
+        assertEquals(0, countBucketHashLe(stackedUpgradedText),
+            "ratio=1.1 must upgrade away the stacked bucket chain's BUCKET_HASH local exchanges")
+    }
+
+    // Forced-RF killer case: with the upgrade, the join build is hash-sliced; the
+    // per-instance IN/MIN_MAX partial filters MUST be merged before application
+    // (TRuntimeFilterDesc.force_local_merge). Before that fix this query silently
+    // lost up to 96% of its rows.
+    def rfHints = { ratio ->
+        hints('true', ratio).replace(")*/",
+            ", enable_runtime_filter_prune=false, runtime_filter_type='IN,MIN_MAX')*/")
+    }
+    def single_up_rf = sql "SELECT ${rfHints('1.1')} p.pk % 10 AS g, COUNT(*) c, SUM(f.v) sv, SUM(p.w) sw FROM lsbu_fact f JOIN lsbu_probe p ON p.k = f.k GROUP BY g ORDER BY g"
+    assertEquals(single_baseline, single_up_rf,
+        "upgraded bucket join with forced IN/MIN_MAX runtime filters must stay correct")
+
+    def stacked_baseline = sql stackedJoin(hints('false', '0'))
+    def stacked_bucket = sql stackedJoin(hints('true', '0'))
+    def stacked_upgraded = sql stackedJoin(hints('true', '1.1'))
+
+    assertEquals(10, stacked_baseline.size())
+    assertEquals(stacked_baseline, stacked_bucket,
+        "stacked bucket joins (upgrade off) must match local-shuffle-off baseline")
+    assertEquals(stacked_baseline, stacked_upgraded,
+        "stacked bucket joins (upgrade on) must match local-shuffle-off baseline")
+}
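The suite's doc comment says the upgrade fires when min(task_num=16, cores) / min(buckets=4, cores) > 1.1, i.e. on machines with at least 5 cores. The short Java sketch below checks that arithmetic. It takes the comment's formula at face value and models only that formula, not the planner code. The class and method names are hypothetical.

```java
// Models only the doc-comment formula; not Doris planner code.
public class UpgradeCoreThreshold {

    // min(taskNum, cores) / min(buckets, cores) > ratio
    static boolean upgradeFires(int taskNum, int buckets, int cores, double ratio) {
        int instances = Math.min(taskNum, cores);
        int effectiveBuckets = Math.min(buckets, cores);
        return (double) instances / effectiveBuckets > ratio;
    }

    // Smallest core count in [1, maxCores] for which the formula holds, or -1 if none.
    static int minCores(int taskNum, int buckets, double ratio, int maxCores) {
        for (int cores = 1; cores <= maxCores; cores++) {
            if (upgradeFires(taskNum, buckets, cores, ratio)) {
                return cores;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        // Suite values: parallel_pipeline_task_num=16, lsbu_fact BUCKETS 4, ratio 1.1
        System.out.println(minCores(16, 4, 1.1, 256));   // 5
        System.out.println(upgradeFires(16, 4, 4, 1.1)); // false: 4 / 4 = 1.0
        System.out.println(upgradeFires(16, 4, 5, 1.1)); // true:  5 / 4 = 1.25
    }
}
```

At or below 4 cores both minimums collapse to the core count, so the quotient is exactly 1.0 and never clears 1.1.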
diff --git a/regression-test/suites/nereids_p0/local_shuffle/test_local_shuffle_rqg_bugs.groovy b/regression-test/suites/nereids_p0/local_shuffle/test_local_shuffle_rqg_bugs.groovy
index e369dc0f11e..ca3fe027c47 100644
--- a/regression-test/suites/nereids_p0/local_shuffle/test_local_shuffle_rqg_bugs.groovy
+++ b/regression-test/suites/nereids_p0/local_shuffle/test_local_shuffle_rqg_bugs.groovy
@@ -19,7 +19,7 @@
  * Regression tests for bugs discovered by RQG testing on the local-exchange2 branch.
  *
  * These queries triggered "must set shared state" errors or incorrect results
- * in RQG build 183992.  Common conditions:
+ * in RQG testing.  Common conditions:
  *   - use_serial_exchange=true  (makes ALL Exchanges serial, not just UNPARTITIONED)
  *   - enable_local_shuffle_planner=true (FE-planned local exchange)
  *   - parallel_pipeline_task_num > 1
@@ -65,7 +65,7 @@ suite("test_local_shuffle_rqg_bugs") {
         PROPERTIES ("replication_num" = "1")
     """
 
-    // Table for build 184181 GLOBAL_HASH_SHUFFLE bugs — needs varchar + bigint columns
+    // Table for RQG testing GLOBAL_HASH_SHUFFLE bugs — needs varchar + bigint columns
     sql """
         CREATE TABLE rqg_t3 (
             pk INT NOT NULL,
@@ -354,7 +354,7 @@ suite("test_local_shuffle_rqg_bugs") {
     //  local exchange on outer NLJ's build side because child was NLJ (not ScanNode).
     //  Fixed in NestedLoopJoinNode.enforceAndDeriveLocalExchange by using
     //  fragment.useSerialSource() instead of instanceof ScanNode check.
-    //  This was the root cause of 989 RQG test failures (build 183677).
+    //  This was the root cause of 989 RQG test failures (RQG testing).
     // ============================================================
 
     logger.info("=== Bug 6: CROSS_JOIN shared state - nested NLJ + pooling scan (FE planner) ===")
@@ -513,7 +513,7 @@ suite("test_local_shuffle_rqg_bugs") {
 
     // ============================================================
     //  Bug 10: GLOBAL_HASH_SHUFFLE Rows mismatched — self-join + NLJ
-    //  RQG case: 906784672 (build 184181)
+    //  RQG regression case
     //  Root cause: HashJoinNode used requireGlobalExecutionHash() → GLOBAL local exchange
     //  inserted when use_serial_exchange=true; shuffle_idx_to_instance_idx map has only
     //  4 entries (1/BE) but GLOBAL hash needs N*dop entries → most rows unrouted (0 actual rows).
@@ -522,7 +522,7 @@ suite("test_local_shuffle_rqg_bugs") {
     //       then NLJ (LEFT JOIN table1 table3 ON pk > col_bigint_undef_signed)
     // ============================================================
 
-    logger.info("=== Bug 10: GLOBAL_HASH_SHUFFLE Rows mismatched - self-join + NLJ (build 184181 case 906784672) ===")
+    logger.info("=== Bug 10: GLOBAL_HASH_SHUFFLE Rows mismatched - self-join + NLJ (RQG testing case 906784672) ===")
     def bug10_fe = sql """
         SELECT /*+SET_VAR(use_serial_exchange=true, parallel_pipeline_task_num=4,
                           enable_local_shuffle_planner=true,
@@ -564,12 +564,12 @@ suite("test_local_shuffle_rqg_bugs") {
 
     // ============================================================
     //  Bug 11: GLOBAL_HASH_SHUFFLE Rows mismatched — FULL OUTER JOIN + GROUP BY
-    //  RQG case: 11007681241 (build 184181)
+    //  RQG regression case
     //  Same root cause as Bug 10.
     //  SQL: FULL OUTER JOIN on col_bigint_undef_signed_not_null with WHERE + GROUP BY
     // ============================================================
 
-    logger.info("=== Bug 11: GLOBAL_HASH_SHUFFLE Rows mismatched - FULL OUTER JOIN + GROUP BY (build 184181 case 11007681241) ===")
+    logger.info("=== Bug 11: GLOBAL_HASH_SHUFFLE Rows mismatched - FULL OUTER JOIN + GROUP BY (RQG testing case 11007681241) ===")
     def bug11_fe = sql """
         SELECT /*+SET_VAR(use_serial_exchange=true, parallel_pipeline_task_num=4,
                           enable_local_shuffle_planner=true,
@@ -603,12 +603,12 @@ suite("test_local_shuffle_rqg_bugs") {
 
     // ============================================================
     //  Bug 12: GLOBAL_HASH_SHUFFLE Rows mismatched — LEFT JOIN + VARCHAR predicates + MIN()
-    //  RQG case: 906784662 (build 184181)
+    //  RQG regression case
     //  Same root cause as Bug 10/11.
     //  SQL: LEFT JOIN on pk with VARCHAR NOT IN / BETWEEN / IN predicates, MIN() aggregate
     // ============================================================
 
-    logger.info("=== Bug 12: GLOBAL_HASH_SHUFFLE Rows mismatched - LEFT JOIN + VARCHAR predicates (build 184181 case 906784662) ===")
+    logger.info("=== Bug 12: GLOBAL_HASH_SHUFFLE Rows mismatched - LEFT JOIN + VARCHAR predicates (RQG testing case 906784662) ===")
     def bug12_fe = sql """
         SELECT /*+SET_VAR(use_serial_exchange=true, parallel_pipeline_task_num=4,
                           enable_local_shuffle_planner=true,
@@ -646,7 +646,7 @@ suite("test_local_shuffle_rqg_bugs") {
 
     // ============================================================
     //  Bug 13: NLJ COREDUMP — serial NLJ + pooling scan + BROADCAST build side
-    //  RQG build 184430, query c0dafc1bed0f4910
+    //  RQG testing
     //  Root cause: serial NLJ (RIGHT_OUTER) with pooling scan inserted BROADCAST
     //  local exchange on build side, inflating build pipeline num_tasks to _num_instances
     //  while probe pipeline stayed at 1 task. Instance 1+ created build tasks without
@@ -696,7 +696,7 @@ suite("test_local_shuffle_rqg_bugs") {
 
     // ============================================================
     //  Bug 14: BUCKET_SHUFFLE join + serial build Exchange — must set shared state
-    //  RQG build 184563, cases 906784706/906784783/906784987/906785006
+    //  RQG testing
     //  Root cause: BUCKET_SHUFFLE join build side ExchangeNode marked serial in
     //  pooling scan fragment → build pipeline num_tasks reduced to 1 →
     //  instance 1+ have probe tasks without build tasks → shared state injection
@@ -925,7 +925,7 @@ suite("test_local_shuffle_rqg_bugs") {
     //
     //  Both triggered by: OVER() with no PARTITION BY + GROUPING SETS +
     //  pptn=0 (auto-parallel) + disable_streaming_preaggregations=true
-    //  RQG build 186195, query IDs: 7f3178a77c2c4b6b, 71887f7bf804c0c, 5dd9fcad234c4484
+    //  RQG testing
     // ============================================================
     sql "DROP TABLE IF EXISTS rqg_analytic_t1"
     sql """
@@ -1160,9 +1160,58 @@ suite("test_local_shuffle_rqg_bugs") {
         assertTrue(false, "Bug 20: Serial exchange + agg hang: ${t.message}")
     }
 
+    // ============================================================
+    //  Bug 20b: count(distinct)+std + RIGHT JOIN returns inflated distinct count
+    //  when use_serial_exchange=true + enable_local_exchange_before_agg=false.
+    //  Root cause (BE-planned): AggSink early-return ignored that the serial exchange
+    //  child breaks the HASH(s) invariant via PASSTHROUGH fan-out; fixed upstream by
+    //  child_breaks_local_key_distribution (#63766). The FE planner fixes it
+    //  structurally: requires are semantic, a hash LE is inserted instead of
+    //  PASSTHROUGH. This case pins both paths.
+    // ============================================================
+    try {
+        logger.info("Bug 20b: count(distinct) under serial exchange")
+        sql "DROP TABLE IF EXISTS rqg_25413_t1"
+        sql "DROP TABLE IF EXISTS rqg_25413_t2"
+        sql """CREATE TABLE rqg_25413_t1 (pk INT NOT NULL, s VARCHAR(64) NOT NULL, d DECIMAL(10,2) NOT NULL)
+               ENGINE=OLAP DUPLICATE KEY(pk) DISTRIBUTED BY HASH(pk) BUCKETS 5
+               PROPERTIES ("replication_num"="1")"""
+        sql """CREATE TABLE rqg_25413_t2 (pk INT NOT NULL, dt DATETIME NOT NULL)
+               ENGINE=OLAP DUPLICATE KEY(pk) DISTRIBUTED BY HASH(pk) BUCKETS 5
+               PROPERTIES ("replication_num"="1")"""
+        sql """INSERT INTO rqg_25413_t1
+               SELECT CAST(number AS INT), concat('s', CAST(number % 29 AS INT)),
+                      CAST(number * 13 % 1000 AS DECIMAL(10,2))
+               FROM numbers("number"="200")"""
+        sql """INSERT INTO rqg_25413_t2
+               SELECT CAST(number AS INT),
+                      date_add('2000-01-01 00:00:00', INTERVAL CAST(number % 3000 AS INT) DAY)
+               FROM numbers("number"="200")"""
+
+        def q25413 = { vars -> """
+            SELECT /*+SET_VAR(${vars})*/
+                count(distinct t1.s) AS cnt_distinct, std(t1.d) AS std_val
+            FROM rqg_25413_t1 t1
+            RIGHT JOIN rqg_25413_t2 t2 ON t1.pk = t2.pk
+            WHERE t2.dt < '2005-01-01 00:00:00'
+        """ }
+        def base25413 = "enable_sql_cache=false, enable_local_exchange_before_agg=false, parallel_pipeline_task_num=4"
+        def expected25413 = sql q25413(base25413)
+        for (planner in ['false', 'true']) {
+            def actual = sql q25413(
+                "${base25413}, experimental_use_serial_exchange=true, enable_local_shuffle_planner=${planner}")
+            assertEquals(expected25413, actual,
+                "Bug 20b planner=${planner}: distinct count must not be inflated under serial exchange")
+        }
+        logger.info("Bug 20b: PASSED")
+    } catch (Throwable t) {
+        logger.error("Bug 20b FAILED: ${t.message}")
+        assertTrue(false, "Bug 20b: ${t.message}")
+    }
+
     // ============================================================
     //  Bug 21: Multi-distinct COUNT on many-bucket table → COREDUMP
-    //  RQG build 186737/186929/186952: AggSinkOperatorX::sink → set_ready_to_read
+    //  AggSinkOperatorX::sink → set_ready_to_read
     //  with empty source_deps.
     //
     //  Root cause: AGG operators (streaming, distinct-streaming, serialize) requested
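Bug 20b hinges on the HASH(s) invariant. If the final step effectively adds up per-instance distinct counts, the result is exact only when every value of s lives on exactly one instance. The Java sketch below models that with 40 rows over 10 distinct values and 4 instances. It compares a round-robin, PASSTHROUGH-style fan-out against a hash exchange on s. It illustrates the invariant under those assumptions and is not the AggSink or local-exchange implementation; all names are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Toy model of why breaking HASH(s) inflates count(distinct s).
public class DistinctFanOutDemo {

    static List<Set<String>> emptyInstances(int n) {
        List<Set<String>> out = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            out.add(new HashSet<>());
        }
        return out;
    }

    // PASSTHROUGH-style fan-out: rows are dealt round-robin, ignoring the value of s.
    static List<Set<String>> passthrough(List<String> rows, int n) {
        List<Set<String>> out = emptyInstances(n);
        for (int i = 0; i < rows.size(); i++) {
            out.get(i % n).add(rows.get(i));
        }
        return out;
    }

    // Hash exchange on s: equal values always land on the same instance.
    static List<Set<String>> hashOnS(List<String> rows, int n) {
        List<Set<String>> out = emptyInstances(n);
        for (String s : rows) {
            out.get(Math.floorMod(s.hashCode(), n)).add(s);
        }
        return out;
    }

    // Summing local distinct counts is only correct if the instance sets are disjoint.
    static int sumOfLocalDistinct(List<Set<String>> perInstance) {
        int total = 0;
        for (Set<String> values : perInstance) {
            total += values.size();
        }
        return total;
    }

    public static void main(String[] args) {
        List<String> rows = new ArrayList<>();
        for (int i = 0; i < 40; i++) {
            rows.add("s" + (i % 10)); // 10 distinct values
        }
        System.out.println(sumOfLocalDistinct(passthrough(rows, 4))); // 20: inflated
        System.out.println(sumOfLocalDistinct(hashOnS(rows, 4)));     // 10: exact
    }
}
```

With round-robin dealing, each instance sees 5 of the 10 values, so the naive sum reports 20. Hashing on s keeps the instance sets disjoint, and the sum is 10. The FE planner restores that property by inserting a hash local exchange instead of PASSTHROUGH.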

