This is an automated email from the ASF dual-hosted git repository.
924060929 pushed a commit to branch fe_local_shuffle_rebase
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/fe_local_shuffle_rebase by
this push:
new f8cec96904f [fix](local shuffle) fall back to LOCAL hash for serial
source in PARTITIONED join/intersect
f8cec96904f is described below
commit f8cec96904f913e4957749df80fa96922e11e49c
Author: 924060929 <[email protected]>
AuthorDate: Tue Jun 2 11:19:30 2026 +0800
[fix](local shuffle) fall back to LOCAL hash for serial source in
PARTITIONED join/intersect
DORIS-26120: with use_serial_exchange=true, shuffle_idx_to_instance_idx
has only one entry, so GLOBAL_EXECUTION_HASH_SHUFFLE routes data to
non-existent indices, causing a "Rows mismatched" error.
Fix: check fragment.useSerialSource() in HashJoinNode and
SetOperationNode (intersect/except). When it returns true, fall back to
requireHash() (LOCAL), matching the BE's _use_serial_source behavior.
Otherwise (the normal case), keep requireGlobalExecutionHash() to
preserve the DORIS-26101 correctness fix.
---
.../org/apache/doris/planner/HashJoinNode.java | 13 +++++--
.../org/apache/doris/planner/SetOperationNode.java | 10 ++++--
.../planner/LocalShuffleNodeCoverageTest.java | 15 ++++++++
.../test_local_shuffle_global_hash_require.groovy | 42 ++++++++++++++++++++++
4 files changed, 76 insertions(+), 4 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/HashJoinNode.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/HashJoinNode.java
index a737e6106b5..cbf2dcd6511 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/HashJoinNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/HashJoinNode.java
@@ -349,8 +349,17 @@ public class HashJoinNode extends JoinNodeBase {
// instance mapping as the cross-fragment exchange. LOCAL hash has
a different
// modulus (per-BE instance count vs total instance count) and
would cause
// join mismatches (DORIS-26101).
- buildSideRequire = probeSideRequire
- = LocalExchangeTypeRequire.requireGlobalExecutionHash();
+ //
+ // Exception: serial source (use_serial_exchange=true + pooling).
The serial
+ // exchange sends to a single BE so shuffle_idx_to_instance_idx
has only one
+ // entry — GLOBAL hash would route data to non-existent indices
(DORIS-26120).
+ // Fall back to generic requireHash() which resolves to LOCAL,
matching BE's
+ // _use_serial_source behavior.
+ boolean serialSource = fragment != null
+ &&
fragment.useSerialSource(translatorContext.getConnectContext());
+ buildSideRequire = probeSideRequire = serialSource
+ ? LocalExchangeTypeRequire.requireHash()
+ : LocalExchangeTypeRequire.requireGlobalExecutionHash();
outputType = null; // derived from probeResult.second below
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/SetOperationNode.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/SetOperationNode.java
index af9338fa74f..17fdf35bbd0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/SetOperationNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/SetOperationNode.java
@@ -231,8 +231,14 @@ public abstract class SetOperationNode extends PlanNode {
// PARTITIONED intersect/except: all children enter via global
hash
// exchange. Require GLOBAL so any inserted exchange matches
the
// cross-fragment instance mapping (same fix as HashJoinNode
DORIS-26101).
- requireChild =
LocalExchangeTypeRequire.requireGlobalExecutionHash();
- outputType = LocalExchangeType.GLOBAL_EXECUTION_HASH_SHUFFLE;
+ // Exception: serial source → fall back to LOCAL (DORIS-26120).
+ boolean serialSource = fragment != null
+ &&
fragment.useSerialSource(translatorContext.getConnectContext());
+ requireChild = serialSource
+ ? LocalExchangeTypeRequire.requireHash()
+ :
LocalExchangeTypeRequire.requireGlobalExecutionHash();
+ outputType = AddLocalExchange.resolveExchangeType(
+ requireChild, translatorContext, this, firstChild);
}
}
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/planner/LocalShuffleNodeCoverageTest.java
b/fe/fe-core/src/test/java/org/apache/doris/planner/LocalShuffleNodeCoverageTest.java
index 188941bdb3a..7288ba49ca2 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/planner/LocalShuffleNodeCoverageTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/planner/LocalShuffleNodeCoverageTest.java
@@ -251,6 +251,21 @@ public class LocalShuffleNodeCoverageTest {
"no exchange should be inserted when child already provides
GLOBAL hash");
Assertions.assertSame(buildGlobal, partitionedSatisfied.getChild(1));
+ // DORIS-26120: PARTITIONED join with serial source falls back to
LOCAL hash
+ // because GLOBAL shuffle_idx_to_instance_idx is incomplete for serial
exchange.
+ TrackingScanNode probeSerial = new TrackingScanNode(nextPlanNodeId(),
LocalExchangeType.NOOP);
+ TrackingPlanNode buildSerial = new TrackingPlanNode(nextPlanNodeId(),
LocalExchangeType.NOOP);
+ HashJoinNode serialPartitioned = new HashJoinNode(nextPlanNodeId(),
probeSerial, buildSerial,
+ JoinOperator.INNER_JOIN, eqConjuncts, Collections.emptyList(),
null, null, false);
+ serialPartitioned.setDistributionMode(DistributionMode.PARTITIONED);
+ serialPartitioned.fragment = Mockito.mock(PlanFragment.class);
+
Mockito.when(serialPartitioned.fragment.useSerialSource(Mockito.any())).thenReturn(true);
+ Pair<PlanNode, LocalExchangeType> serialPartOutput =
serialPartitioned.enforceAndDeriveLocalExchange(
+ ctx, null, LocalExchangeTypeRequire.requireHash());
+
Assertions.assertEquals(LocalExchangeType.LOCAL_EXECUTION_HASH_SHUFFLE,
serialPartOutput.second);
+ assertChildLocalExchangeType(serialPartitioned, 0,
LocalExchangeType.LOCAL_EXECUTION_HASH_SHUFFLE);
+ assertChildLocalExchangeType(serialPartitioned, 1,
LocalExchangeType.LOCAL_EXECUTION_HASH_SHUFFLE);
+
TrackingPlanNode probe3 = new TrackingPlanNode(nextPlanNodeId(),
LocalExchangeType.NOOP);
TrackingPlanNode build3 = new TrackingPlanNode(nextPlanNodeId(),
LocalExchangeType.NOOP);
HashJoinNode nullAwareJoin = new HashJoinNode(nextPlanNodeId(),
probe3, build3,
diff --git
a/regression-test/suites/nereids_p0/local_shuffle/test_local_shuffle_global_hash_require.groovy
b/regression-test/suites/nereids_p0/local_shuffle/test_local_shuffle_global_hash_require.groovy
index 82b3a57e21f..f2aff5525b5 100644
---
a/regression-test/suites/nereids_p0/local_shuffle/test_local_shuffle_global_hash_require.groovy
+++
b/regression-test/suites/nereids_p0/local_shuffle/test_local_shuffle_global_hash_require.groovy
@@ -365,4 +365,46 @@ suite("test_local_shuffle_global_hash_require") {
assertEquals(23, ptopn_baseline.size())
assertEquals(ptopn_baseline, ptopn_fe,
"DORIS-26103: UNION ALL -> PartitionTopN -> INTERSECT")
+
+ // ============================================================
+ // DORIS-26120: serial exchange + shuffle join → GLOBAL hash
+ // shuffle_idx_to_instance_idx incomplete → Rows mismatched.
+ // Fix: fall back to LOCAL hash when fragment uses serial source.
+ // ============================================================
+ sql "DROP TABLE IF EXISTS ls_serial_fact"
+ sql "DROP TABLE IF EXISTS ls_serial_dim"
+ sql """CREATE TABLE ls_serial_fact (pk INT NOT NULL, g INT NOT NULL)
+ ENGINE=OLAP DUPLICATE KEY(pk,g) DISTRIBUTED BY HASH(pk) BUCKETS 1
+ PROPERTIES ("replication_num"="1")"""
+ sql """CREATE TABLE ls_serial_dim (g INT NOT NULL)
+ ENGINE=OLAP DUPLICATE KEY(g) DISTRIBUTED BY HASH(g) BUCKETS 1
+ PROPERTIES ("replication_num"="1")"""
+ sql "INSERT INTO ls_serial_fact VALUES (1, 1)"
+ sql "INSERT INTO ls_serial_dim VALUES (1)"
+
+ def serial_baseline = sql """SELECT /*+SET_VAR(
+ enable_sql_cache=false,
+ enable_local_shuffle=false,
+ enable_local_shuffle_planner=false,
+ use_serial_exchange=true,
+ parallel_pipeline_task_num=4,
+ ignore_storage_data_distribution=true
+ )*/ a.g AS left_g, b.g AS right_g
+ FROM ls_serial_fact a JOIN [shuffle] ls_serial_dim b ON a.g = b.g
+ ORDER BY left_g, right_g"""
+
+ def serial_fe = sql """SELECT /*+SET_VAR(
+ enable_sql_cache=false,
+ enable_local_shuffle=true,
+ enable_local_shuffle_planner=true,
+ use_serial_exchange=true,
+ parallel_pipeline_task_num=4,
+ ignore_storage_data_distribution=true
+ )*/ a.g AS left_g, b.g AS right_g
+ FROM ls_serial_fact a JOIN [shuffle] ls_serial_dim b ON a.g = b.g
+ ORDER BY left_g, right_g"""
+
+ assertEquals(1, serial_baseline.size())
+ assertEquals(serial_baseline, serial_fe,
+ "DORIS-26120: serial exchange + shuffle join should not error")
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]