This is an automated email from the ASF dual-hosted git repository. 924060929 pushed a commit to branch fe_local_shuffle_rebase_wip in repository https://gitbox.apache.org/repos/asf/doris.git
commit c51498efb58328147445c871700a18edbca503c4 Author: 924060929 <[email protected]> AuthorDate: Tue Jun 2 11:19:30 2026 +0800 [fix](local shuffle) fall back to LOCAL hash for serial source in PARTITIONED join/intersect DORIS-26120: With use_serial_exchange=true, shuffle_idx_to_instance_idx has only one entry. GLOBAL_EXECUTION_HASH_SHUFFLE routes data to non-existent indices, causing a "Rows mismatched" error. Fix: check fragment.useSerialSource() in HashJoinNode and SetOperationNode. When true, fall back to requireHash() (LOCAL), matching BE's _use_serial_source behavior. When false (normal case), keep requireGlobalExecutionHash() for DORIS-26101 correctness. --- .../org/apache/doris/planner/HashJoinNode.java | 13 +++++-- .../org/apache/doris/planner/SetOperationNode.java | 10 ++++-- .../planner/LocalShuffleNodeCoverageTest.java | 15 ++++++++ .../test_local_shuffle_global_hash_require.groovy | 42 ++++++++++++++++++++++ 4 files changed, 76 insertions(+), 4 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/HashJoinNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/HashJoinNode.java index a737e6106b5..cbf2dcd6511 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/HashJoinNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/HashJoinNode.java @@ -349,8 +349,17 @@ public class HashJoinNode extends JoinNodeBase { // instance mapping as the cross-fragment exchange. LOCAL hash has a different // modulus (per-BE instance count vs total instance count) and would cause // join mismatches (DORIS-26101). - buildSideRequire = probeSideRequire - = LocalExchangeTypeRequire.requireGlobalExecutionHash(); + // + // Exception: serial source (use_serial_exchange=true + pooling). The serial + // exchange sends to a single BE so shuffle_idx_to_instance_idx has only one + // entry — GLOBAL hash would route data to non-existent indices (DORIS-26120). 
+ // Fall back to generic requireHash() which resolves to LOCAL, matching BE's + // _use_serial_source behavior. + boolean serialSource = fragment != null + && fragment.useSerialSource(translatorContext.getConnectContext()); + buildSideRequire = probeSideRequire = serialSource + ? LocalExchangeTypeRequire.requireHash() + : LocalExchangeTypeRequire.requireGlobalExecutionHash(); outputType = null; // derived from probeResult.second below } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/SetOperationNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/SetOperationNode.java index af9338fa74f..17fdf35bbd0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/SetOperationNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/SetOperationNode.java @@ -231,8 +231,14 @@ public abstract class SetOperationNode extends PlanNode { // PARTITIONED intersect/except: all children enter via global hash // exchange. Require GLOBAL so any inserted exchange matches the // cross-fragment instance mapping (same fix as HashJoinNode DORIS-26101). - requireChild = LocalExchangeTypeRequire.requireGlobalExecutionHash(); - outputType = LocalExchangeType.GLOBAL_EXECUTION_HASH_SHUFFLE; + // Exception: serial source → fall back to LOCAL (DORIS-26120). + boolean serialSource = fragment != null + && fragment.useSerialSource(translatorContext.getConnectContext()); + requireChild = serialSource + ? 
LocalExchangeTypeRequire.requireHash() + : LocalExchangeTypeRequire.requireGlobalExecutionHash(); + outputType = AddLocalExchange.resolveExchangeType( + requireChild, translatorContext, this, firstChild); } } diff --git a/fe/fe-core/src/test/java/org/apache/doris/planner/LocalShuffleNodeCoverageTest.java b/fe/fe-core/src/test/java/org/apache/doris/planner/LocalShuffleNodeCoverageTest.java index 188941bdb3a..7288ba49ca2 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/planner/LocalShuffleNodeCoverageTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/planner/LocalShuffleNodeCoverageTest.java @@ -251,6 +251,21 @@ public class LocalShuffleNodeCoverageTest { "no exchange should be inserted when child already provides GLOBAL hash"); Assertions.assertSame(buildGlobal, partitionedSatisfied.getChild(1)); + // DORIS-26120: PARTITIONED join with serial source falls back to LOCAL hash + // because GLOBAL shuffle_idx_to_instance_idx is incomplete for serial exchange. + TrackingScanNode probeSerial = new TrackingScanNode(nextPlanNodeId(), LocalExchangeType.NOOP); + TrackingPlanNode buildSerial = new TrackingPlanNode(nextPlanNodeId(), LocalExchangeType.NOOP); + HashJoinNode serialPartitioned = new HashJoinNode(nextPlanNodeId(), probeSerial, buildSerial, + JoinOperator.INNER_JOIN, eqConjuncts, Collections.emptyList(), null, null, false); + serialPartitioned.setDistributionMode(DistributionMode.PARTITIONED); + serialPartitioned.fragment = Mockito.mock(PlanFragment.class); + Mockito.when(serialPartitioned.fragment.useSerialSource(Mockito.any())).thenReturn(true); + Pair<PlanNode, LocalExchangeType> serialPartOutput = serialPartitioned.enforceAndDeriveLocalExchange( + ctx, null, LocalExchangeTypeRequire.requireHash()); + Assertions.assertEquals(LocalExchangeType.LOCAL_EXECUTION_HASH_SHUFFLE, serialPartOutput.second); + assertChildLocalExchangeType(serialPartitioned, 0, LocalExchangeType.LOCAL_EXECUTION_HASH_SHUFFLE); + assertChildLocalExchangeType(serialPartitioned, 1, 
LocalExchangeType.LOCAL_EXECUTION_HASH_SHUFFLE); + TrackingPlanNode probe3 = new TrackingPlanNode(nextPlanNodeId(), LocalExchangeType.NOOP); TrackingPlanNode build3 = new TrackingPlanNode(nextPlanNodeId(), LocalExchangeType.NOOP); HashJoinNode nullAwareJoin = new HashJoinNode(nextPlanNodeId(), probe3, build3, diff --git a/regression-test/suites/nereids_p0/local_shuffle/test_local_shuffle_global_hash_require.groovy b/regression-test/suites/nereids_p0/local_shuffle/test_local_shuffle_global_hash_require.groovy index 82b3a57e21f..f2aff5525b5 100644 --- a/regression-test/suites/nereids_p0/local_shuffle/test_local_shuffle_global_hash_require.groovy +++ b/regression-test/suites/nereids_p0/local_shuffle/test_local_shuffle_global_hash_require.groovy @@ -365,4 +365,46 @@ suite("test_local_shuffle_global_hash_require") { assertEquals(23, ptopn_baseline.size()) assertEquals(ptopn_baseline, ptopn_fe, "DORIS-26103: UNION ALL -> PartitionTopN -> INTERSECT") + + // ============================================================ + // DORIS-26120: serial exchange + shuffle join → GLOBAL hash + // shuffle_idx_to_instance_idx incomplete → Rows mismatched. + // Fix: fall back to LOCAL hash when fragment uses serial source. 
+ // ============================================================ + sql "DROP TABLE IF EXISTS ls_serial_fact" + sql "DROP TABLE IF EXISTS ls_serial_dim" + sql """CREATE TABLE ls_serial_fact (pk INT NOT NULL, g INT NOT NULL) + ENGINE=OLAP DUPLICATE KEY(pk,g) DISTRIBUTED BY HASH(pk) BUCKETS 1 + PROPERTIES ("replication_num"="1")""" + sql """CREATE TABLE ls_serial_dim (g INT NOT NULL) + ENGINE=OLAP DUPLICATE KEY(g) DISTRIBUTED BY HASH(g) BUCKETS 1 + PROPERTIES ("replication_num"="1")""" + sql "INSERT INTO ls_serial_fact VALUES (1, 1)" + sql "INSERT INTO ls_serial_dim VALUES (1)" + + def serial_baseline = sql """SELECT /*+SET_VAR( + enable_sql_cache=false, + enable_local_shuffle=false, + enable_local_shuffle_planner=false, + use_serial_exchange=true, + parallel_pipeline_task_num=4, + ignore_storage_data_distribution=true + )*/ a.g AS left_g, b.g AS right_g + FROM ls_serial_fact a JOIN [shuffle] ls_serial_dim b ON a.g = b.g + ORDER BY left_g, right_g""" + + def serial_fe = sql """SELECT /*+SET_VAR( + enable_sql_cache=false, + enable_local_shuffle=true, + enable_local_shuffle_planner=true, + use_serial_exchange=true, + parallel_pipeline_task_num=4, + ignore_storage_data_distribution=true + )*/ a.g AS left_g, b.g AS right_g + FROM ls_serial_fact a JOIN [shuffle] ls_serial_dim b ON a.g = b.g + ORDER BY left_g, right_g""" + + assertEquals(1, serial_baseline.size()) + assertEquals(serial_baseline, serial_fe, + "DORIS-26120: serial exchange + shuffle join should not error") } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
