This is an automated email from the ASF dual-hosted git repository. avamingli pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/cloudberry.git
commit c2e37ee4a81d2a76d497bd4488d58ed0dbc55681 Author: Zhang Mingli <[email protected]> AuthorDate: Tue Mar 24 13:00:49 2026 +0800 Parallel Hash Full Join and Right Join PostgreSQL originally excluded FULL and RIGHT outer joins from parallel hash join because of deadlock hazards in the per-batch barrier protocol. PG 14 resolved this by introducing a dedicated PHJ_BATCH_SCAN phase: one elected worker emits unmatched inner-side rows after probing, while the others detach and move on. In CBDB, distributed execution adds a second dimension: after a full outer join the unmatched NULL-filled rows may come from any segment, so the result carries a HashedOJ locus rather than a plain Hashed locus. This change teaches the parallel planner about that: - FULL JOIN and RIGHT JOIN are now valid parallel join types in the distributed planner. Previously they were unconditionally rejected, forcing serial execution across all segments. - The HashedOJ locus produced by a parallel full join now carries parallel_workers, so operators above the join (aggregates, further joins) can remain parallel. - A crash that could occur when a parallel LASJ_NOTIN (NOT IN) join encountered NULL inner keys is fixed. The worker would exit early but the batch barrier, which was never attached to, would be touched on shutdown causing an assertion failure. 
Example plans (3 segments, parallel_workers=2): -- FULL JOIN: result locus is HashedOJ with Parallel Workers: 2 EXPLAIN(costs off, locus) SELECT count(*) FROM t1 FULL JOIN t2 USING (id); Finalize Aggregate Locus: Entry -> Gather Motion 6:1 (slice1; segments: 6) -> Partial Aggregate Locus: HashedOJ Parallel Workers: 2 -> Parallel Hash Full Join Locus: HashedOJ Parallel Workers: 2 Hash Cond: (t1.id = t2.id) -> Parallel Seq Scan on t1 Locus: HashedWorkers -> Parallel Hash -> Parallel Seq Scan on t2 Locus: HashedWorkers -- RIGHT JOIN: when t1 is larger the planner hashes the smaller t2 -- and probes with t1; result locus HashedWorkers EXPLAIN(costs off, locus) SELECT count(*) FROM t1 RIGHT JOIN t2 USING (id); Finalize Aggregate Locus: Entry -> Gather Motion 6:1 (slice1; segments: 6) -> Partial Aggregate Locus: HashedWorkers Parallel Workers: 2 -> Parallel Hash Right Join Locus: HashedWorkers Parallel Workers: 2 Hash Cond: (t1.id = t2.id) -> Parallel Seq Scan on t1 Locus: HashedWorkers -> Parallel Hash -> Parallel Seq Scan on t2 Locus: HashedWorkers Performance (3 segments x 2 parallel workers, 6M rows each, 50% overlap): FULL JOIN parallel: 4040 ms serial: 6347 ms speedup: 1.57x RIGHT JOIN parallel: 3039 ms serial: 5568 ms speedup: 1.83x --- src/backend/cdb/cdbpath.c | 5 +++-- src/backend/cdb/cdbpathlocus.c | 28 +++++++++++++++++++++------- src/backend/executor/nodeHash.c | 19 ++++++++++--------- src/backend/executor/nodeHashjoin.c | 6 +++--- src/include/cdb/cdbpathlocus.h | 4 ++-- 5 files changed, 39 insertions(+), 23 deletions(-) diff --git a/src/backend/cdb/cdbpath.c b/src/backend/cdb/cdbpath.c index 9e3697a3b03..e9d7dac9895 100644 --- a/src/backend/cdb/cdbpath.c +++ b/src/backend/cdb/cdbpath.c @@ -3112,8 +3112,9 @@ cdbpath_motion_for_parallel_join(PlannerInfo *root, case JOIN_UNIQUE_INNER: case JOIN_RIGHT: case JOIN_FULL: - /* Join types are not supported in parallel yet. 
*/ - goto fail; + outer.ok_to_replicate = false; + inner.ok_to_replicate = false; + break; case JOIN_DEDUP_SEMI: if (!enable_parallel_dedup_semi_join) goto fail; diff --git a/src/backend/cdb/cdbpathlocus.c b/src/backend/cdb/cdbpathlocus.c index 29930085429..dddae1aa64c 100644 --- a/src/backend/cdb/cdbpathlocus.c +++ b/src/backend/cdb/cdbpathlocus.c @@ -119,6 +119,11 @@ cdbpathlocus_equal(CdbPathLocus a, CdbPathLocus b) list_length(a.distkey) != list_length(b.distkey)) return false; + /* + * CBDB_PARALLEL: What if both a and b are HashedOJ with parallel workers > 0 ? + * Are they equal in practice? + */ + if ((CdbPathLocus_IsHashed(a) || CdbPathLocus_IsHashedOJ(a)) && (CdbPathLocus_IsHashed(b) || CdbPathLocus_IsHashedOJ(b))) return cdbpath_distkey_equal(a.distkey, b.distkey); @@ -544,7 +549,7 @@ cdbpathlocus_from_subquery(struct PlannerInfo *root, else { Assert(CdbPathLocus_IsHashedOJ(subpath->locus)); - CdbPathLocus_MakeHashedOJ(&locus, distkeys, numsegments); + CdbPathLocus_MakeHashedOJ(&locus, distkeys, numsegments, subpath->locus.parallel_workers); } } else @@ -711,7 +716,7 @@ cdbpathlocus_pull_above_projection(struct PlannerInfo *root, CdbPathLocus_MakeHashedWorkers(&newlocus, newdistkeys, numsegments, locus.parallel_workers); } else - CdbPathLocus_MakeHashedOJ(&newlocus, newdistkeys, numsegments); + CdbPathLocus_MakeHashedOJ(&newlocus, newdistkeys, numsegments, locus.parallel_workers); return newlocus; } else @@ -880,7 +885,7 @@ cdbpathlocus_join(JoinType jointype, CdbPathLocus a, CdbPathLocus b) newdistkeys = lappend(newdistkeys, newdistkey); } - CdbPathLocus_MakeHashedOJ(&resultlocus, newdistkeys, numsegments); + CdbPathLocus_MakeHashedOJ(&resultlocus, newdistkeys, numsegments, 0 /* Both are 0 parallel here*/); } Assert(cdbpathlocus_is_valid(resultlocus)); return resultlocus; @@ -1236,8 +1241,14 @@ cdbpathlocus_parallel_join(JoinType jointype, CdbPathLocus a, CdbPathLocus b, bo Assert(cdbpathlocus_is_valid(a)); Assert(cdbpathlocus_is_valid(b)); - /* Do both 
input rels have same locus? */ - if (cdbpathlocus_equal(a, b)) + /* + * Do both input rels have same locus? + * CBDB_PARALLEL: for FULL JOIN, it could be different even if both + * have the same locus, because the NULL-filled rows could be on any + * segment after the join. + */ + + if (jointype != JOIN_FULL && cdbpathlocus_equal(a, b)) return a; /* @@ -1412,8 +1423,9 @@ cdbpathlocus_parallel_join(JoinType jointype, CdbPathLocus a, CdbPathLocus b, bo * If inner is hashed workers, and outer is hashed. Join locus will be hashed. * If outer is hashed workers, and inner is hashed. Join locus will be hashed workers. * Seems we should just return outer locus anyway. + * Things changed since we have parallel full join now. */ - if (parallel_aware) + if (parallel_aware && jointype != JOIN_FULL) return a; numsegments = CdbPathLocus_NumSegments(a); @@ -1469,7 +1481,9 @@ cdbpathlocus_parallel_join(JoinType jointype, CdbPathLocus a, CdbPathLocus b, bo newdistkeys = lappend(newdistkeys, newdistkey); } - CdbPathLocus_MakeHashedOJ(&resultlocus, newdistkeys, numsegments); + Assert(CdbPathLocus_NumParallelWorkers(a) == CdbPathLocus_NumParallelWorkers(b)); + + CdbPathLocus_MakeHashedOJ(&resultlocus, newdistkeys, numsegments, CdbPathLocus_NumParallelWorkers(a)); } Assert(cdbpathlocus_is_valid(resultlocus)); return resultlocus; diff --git a/src/backend/executor/nodeHash.c b/src/backend/executor/nodeHash.c index 1a305de21e6..c084f7e7c78 100644 --- a/src/backend/executor/nodeHash.c +++ b/src/backend/executor/nodeHash.c @@ -2401,11 +2401,11 @@ ExecParallelPrepHashTableForUnmatched(HashJoinState *hjstate) int curbatch = hashtable->curbatch; ParallelHashJoinBatch *batch = hashtable->batches[curbatch].shared; - Assert(BarrierPhase(&batch->batch_barrier) == PHJ_BATCH_PROBE); + Assert(BarrierPhase(&batch->batch_barrier) == PHJ_BATCH_PROBING); /* * It would not be deadlock-free to wait on the batch barrier, because it - * is in PHJ_BATCH_PROBE phase, and thus processes attached to it have + * is in 
PHJ_BATCH_PROBING phase, and thus processes attached to it have * already emitted tuples. Therefore, we'll hold a wait-free election: * only one process can continue to the next phase, and all others detach * from this batch. They can still go any work on other batches, if there @@ -3975,12 +3975,12 @@ ExecHashTableDetachBatch(HashJoinTable hashtable) sts_end_parallel_scan(hashtable->batches[curbatch].inner_tuples); sts_end_parallel_scan(hashtable->batches[curbatch].outer_tuples); - /* After attaching we always get at least to PHJ_BATCH_PROBE. */ - Assert(BarrierPhase(&batch->batch_barrier) == PHJ_BATCH_PROBE || + /* After attaching we always get at least to PHJ_BATCH_PROBING. */ + Assert(BarrierPhase(&batch->batch_barrier) == PHJ_BATCH_PROBING || BarrierPhase(&batch->batch_barrier) == PHJ_BATCH_SCAN); /* - * If we're abandoning the PHJ_BATCH_PROBE phase early without having + * If we're abandoning the PHJ_BATCH_PROBING phase early without having * reached the end of it, it means the plan doesn't want any more * tuples, and it is happy to abandon any tuples buffered in this * process's subplans. For correctness, we can't allow any process to @@ -3995,13 +3995,13 @@ ExecHashTableDetachBatch(HashJoinTable hashtable) * If phs_lasj_has_null is true, that means we have found null when building hash table, * there were no batches to detach. */ - if (BarrierPhase(&batch->batch_barrier) == PHJ_BATCH_PROBE && + if (BarrierPhase(&batch->batch_barrier) == PHJ_BATCH_PROBING && !hashtable->parallel_state->phs_lasj_has_null && /* CBDB_PARALLEL */ !hashtable->batches[curbatch].outer_eof) { /* * This flag may be written to by multiple backends during - * PHJ_BATCH_PROBE phase, but will only be read in PHJ_BATCH_SCAN + * PHJ_BATCH_PROBING phase, but will only be read in PHJ_BATCH_SCAN * phase so requires no extra locking. 
*/ batch->skip_unmatched = true; @@ -4012,10 +4012,11 @@ ExecHashTableDetachBatch(HashJoinTable hashtable) * the PHJ_BATCH_SCAN phase just to maintain the invariant that * freeing happens in PHJ_BATCH_FREE, but that'll be wait-free. */ - if (BarrierPhase(&batch->batch_barrier) == PHJ_BATCH_PROBE && + if (BarrierPhase(&batch->batch_barrier) == PHJ_BATCH_PROBING && !hashtable->parallel_state->phs_lasj_has_null /* CBDB_PARALLEL */) attached = BarrierArriveAndDetachExceptLast(&batch->batch_barrier); - if (attached && BarrierArriveAndDetach(&batch->batch_barrier)) + if (attached && !hashtable->parallel_state->phs_lasj_has_null /* CBDB_PARALLEL */ && + BarrierArriveAndDetach(&batch->batch_barrier)) { /* * We are not longer attached to the batch barrier, but we're the diff --git a/src/backend/executor/nodeHashjoin.c b/src/backend/executor/nodeHashjoin.c index 7f58cafb75d..a28e6a14cdb 100644 --- a/src/backend/executor/nodeHashjoin.c +++ b/src/backend/executor/nodeHashjoin.c @@ -83,7 +83,7 @@ * PHJ_BATCH_ELECT -- initial state * PHJ_BATCH_ALLOCATE* -- one allocates buckets * PHJ_BATCH_LOAD -- all load the hash table from disk - * PHJ_BATCH_PROBE -- all probe + * PHJ_BATCH_PROBING -- all probe * PHJ_BATCH_SCAN* -- one does full/right unmatched scan * PHJ_BATCH_FREE* -- one frees memory * @@ -102,7 +102,7 @@ * to a barrier, unless the barrier has reached a phase that means that no * process will wait on it again. We emit tuples while attached to the build * barrier in phase PHJ_BUILD_RUN, and to a per-batch barrier in phase - * PHJ_BATCH_PROBE. These are advanced to PHJ_BUILD_FREE and PHJ_BATCH_SCAN + * PHJ_BATCH_PROBING. These are advanced to PHJ_BUILD_FREE and PHJ_BATCH_SCAN * respectively without waiting, using BarrierArriveAndDetach() and * BarrierArriveAndDetachExceptLast() respectively. 
The last to detach * receives a different return value so that it knows that it's safe to @@ -1549,7 +1549,7 @@ ExecParallelHashJoinNewBatch(HashJoinState *hjstate) * since that phase is already underway (the thing we * can't do under current deadlock-avoidance rules is wait * for others to arrive at PHJ_BATCH_SCAN, because - * PHJ_BATCH_PROBE emits tuples, but in this case we just + * PHJ_BATCH_PROBING emits tuples, but in this case we just * got here without waiting). That is not yet done. For * now, we just detach and go around again. We have to * use ExecHashTableDetachBatch() because there's a small diff --git a/src/include/cdb/cdbpathlocus.h b/src/include/cdb/cdbpathlocus.h index 0f71ba55dfb..9f5a8227e68 100644 --- a/src/include/cdb/cdbpathlocus.h +++ b/src/include/cdb/cdbpathlocus.h @@ -292,13 +292,13 @@ typedef struct CdbPathLocus _locus->parallel_workers = (parallel_workers_); \ Assert(cdbpathlocus_is_valid(*_locus)); \ } while (0) -#define CdbPathLocus_MakeHashedOJ(plocus, distkey_, numsegments_) \ +#define CdbPathLocus_MakeHashedOJ(plocus, distkey_, numsegments_, parallel_workers_) \ do { \ CdbPathLocus *_locus = (plocus); \ _locus->locustype = CdbLocusType_HashedOJ; \ _locus->numsegments = (numsegments_); \ _locus->distkey = (distkey_); \ - _locus->parallel_workers = 0; \ + _locus->parallel_workers = (parallel_workers_); \ Assert(cdbpathlocus_is_valid(*_locus)); \ } while (0) #define CdbPathLocus_MakeHashedWorkers(plocus, distkey_, numsegments_, parallel_workers_) \ --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
