This is an automated email from the ASF dual-hosted git repository.

avamingli pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/cloudberry.git

commit d971e822804ebb121ed164fc4c72a862ed33e8f7
Author: Zhang Mingli <[email protected]>
AuthorDate: Tue Mar 24 16:33:27 2026 +0800

    tests: add Parallel Hash Full/Right Join regression cases
    
    cbdb_parallel.sql: add a new test block covering:
      - Parallel Hash Full Join (HashedWorkers FULL JOIN HashedWorkers
        produces HashedOJ with parallel_workers=2)
      - Parallel Hash Right Join (pj_t1 is 3x larger than pj_t2, so the
        planner hashes the smaller pj_t2 and probes with pj_t1; result
        locus HashedWorkers)
      - Correctness checks: count(*) matches serial execution
      - Locus propagation: HashedOJ(parallel) followed by INNER JOIN
        produces HashedOJ; followed by FULL JOIN produces HashedOJ
    
    join_hash.sql/out: CBDB-specific adaptations for the upstream parallel
    full join test -- disable parallel mode for tests that require serial
    plans, fix SAVEPOINT inside a parallel worker context, and update
    expected output to match CBDB plan shapes.
---
 contrib/pax_storage/expected/cbdb_parallel.out    | 183 +++++++----
 src/test/regress/expected/cbdb_parallel.out       | 370 ++++++++++++++++------
 src/test/regress/expected/join_hash.out           |  83 +++--
 src/test/regress/expected/join_hash_optimizer.out | 204 +++++++++---
 src/test/regress/sql/cbdb_parallel.sql            |  50 +++
 src/test/regress/sql/join_hash.sql                |   6 +
 6 files changed, 668 insertions(+), 228 deletions(-)

diff --git a/contrib/pax_storage/expected/cbdb_parallel.out 
b/contrib/pax_storage/expected/cbdb_parallel.out
index db583090026..ec6ceba7e3c 100644
--- a/contrib/pax_storage/expected/cbdb_parallel.out
+++ b/contrib/pax_storage/expected/cbdb_parallel.out
@@ -41,13 +41,29 @@ set gp_appendonly_insert_files = 4;
 begin;
 set local enable_parallel = on;
 create table test_131_ao1(x int, y int) using ao_row with(parallel_workers=2);
+NOTICE:  Table doesn't have 'DISTRIBUTED BY' clause -- Using column named 'x' 
as the Apache Cloudberry data distribution key for this table.
+HINT:  The 'DISTRIBUTED BY' clause determines the distribution of data. Make 
sure column(s) chosen are the optimal data distribution key to minimize skew.
 create table test_131_ao2(x int, y int) using ao_row with(parallel_workers=2);
+NOTICE:  Table doesn't have 'DISTRIBUTED BY' clause -- Using column named 'x' 
as the Apache Cloudberry data distribution key for this table.
+HINT:  The 'DISTRIBUTED BY' clause determines the distribution of data. Make 
sure column(s) chosen are the optimal data distribution key to minimize skew.
 create table test_131_ao3(x int, y int) using ao_row with(parallel_workers=0);
+NOTICE:  Table doesn't have 'DISTRIBUTED BY' clause -- Using column named 'x' 
as the Apache Cloudberry data distribution key for this table.
+HINT:  The 'DISTRIBUTED BY' clause determines the distribution of data. Make 
sure column(s) chosen are the optimal data distribution key to minimize skew.
 create table test_131_ao4(x int, y int) using ao_row with(parallel_workers=0);
+NOTICE:  Table doesn't have 'DISTRIBUTED BY' clause -- Using column named 'x' 
as the Apache Cloudberry data distribution key for this table.
+HINT:  The 'DISTRIBUTED BY' clause determines the distribution of data. Make 
sure column(s) chosen are the optimal data distribution key to minimize skew.
 create table test_131_aoco1(x int, y int) using ao_column 
with(parallel_workers=2);
+NOTICE:  Table doesn't have 'DISTRIBUTED BY' clause -- Using column named 'x' 
as the Apache Cloudberry data distribution key for this table.
+HINT:  The 'DISTRIBUTED BY' clause determines the distribution of data. Make 
sure column(s) chosen are the optimal data distribution key to minimize skew.
 create table test_131_aoco2(x int, y int) using ao_column 
with(parallel_workers=2);
+NOTICE:  Table doesn't have 'DISTRIBUTED BY' clause -- Using column named 'x' 
as the Apache Cloudberry data distribution key for this table.
+HINT:  The 'DISTRIBUTED BY' clause determines the distribution of data. Make 
sure column(s) chosen are the optimal data distribution key to minimize skew.
 create table test_131_aoco3(x int, y int) using ao_column 
with(parallel_workers=0);
+NOTICE:  Table doesn't have 'DISTRIBUTED BY' clause -- Using column named 'x' 
as the Apache Cloudberry data distribution key for this table.
+HINT:  The 'DISTRIBUTED BY' clause determines the distribution of data. Make 
sure column(s) chosen are the optimal data distribution key to minimize skew.
 create table test_131_aoco4(x int, y int) using ao_column 
with(parallel_workers=0);
+NOTICE:  Table doesn't have 'DISTRIBUTED BY' clause -- Using column named 'x' 
as the Apache Cloudberry data distribution key for this table.
+HINT:  The 'DISTRIBUTED BY' clause determines the distribution of data. Make 
sure column(s) chosen are the optimal data distribution key to minimize skew.
 select relname, reloptions from pg_catalog.pg_class where relname like 
'test_131_ao%';
     relname     |      reloptions      
 ----------------+----------------------
@@ -155,8 +171,14 @@ explain(locus, costs off) select count(*) from 
test_131_aoco3, test_131_aoco4 wh
 
 abort;
 create table ao1(x int, y int);
+NOTICE:  Table doesn't have 'DISTRIBUTED BY' clause -- Using column named 'x' 
as the Apache Cloudberry data distribution key for this table.
+HINT:  The 'DISTRIBUTED BY' clause determines the distribution of data. Make 
sure column(s) chosen are the optimal data distribution key to minimize skew.
 create table ao2(x int, y int);
+NOTICE:  Table doesn't have 'DISTRIBUTED BY' clause -- Using column named 'x' 
as the Apache Cloudberry data distribution key for this table.
+HINT:  The 'DISTRIBUTED BY' clause determines the distribution of data. Make 
sure column(s) chosen are the optimal data distribution key to minimize skew.
 create table aocs1(x int, y int);
+NOTICE:  Table doesn't have 'DISTRIBUTED BY' clause -- Using column named 'x' 
as the Apache Cloudberry data distribution key for this table.
+HINT:  The 'DISTRIBUTED BY' clause determines the distribution of data. Make 
sure column(s) chosen are the optimal data distribution key to minimize skew.
 begin;
 -- encourage use of parallel plans
 set local min_parallel_table_scan_size = 0;
@@ -367,6 +389,8 @@ abort;
 begin;
 set local max_parallel_workers_per_gather = 2;
 create table t1(a int, b int) with(parallel_workers=2);
+NOTICE:  Table doesn't have 'DISTRIBUTED BY' clause -- Using column named 'a' 
as the Apache Cloudberry data distribution key for this table.
+HINT:  The 'DISTRIBUTED BY' clause determines the distribution of data. Make 
sure column(s) chosen are the optimal data distribution key to minimize skew.
 create table rt1(a int, b int) with(parallel_workers=2) distributed replicated;
 create table rt2(a int, b int) distributed replicated;
 create table rt3(a int, b int) distributed replicated;
@@ -599,6 +623,8 @@ select * from rt1 join t1 on rt1.a = t1.b join rt2 on rt2.a 
= t1.b;
   5 |  6 |  4 |  5 |  5 |  6
   8 |  9 |  7 |  8 |  8 |  9
   9 | 10 |  8 |  9 |  9 | 10
+  1 |  2 |  1 |  1 |  1 |  2
+  2 |  3 |  1 |  2 |  2 |  3
   5 |  6 |  5 |  5 |  5 |  6
   6 |  7 |  6 |  6 |  6 |  7
   9 | 10 |  9 |  9 |  9 | 10
@@ -606,8 +632,6 @@ select * from rt1 join t1 on rt1.a = t1.b join rt2 on rt2.a 
= t1.b;
   6 |  7 |  5 |  6 |  6 |  7
   7 |  8 |  6 |  7 |  7 |  8
  10 | 11 |  9 | 10 | 10 | 11
-  1 |  2 |  1 |  1 |  1 |  2
-  2 |  3 |  1 |  2 |  2 |  3
 (19 rows)
 
 -- parallel hash join
@@ -650,13 +674,6 @@ explain(locus, costs off) select * from rt1 join t1 on 
rt1.a = t1.b join rt2 on
 select * from rt1 join t1 on rt1.a = t1.b join rt2 on rt2.a = t1.b;
  a  | b  | a  | b  | a  | b  
 ----+----+----+----+----+----
-  5 |  6 |  5 |  5 |  5 |  6
-  6 |  7 |  5 |  6 |  6 |  7
-  6 |  7 |  6 |  6 |  6 |  7
-  7 |  8 |  6 |  7 |  7 |  8
-  9 | 10 |  9 |  9 |  9 | 10
- 10 | 11 |  9 | 10 | 10 | 11
- 10 | 11 | 10 | 10 | 10 | 11
   2 |  3 |  2 |  2 |  2 |  3
   3 |  4 |  2 |  3 |  3 |  4
   3 |  4 |  3 |  3 |  3 |  4
@@ -669,6 +686,13 @@ select * from rt1 join t1 on rt1.a = t1.b join rt2 on 
rt2.a = t1.b;
   9 | 10 |  8 |  9 |  9 | 10
   1 |  2 |  1 |  1 |  1 |  2
   2 |  3 |  1 |  2 |  2 |  3
+  5 |  6 |  5 |  5 |  5 |  6
+  6 |  7 |  5 |  6 |  6 |  7
+  6 |  7 |  6 |  6 |  6 |  7
+  7 |  8 |  6 |  7 |  7 |  8
+  9 | 10 |  9 |  9 |  9 | 10
+ 10 | 11 |  9 | 10 | 10 | 11
+ 10 | 11 | 10 | 10 | 10 | 11
 (19 rows)
 
 --
@@ -702,6 +726,8 @@ explain(locus, costs off) select * from rt1 join t1 on 
rt1.a = t1.b join rt3 on
 select * from rt1 join t1 on rt1.a = t1.b join rt3 on rt3.a = t1.b;
  a  | b  | a  | b  | a  | b  
 ----+----+----+----+----+----
+  1 |  2 |  1 |  1 |  1 |  2
+  2 |  3 |  1 |  2 |  2 |  3
   2 |  3 |  2 |  2 |  2 |  3
   3 |  4 |  3 |  3 |  3 |  4
   4 |  5 |  4 |  4 |  4 |  5
@@ -712,8 +738,6 @@ select * from rt1 join t1 on rt1.a = t1.b join rt3 on rt3.a 
= t1.b;
   5 |  6 |  4 |  5 |  5 |  6
   8 |  9 |  7 |  8 |  8 |  9
   9 | 10 |  8 |  9 |  9 | 10
-  1 |  2 |  1 |  1 |  1 |  2
-  2 |  3 |  1 |  2 |  2 |  3
   5 |  6 |  5 |  5 |  5 |  6
   6 |  7 |  6 |  6 |  6 |  7
   9 | 10 |  9 |  9 |  9 | 10
@@ -779,6 +803,8 @@ select * from rt1 join t1 on rt1.a = t1.b join rt3 on rt3.a 
= t1.b;
 (19 rows)
 
 create table t2(a int, b int) with(parallel_workers=0);
+NOTICE:  Table doesn't have 'DISTRIBUTED BY' clause -- Using column named 'a' 
as the Apache Cloudberry data distribution key for this table.
+HINT:  The 'DISTRIBUTED BY' clause determines the distribution of data. Make 
sure column(s) chosen are the optimal data distribution key to minimize skew.
 create table rt4(a int, b int) with(parallel_workers=2) distributed replicated;
 insert into t2 select i, i+1 from generate_series(1, 10) i;
 insert into rt4 select i, i+1 from generate_series(1, 10000) i;
@@ -788,16 +814,16 @@ set local enable_parallel = off;
 select * from rt4 join t2 using(b);
  b  | a  | a  
 ----+----+----
-  2 |  1 |  1
-  6 |  5 |  5
-  7 |  6 |  6
- 10 |  9 |  9
- 11 | 10 | 10
   3 |  2 |  2
   4 |  3 |  3
   5 |  4 |  4
   8 |  7 |  7
   9 |  8 |  8
+  2 |  1 |  1
+  6 |  5 |  5
+  7 |  6 |  6
+ 10 |  9 |  9
+ 11 | 10 | 10
 (10 rows)
 
 set local enable_parallel = on;
@@ -828,19 +854,21 @@ explain(locus, costs off) select * from rt4 join t2 
using(b);
 select * from rt4 join t2 using(b);
  b  | a  | a  
 ----+----+----
-  2 |  1 |  1
+  6 |  5 |  5
+  7 |  6 |  6
+ 10 |  9 |  9
+ 11 | 10 | 10
   3 |  2 |  2
   4 |  3 |  3
   5 |  4 |  4
   8 |  7 |  7
   9 |  8 |  8
-  6 |  5 |  5
-  7 |  6 |  6
- 10 |  9 |  9
- 11 | 10 | 10
+  2 |  1 |  1
 (10 rows)
 
 create table t3(a int, b int) with(parallel_workers=2);
+NOTICE:  Table doesn't have 'DISTRIBUTED BY' clause -- Using column named 'a' 
as the Apache Cloudberry data distribution key for this table.
+HINT:  The 'DISTRIBUTED BY' clause determines the distribution of data. Make 
sure column(s) chosen are the optimal data distribution key to minimize skew.
 insert into t3 select i, i+1 from generate_series(1, 9000) i;
 analyze t3;
 set local enable_parallel = off;
@@ -919,10 +947,10 @@ explain(locus, costs off) select * from 
t_replica_workers_2 join t_random_worker
 select * from t_replica_workers_2 join t_random_workers_0 using(a);
  a | b | b 
 ---+---+---
- 2 | 3 | 3
- 3 | 4 | 4
  1 | 2 | 2
+ 2 | 3 | 3
  4 | 5 | 5
+ 3 | 4 | 4
  5 | 6 | 6
 (5 rows)
 
@@ -931,11 +959,11 @@ set local enable_parallel=false;
 select * from t_replica_workers_2 join t_random_workers_0 using(a);
  a | b | b 
 ---+---+---
- 2 | 3 | 3
  3 | 4 | 4
- 1 | 2 | 2
- 4 | 5 | 5
  5 | 6 | 6
+ 4 | 5 | 5
+ 1 | 2 | 2
+ 2 | 3 | 3
 (5 rows)
 
 abort;
@@ -976,11 +1004,11 @@ explain(locus, costs off) select * from 
t_replica_workers_2 right join t_random_
 select * from t_replica_workers_2 right join t_random_workers_2 using(a);
  a | b | b 
 ---+---+---
- 5 | 6 | 6
  1 | 2 | 2
  2 | 3 | 3
  3 | 4 | 4
  4 | 5 | 5
+ 5 | 6 | 6
 (5 rows)
 
 -- non parallel results
@@ -1028,14 +1056,14 @@ explain(locus, costs off) select * from 
t_replica_workers_2 join t_random_worker
                      Locus: Strewn
                      Parallel Workers: 2
  Optimizer: Postgres query optimizer
-(16 rows)
+(15 rows)
 
 select * from t_replica_workers_2 join t_random_workers_2 using(a);
  a | b | b 
 ---+---+---
- 2 | 3 | 3
  1 | 2 | 2
  3 | 4 | 4
+ 2 | 3 | 3
  4 | 5 | 5
  5 | 6 | 6
 (5 rows)
@@ -1045,9 +1073,9 @@ set local enable_parallel=false;
 select * from t_replica_workers_2 join t_random_workers_2 using(a);
  a | b | b 
 ---+---+---
- 2 | 3 | 3
  1 | 2 | 2
  3 | 4 | 4
+ 2 | 3 | 3
  4 | 5 | 5
  5 | 6 | 6
 (5 rows)
@@ -1059,7 +1087,11 @@ abort;
 --
 begin;
 create table t1(a int, b int) with(parallel_workers=3);
+NOTICE:  Table doesn't have 'DISTRIBUTED BY' clause -- Using column named 'a' 
as the Apache Cloudberry data distribution key for this table.
+HINT:  The 'DISTRIBUTED BY' clause determines the distribution of data. Make 
sure column(s) chosen are the optimal data distribution key to minimize skew.
 create table t2(b int, a int) with(parallel_workers=2);
+NOTICE:  Table doesn't have 'DISTRIBUTED BY' clause -- Using column named 'b' 
as the Apache Cloudberry data distribution key for this table.
+HINT:  The 'DISTRIBUTED BY' clause determines the distribution of data. Make 
sure column(s) chosen are the optimal data distribution key to minimize skew.
 insert into t1 select i, i+1 from generate_series(1, 10) i;
 insert into t2 select i, i+1 from generate_series(1, 5) i;
 analyze t1;
@@ -1071,17 +1103,17 @@ explain(costs off) select * from t1 right join t2 on 
t1.b = t2.a;
                             QUERY PLAN                            
 ------------------------------------------------------------------
  Gather Motion 9:1  (slice1; segments: 9)
-   ->  Parallel Hash Left Join
-         Hash Cond: (t2.a = t1.b)
-         ->  Redistribute Motion 6:9  (slice2; segments: 6)
-               Hash Key: t2.a
+   ->  Parallel Hash Right Join
+         Hash Cond: (t1.b = t2.a)
+         ->  Redistribute Motion 9:9  (slice2; segments: 9)
+               Hash Key: t1.b
                Hash Module: 3
-               ->  Parallel Seq Scan on t2
+               ->  Parallel Seq Scan on t1
          ->  Parallel Hash
-               ->  Redistribute Motion 9:9  (slice3; segments: 9)
-                     Hash Key: t1.b
+               ->  Redistribute Motion 6:9  (slice3; segments: 6)
+                     Hash Key: t2.a
                      Hash Module: 3
-                     ->  Parallel Seq Scan on t1
+                     ->  Parallel Seq Scan on t2
  Optimizer: Postgres query optimizer
 (13 rows)
 
@@ -1091,7 +1123,11 @@ abort;
 --
 begin;
 create table t1(a int, b int) with(parallel_workers=2);
+NOTICE:  Table doesn't have 'DISTRIBUTED BY' clause -- Using column named 'a' 
as the Apache Cloudberry data distribution key for this table.
+HINT:  The 'DISTRIBUTED BY' clause determines the distribution of data. Make 
sure column(s) chosen are the optimal data distribution key to minimize skew.
 create table t2(a int, b int) with(parallel_workers=2);
+NOTICE:  Table doesn't have 'DISTRIBUTED BY' clause -- Using column named 'a' 
as the Apache Cloudberry data distribution key for this table.
+HINT:  The 'DISTRIBUTED BY' clause determines the distribution of data. Make 
sure column(s) chosen are the optimal data distribution key to minimize skew.
 insert into t1 select i%10, i from generate_series(1, 5) i;
 insert into t1 values (100000);
 insert into t2 select i%10, i from generate_series(1, 100000) i;
@@ -1100,34 +1136,34 @@ analyze t2;
 set local enable_parallel = on;
 -- parallel hash join with shared table, SinglQE as outer partial path.
 explain(locus, costs off) select * from (select count(*) as a from t2) t2 left 
join t1 on t1.a = t2.a;
-                            QUERY PLAN                            
-------------------------------------------------------------------
+                               QUERY PLAN                               
+------------------------------------------------------------------------
  Gather Motion 6:1  (slice1; segments: 6)
    Locus: Entry
-   ->  Parallel Hash Left Join
-         Locus: Hashed
+   ->  Parallel Hash Right Join
+         Locus: HashedWorkers
          Parallel Workers: 2
-         Hash Cond: ((count(*)) = t1.a)
-         ->  Redistribute Motion 1:6  (slice2; segments: 1)
-               Locus: Hashed
+         Hash Cond: (t1.a = (count(*)))
+         ->  Parallel Seq Scan on t1
+               Locus: HashedWorkers
                Parallel Workers: 2
-               Hash Key: (count(*))
-               Hash Module: 3
-               ->  Finalize Aggregate
-                     Locus: SingleQE
-                     ->  Gather Motion 6:1  (slice3; segments: 6)
-                           Locus: SingleQE
-                           ->  Partial Aggregate
-                                 Locus: HashedWorkers
-                                 Parallel Workers: 2
-                                 ->  Parallel Seq Scan on t2
-                                       Locus: HashedWorkers
-                                       Parallel Workers: 2
          ->  Parallel Hash
                Locus: Hashed
-               ->  Parallel Seq Scan on t1
-                     Locus: HashedWorkers
+               ->  Redistribute Motion 1:6  (slice2; segments: 1)
+                     Locus: Hashed
                      Parallel Workers: 2
+                     Hash Key: (count(*))
+                     Hash Module: 3
+                     ->  Finalize Aggregate
+                           Locus: SingleQE
+                           ->  Gather Motion 6:1  (slice3; segments: 6)
+                                 Locus: SingleQE
+                                 ->  Partial Aggregate
+                                       Locus: HashedWorkers
+                                       Parallel Workers: 2
+                                       ->  Parallel Seq Scan on t2
+                                             Locus: HashedWorkers
+                                             Parallel Workers: 2
  Optimizer: Postgres query optimizer
 (27 rows)
 
@@ -1323,12 +1359,18 @@ begin;
 create table rt1(a int, b int) distributed replicated;
 create table rt2(a int, b int) with (parallel_workers = 0) distributed 
replicated;
 create table t1(a int, b int);
+NOTICE:  Table doesn't have 'DISTRIBUTED BY' clause -- Using column named 'a' 
as the Apache Cloudberry data distribution key for this table.
+HINT:  The 'DISTRIBUTED BY' clause determines the distribution of data. Make 
sure column(s) chosen are the optimal data distribution key to minimize skew.
 create table t2(a int, b int) with (parallel_workers = 0);
+NOTICE:  Table doesn't have 'DISTRIBUTED BY' clause -- Using column named 'a' 
as the Apache Cloudberry data distribution key for this table.
+HINT:  The 'DISTRIBUTED BY' clause determines the distribution of data. Make 
sure column(s) chosen are the optimal data distribution key to minimize skew.
 insert into t1 select i, i+1 from generate_series(1, 10000) i;
 insert into t2 select i, i+1 from generate_series(1, 10000) i;
 insert into rt1 select i, i+1 from generate_series(1, 10000) i;
 insert into rt2 select i, i+1 from generate_series(1, 10000) i;
 CREATE TABLE sq1 AS SELECT a, b FROM t1 WHERE gp_segment_id = 0;
+NOTICE:  Table doesn't have 'DISTRIBUTED BY' clause -- Using column(s) named 
'a' as the Apache Cloudberry data distribution key for this table.
+HINT:  The 'DISTRIBUTED BY' clause determines the distribution of data. Make 
sure column(s) chosen are the optimal data distribution key to minimize skew.
 set local optimizer=off;
 set local enable_parallel=on;
 set local min_parallel_table_scan_size to 0;
@@ -1385,7 +1427,7 @@ explain (locus, costs off) select * from rt1 union all 
select * from t1;
          ->  Result
                Locus: Strewn
                Parallel Workers: 2
-               One-Time Filter: (gp_execution_segment() = 1)
+               One-Time Filter: (gp_execution_segment() = 0)
                ->  Parallel Seq Scan on rt1
                      Locus: SegmentGeneralWorkers
                      Parallel Workers: 2
@@ -1409,7 +1451,7 @@ explain (locus, costs off) select * from rt1 union all 
select * from t2;
          ->  Result
                Locus: Strewn
                Parallel Workers: 2
-               One-Time Filter: (gp_execution_segment() = 1)
+               One-Time Filter: (gp_execution_segment() = 0)
                ->  Parallel Seq Scan on rt1
                      Locus: SegmentGeneralWorkers
                      Parallel Workers: 2
@@ -1482,6 +1524,8 @@ abort;
 --
 begin;
 create table t1(c1 int, c2 int) with(parallel_workers=2);
+NOTICE:  Table doesn't have 'DISTRIBUTED BY' clause -- Using column named 'c1' 
as the Apache Cloudberry data distribution key for this table.
+HINT:  The 'DISTRIBUTED BY' clause determines the distribution of data. Make 
sure column(s) chosen are the optimal data distribution key to minimize skew.
 insert into t1 select i, i+1 from generate_series(1, 100000) i;
 analyze t1;
 set local optimizer = off;
@@ -1549,6 +1593,8 @@ abort;
 --
 begin;
 create table t1(c1 int, c2 int) with(parallel_workers=2);
+NOTICE:  Table doesn't have 'DISTRIBUTED BY' clause -- Using column named 'c1' 
as the Apache Cloudberry data distribution key for this table.
+HINT:  The 'DISTRIBUTED BY' clause determines the distribution of data. Make 
sure column(s) chosen are the optimal data distribution key to minimize skew.
 insert into t1 select i, i+1 from generate_series(1, 100000) i;
 analyze t1;
 set local optimizer = off;
@@ -1768,6 +1814,8 @@ set local optimizer = off;
 set local enable_parallel = on;
 -- ao table
 create table ao (a INT, b INT) using ao_row;
+NOTICE:  Table doesn't have 'DISTRIBUTED BY' clause -- Using column named 'a' 
as the Apache Cloudberry data distribution key for this table.
+HINT:  The 'DISTRIBUTED BY' clause determines the distribution of data. Make 
sure column(s) chosen are the optimal data distribution key to minimize skew.
 insert into ao select i as a, i as b from generate_series(1, 100) AS i;
 alter table ao set (parallel_workers = 2);
 explain(costs off) select count(*) from ao;
@@ -1789,6 +1837,8 @@ select count(*) from ao;
 alter table ao reset (parallel_workers);
 -- aocs table
 create table aocs (a INT, b INT) using ao_column;
+NOTICE:  Table doesn't have 'DISTRIBUTED BY' clause -- Using column named 'a' 
as the Apache Cloudberry data distribution key for this table.
+HINT:  The 'DISTRIBUTED BY' clause determines the distribution of data. Make 
sure column(s) chosen are the optimal data distribution key to minimize skew.
 insert into aocs select i as a, i as b from generate_series(1, 100) AS i;
 alter table aocs set (parallel_workers = 2);
 explain(costs off) select count(*) from aocs;
@@ -1862,9 +1912,14 @@ select * from
 abort;
 begin;
 create table pagg_tab (a int, b int, c text, d int) partition by list(c);
+NOTICE:  Table doesn't have 'DISTRIBUTED BY' clause -- Using column named 'a' 
as the Apache Cloudberry data distribution key for this table.
+HINT:  The 'DISTRIBUTED BY' clause determines the distribution of data. Make 
sure column(s) chosen are the optimal data distribution key to minimize skew.
 create table pagg_tab_p1 partition of pagg_tab for values in ('0000', '0001', 
'0002', '0003', '0004');
+NOTICE:  table has parent, setting distribution columns to match parent table
 create table pagg_tab_p2 partition of pagg_tab for values in ('0005', '0006', 
'0007', '0008');
+NOTICE:  table has parent, setting distribution columns to match parent table
 create table pagg_tab_p3 partition of pagg_tab for values in ('0009', '0010', 
'0011');
+NOTICE:  table has parent, setting distribution columns to match parent table
 insert into pagg_tab select i % 20, i % 30, to_char(i % 12, 'FM0000'), i % 30 
from generate_series(0, 2999) i;
 analyze pagg_tab;
 set local enable_parallel to off;
@@ -1939,7 +1994,11 @@ abort;
 --
 begin;
 create table t1(a int, b int) with(parallel_workers=3);
+NOTICE:  Table doesn't have 'DISTRIBUTED BY' clause -- Using column named 'a' 
as the Apache Cloudberry data distribution key for this table.
+HINT:  The 'DISTRIBUTED BY' clause determines the distribution of data. Make 
sure column(s) chosen are the optimal data distribution key to minimize skew.
 create table t2(b int, a int) with(parallel_workers=2);
+NOTICE:  Table doesn't have 'DISTRIBUTED BY' clause -- Using column named 'b' 
as the Apache Cloudberry data distribution key for this table.
+HINT:  The 'DISTRIBUTED BY' clause determines the distribution of data. Make 
sure column(s) chosen are the optimal data distribution key to minimize skew.
 insert into t1 select i, i+1 from generate_series(1, 10) i;
 insert into t2 select i, i+1 from generate_series(1, 5) i;
 analyze t1;
@@ -2329,6 +2388,8 @@ abort;
 -- prepare, execute locus is null
 begin;
 create table t1(c1 int, c2 int);
+NOTICE:  Table doesn't have 'DISTRIBUTED BY' clause -- Using column named 'c1' 
as the Apache Cloudberry data distribution key for this table.
+HINT:  The 'DISTRIBUTED BY' clause determines the distribution of data. Make 
sure column(s) chosen are the optimal data distribution key to minimize skew.
 analyze t1;
 prepare t1_count(integer) as select count(*) from t1;
 explain(locus, costs off) execute t1_count(1);
diff --git a/src/test/regress/expected/cbdb_parallel.out 
b/src/test/regress/expected/cbdb_parallel.out
index 35e90eebfa1..af975de50f4 100644
--- a/src/test/regress/expected/cbdb_parallel.out
+++ b/src/test/regress/expected/cbdb_parallel.out
@@ -112,8 +112,8 @@ set local enable_parallel_dedup_semi_reverse_join = on;
 set local enable_parallel_dedup_semi_join = on;
 explain (costs off)
 select sum(foo.a) from foo where exists (select 1 from bar where foo.a = 
bar.b);
-                                           QUERY PLAN                          
                 
-------------------------------------------------------------------------------------------------
+                                          QUERY PLAN                           
                
+-----------------------------------------------------------------------------------------------
  Finalize Aggregate
    ->  Gather Motion 6:1  (slice1; segments: 6)
          ->  Partial Aggregate
@@ -1032,6 +1032,15 @@ explain(locus, costs off) select * from rt1 join t1 on 
rt1.a = t1.b join rt2 on
 select * from rt1 join t1 on rt1.a = t1.b join rt2 on rt2.a = t1.b;
  a  | b  | a  | b  | a  | b  
 ----+----+----+----+----+----
+  1 |  2 |  1 |  1 |  1 |  2
+  2 |  3 |  1 |  2 |  2 |  3
+  5 |  6 |  5 |  5 |  5 |  6
+  6 |  7 |  6 |  6 |  6 |  7
+  9 | 10 |  9 |  9 |  9 | 10
+ 10 | 11 | 10 | 10 | 10 | 11
+  6 |  7 |  5 |  6 |  6 |  7
+  7 |  8 |  6 |  7 |  7 |  8
+ 10 | 11 |  9 | 10 | 10 | 11
   2 |  3 |  2 |  2 |  2 |  3
   3 |  4 |  3 |  3 |  3 |  4
   4 |  5 |  4 |  4 |  4 |  5
@@ -1042,15 +1051,6 @@ select * from rt1 join t1 on rt1.a = t1.b join rt2 on 
rt2.a = t1.b;
   5 |  6 |  4 |  5 |  5 |  6
   8 |  9 |  7 |  8 |  8 |  9
   9 | 10 |  8 |  9 |  9 | 10
-  5 |  6 |  5 |  5 |  5 |  6
-  6 |  7 |  6 |  6 |  6 |  7
-  9 | 10 |  9 |  9 |  9 | 10
- 10 | 11 | 10 | 10 | 10 | 11
-  6 |  7 |  5 |  6 |  6 |  7
-  7 |  8 |  6 |  7 |  7 |  8
- 10 | 11 |  9 | 10 | 10 | 11
-  1 |  2 |  1 |  1 |  1 |  2
-  2 |  3 |  1 |  2 |  2 |  3
 (19 rows)
 
 -- parallel hash join
@@ -1093,13 +1093,8 @@ explain(locus, costs off) select * from rt1 join t1 on 
rt1.a = t1.b join rt2 on
 select * from rt1 join t1 on rt1.a = t1.b join rt2 on rt2.a = t1.b;
  a  | b  | a  | b  | a  | b  
 ----+----+----+----+----+----
-  5 |  6 |  5 |  5 |  5 |  6
-  6 |  7 |  5 |  6 |  6 |  7
-  6 |  7 |  6 |  6 |  6 |  7
-  7 |  8 |  6 |  7 |  7 |  8
-  9 | 10 |  9 |  9 |  9 | 10
- 10 | 11 |  9 | 10 | 10 | 11
- 10 | 11 | 10 | 10 | 10 | 11
+  1 |  2 |  1 |  1 |  1 |  2
+  2 |  3 |  1 |  2 |  2 |  3
   2 |  3 |  2 |  2 |  2 |  3
   3 |  4 |  2 |  3 |  3 |  4
   3 |  4 |  3 |  3 |  3 |  4
@@ -1110,8 +1105,13 @@ select * from rt1 join t1 on rt1.a = t1.b join rt2 on 
rt2.a = t1.b;
   8 |  9 |  7 |  8 |  8 |  9
   8 |  9 |  8 |  8 |  8 |  9
   9 | 10 |  8 |  9 |  9 | 10
-  1 |  2 |  1 |  1 |  1 |  2
-  2 |  3 |  1 |  2 |  2 |  3
+  5 |  6 |  5 |  5 |  5 |  6
+  6 |  7 |  5 |  6 |  6 |  7
+  6 |  7 |  6 |  6 |  6 |  7
+  7 |  8 |  6 |  7 |  7 |  8
+  9 | 10 |  9 |  9 |  9 | 10
+ 10 | 11 |  9 | 10 | 10 | 11
+ 10 | 11 | 10 | 10 | 10 | 11
 (19 rows)
 
 --
@@ -1145,6 +1145,8 @@ explain(locus, costs off) select * from rt1 join t1 on 
rt1.a = t1.b join rt3 on
 select * from rt1 join t1 on rt1.a = t1.b join rt3 on rt3.a = t1.b;
  a  | b  | a  | b  | a  | b  
 ----+----+----+----+----+----
+  1 |  2 |  1 |  1 |  1 |  2
+  2 |  3 |  1 |  2 |  2 |  3
   2 |  3 |  2 |  2 |  2 |  3
   3 |  4 |  3 |  3 |  3 |  4
   4 |  5 |  4 |  4 |  4 |  5
@@ -1155,8 +1157,6 @@ select * from rt1 join t1 on rt1.a = t1.b join rt3 on 
rt3.a = t1.b;
   5 |  6 |  4 |  5 |  5 |  6
   8 |  9 |  7 |  8 |  8 |  9
   9 | 10 |  8 |  9 |  9 | 10
-  1 |  2 |  1 |  1 |  1 |  2
-  2 |  3 |  1 |  2 |  2 |  3
   5 |  6 |  5 |  5 |  5 |  6
   6 |  7 |  6 |  6 |  6 |  7
   9 | 10 |  9 |  9 |  9 | 10
@@ -1201,14 +1201,11 @@ select * from rt1 join t1 on rt1.a = t1.b join rt3 on 
rt3.a = t1.b;
  a  | b  | a  | b  | a  | b  
 ----+----+----+----+----+----
   1 |  2 |  1 |  1 |  1 |  2
-  2 |  3 |  1 |  2 |  2 |  3
   5 |  6 |  5 |  5 |  5 |  6
   6 |  7 |  6 |  6 |  6 |  7
   9 | 10 |  9 |  9 |  9 | 10
  10 | 11 | 10 | 10 | 10 | 11
-  6 |  7 |  5 |  6 |  6 |  7
-  7 |  8 |  6 |  7 |  7 |  8
- 10 | 11 |  9 | 10 | 10 | 11
+  2 |  3 |  1 |  2 |  2 |  3
   2 |  3 |  2 |  2 |  2 |  3
   3 |  4 |  3 |  3 |  3 |  4
   4 |  5 |  4 |  4 |  4 |  5
@@ -1219,6 +1216,9 @@ select * from rt1 join t1 on rt1.a = t1.b join rt3 on 
rt3.a = t1.b;
   5 |  6 |  4 |  5 |  5 |  6
   8 |  9 |  7 |  8 |  8 |  9
   9 | 10 |  8 |  9 |  9 | 10
+  6 |  7 |  5 |  6 |  6 |  7
+  7 |  8 |  6 |  7 |  7 |  8
+ 10 | 11 |  9 | 10 | 10 | 11
 (19 rows)
 
 create table t2(a int, b int) with(parallel_workers=0);
@@ -1271,12 +1271,12 @@ explain(locus, costs off) select * from rt4 join t2 
using(b);
 select * from rt4 join t2 using(b);
  b  | a  | a  
 ----+----+----
-  2 |  1 |  1
   3 |  2 |  2
   4 |  3 |  3
   5 |  4 |  4
   8 |  7 |  7
   9 |  8 |  8
+  2 |  1 |  1
   6 |  5 |  5
   7 |  6 |  6
  10 |  9 |  9
@@ -1362,9 +1362,9 @@ explain(locus, costs off) select * from 
t_replica_workers_2 join t_random_worker
 select * from t_replica_workers_2 join t_random_workers_0 using(a);
  a | b | b 
 ---+---+---
- 2 | 3 | 3
- 3 | 4 | 4
  1 | 2 | 2
+ 3 | 4 | 4
+ 2 | 3 | 3
  4 | 5 | 5
  5 | 6 | 6
 (5 rows)
@@ -1374,9 +1374,9 @@ set local enable_parallel=false;
 select * from t_replica_workers_2 join t_random_workers_0 using(a);
  a | b | b 
 ---+---+---
- 2 | 3 | 3
- 3 | 4 | 4
  1 | 2 | 2
+ 3 | 4 | 4
+ 2 | 3 | 3
  4 | 5 | 5
  5 | 6 | 6
 (5 rows)
@@ -1419,9 +1419,9 @@ explain(locus, costs off) select * from 
t_replica_workers_2 right join t_random_
 select * from t_replica_workers_2 right join t_random_workers_2 using(a);
  a | b | b 
 ---+---+---
+ 2 | 3 | 3
  5 | 6 | 6
  1 | 2 | 2
- 2 | 3 | 3
  3 | 4 | 4
  4 | 5 | 5
 (5 rows)
@@ -1431,11 +1431,11 @@ set local enable_parallel=false;
 select * from t_replica_workers_2 right join t_random_workers_2 using(a);
  a | b | b 
 ---+---+---
+ 5 | 6 | 6
  1 | 2 | 2
- 2 | 3 | 3
  3 | 4 | 4
  4 | 5 | 5
- 5 | 6 | 6
+ 2 | 3 | 3
 (5 rows)
 
 abort;
@@ -1471,13 +1471,13 @@ explain(locus, costs off) select * from 
t_replica_workers_2 join t_random_worker
                      Locus: Strewn
                      Parallel Workers: 2
  Optimizer: Postgres query optimizer
-(16 rows)
+(15 rows)
 
 select * from t_replica_workers_2 join t_random_workers_2 using(a);
  a | b | b 
 ---+---+---
- 2 | 3 | 3
  1 | 2 | 2
+ 2 | 3 | 3
  3 | 4 | 4
  4 | 5 | 5
  5 | 6 | 6
@@ -1488,11 +1488,11 @@ set local enable_parallel=false;
 select * from t_replica_workers_2 join t_random_workers_2 using(a);
  a | b | b 
 ---+---+---
- 2 | 3 | 3
- 1 | 2 | 2
  3 | 4 | 4
  4 | 5 | 5
  5 | 6 | 6
+ 1 | 2 | 2
+ 2 | 3 | 3
 (5 rows)
 
 abort;
@@ -1510,28 +1510,28 @@ analyze t1;
 analyze rt1;
 set local enable_parallel = on;
 explain(locus, costs off) select * from (select count(*) as a from t1) t1 left 
join rt1  on rt1.a = t1.a;
-                      QUERY PLAN                      
-------------------------------------------------------
- Parallel Hash Left Join
+                         QUERY PLAN                         
+------------------------------------------------------------
+ Parallel Hash Right Join
    Locus: Entry
-   Hash Cond: ((count(*)) = rt1.a)
-   ->  Finalize Aggregate
+   Hash Cond: (rt1.a = (count(*)))
+   ->  Gather Motion 2:1  (slice1; segments: 2)
          Locus: Entry
-         ->  Gather Motion 6:1  (slice1; segments: 6)
-               Locus: Entry
-               ->  Partial Aggregate
-                     Locus: HashedWorkers
-                     Parallel Workers: 2
-                     ->  Parallel Seq Scan on t1
-                           Locus: HashedWorkers
-                           Parallel Workers: 2
+         ->  Parallel Seq Scan on rt1
+               Locus: SegmentGeneralWorkers
+               Parallel Workers: 2
    ->  Parallel Hash
          Locus: Entry
-         ->  Gather Motion 2:1  (slice2; segments: 2)
+         ->  Finalize Aggregate
                Locus: Entry
-               ->  Parallel Seq Scan on rt1
-                     Locus: SegmentGeneralWorkers
-                     Parallel Workers: 2
+               ->  Gather Motion 6:1  (slice2; segments: 6)
+                     Locus: Entry
+                     ->  Partial Aggregate
+                           Locus: HashedWorkers
+                           Parallel Workers: 2
+                           ->  Parallel Seq Scan on t1
+                                 Locus: HashedWorkers
+                                 Parallel Workers: 2
  Optimizer: Postgres query optimizer
 (21 rows)
 
@@ -1661,17 +1661,17 @@ explain(costs off) select * from t1 right join t2 on 
t1.b = t2.a;
                             QUERY PLAN                            
 ------------------------------------------------------------------
  Gather Motion 9:1  (slice1; segments: 9)
-   ->  Parallel Hash Left Join
-         Hash Cond: (t2.a = t1.b)
-         ->  Redistribute Motion 6:9  (slice2; segments: 6)
-               Hash Key: t2.a
+   ->  Parallel Hash Right Join
+         Hash Cond: (t1.b = t2.a)
+         ->  Redistribute Motion 9:9  (slice2; segments: 9)
+               Hash Key: t1.b
                Hash Module: 3
-               ->  Parallel Seq Scan on t2
+               ->  Parallel Seq Scan on t1
          ->  Parallel Hash
-               ->  Redistribute Motion 9:9  (slice3; segments: 9)
-                     Hash Key: t1.b
+               ->  Redistribute Motion 6:9  (slice3; segments: 6)
+                     Hash Key: t2.a
                      Hash Module: 3
-                     ->  Parallel Seq Scan on t1
+                     ->  Parallel Seq Scan on t2
  Optimizer: Postgres query optimizer
 (13 rows)
 
@@ -1690,34 +1690,34 @@ analyze t2;
 set local enable_parallel = on;
 -- parallel hash join with shared table, SinglQE as outer partial path.
 explain(locus, costs off) select * from (select count(*) as a from t2) t2 left 
join t1 on t1.a = t2.a;
-                            QUERY PLAN                            
-------------------------------------------------------------------
+                               QUERY PLAN                               
+------------------------------------------------------------------------
  Gather Motion 6:1  (slice1; segments: 6)
    Locus: Entry
-   ->  Parallel Hash Left Join
-         Locus: Hashed
+   ->  Parallel Hash Right Join
+         Locus: HashedWorkers
          Parallel Workers: 2
-         Hash Cond: ((count(*)) = t1.a)
-         ->  Redistribute Motion 1:6  (slice2; segments: 1)
-               Locus: Hashed
+         Hash Cond: (t1.a = (count(*)))
+         ->  Parallel Seq Scan on t1
+               Locus: HashedWorkers
                Parallel Workers: 2
-               Hash Key: (count(*))
-               Hash Module: 3
-               ->  Finalize Aggregate
-                     Locus: SingleQE
-                     ->  Gather Motion 6:1  (slice3; segments: 6)
-                           Locus: SingleQE
-                           ->  Partial Aggregate
-                                 Locus: HashedWorkers
-                                 Parallel Workers: 2
-                                 ->  Parallel Seq Scan on t2
-                                       Locus: HashedWorkers
-                                       Parallel Workers: 2
          ->  Parallel Hash
                Locus: Hashed
-               ->  Parallel Seq Scan on t1
-                     Locus: HashedWorkers
+               ->  Redistribute Motion 1:6  (slice2; segments: 1)
+                     Locus: Hashed
                      Parallel Workers: 2
+                     Hash Key: (count(*))
+                     Hash Module: 3
+                     ->  Finalize Aggregate
+                           Locus: SingleQE
+                           ->  Gather Motion 6:1  (slice3; segments: 6)
+                                 Locus: SingleQE
+                                 ->  Partial Aggregate
+                                       Locus: HashedWorkers
+                                       Parallel Workers: 2
+                                       ->  Parallel Seq Scan on t2
+                                             Locus: HashedWorkers
+                                             Parallel Workers: 2
  Optimizer: Postgres query optimizer
 (27 rows)
 
@@ -1975,7 +1975,7 @@ explain (locus, costs off) select * from rt1 union all 
select * from t1;
          ->  Result
                Locus: Strewn
                Parallel Workers: 3
-               One-Time Filter: (gp_execution_segment() = 0)
+               One-Time Filter: (gp_execution_segment() = 1)
                ->  Parallel Seq Scan on rt1
                      Locus: SegmentGeneralWorkers
                      Parallel Workers: 3
@@ -1999,7 +1999,7 @@ explain (locus, costs off) select * from rt1 union all 
select * from t2;
          ->  Result
                Locus: Strewn
                Parallel Workers: 3
-               One-Time Filter: (gp_execution_segment() = 0)
+               One-Time Filter: (gp_execution_segment() = 1)
                ->  Parallel Seq Scan on rt1
                      Locus: SegmentGeneralWorkers
                      Parallel Workers: 3
@@ -2296,8 +2296,8 @@ analyze t1;
 analyze t2;
 analyze t3_null;
 explain(costs off) select sum(t1.c1) from t1 where c1 not in (select c2 from 
t2);
-                                     QUERY PLAN                                
     
-------------------------------------------------------------------------------------
+                                    QUERY PLAN                                 
    
+-----------------------------------------------------------------------------------
  Finalize Aggregate
    ->  Gather Motion 6:1  (slice1; segments: 6)
          ->  Partial Aggregate
@@ -2317,8 +2317,8 @@ select sum(t1.c1) from t1 where c1 not in (select c2 from 
t2);
 (1 row)
 
 explain(costs off) select * from t1 where c1 not in (select c2 from t3_null);
-                               QUERY PLAN                               
-------------------------------------------------------------------------
+                              QUERY PLAN                               
+-----------------------------------------------------------------------
  Gather Motion 6:1  (slice1; segments: 6)
    ->  Parallel Hash Left Anti Semi (Not-In) Join
          Hash Cond: (t1.c1 = t3_null.c2)
@@ -2457,8 +2457,11 @@ abort;
 begin;
 create table pagg_tab (a int, b int, c text, d int) partition by list(c);
 create table pagg_tab_p1 partition of pagg_tab for values in ('0000', '0001', 
'0002', '0003', '0004');
+NOTICE:  table has parent, setting distribution columns to match parent table
 create table pagg_tab_p2 partition of pagg_tab for values in ('0005', '0006', 
'0007', '0008');
+NOTICE:  table has parent, setting distribution columns to match parent table
 create table pagg_tab_p3 partition of pagg_tab for values in ('0009', '0010', 
'0011');
+NOTICE:  table has parent, setting distribution columns to match parent table
 insert into pagg_tab select i % 20, i % 30, to_char(i % 12, 'FM0000'), i % 30 
from generate_series(0, 2999) i;
 analyze pagg_tab;
 set local enable_parallel to off;
@@ -2972,7 +2975,7 @@ create table t2_anti(a int, b int) 
with(parallel_workers=2) distributed by (b);
 insert into t2_anti values(generate_series(5, 10));
 explain(costs off, verbose)
 select t1_anti.a, t1_anti.b from t1_anti left join t2_anti on t1_anti.a = 
t2_anti.a where t2_anti.a is null;
-                            QUERY PLAN                            
+                            QUERY PLAN
 ------------------------------------------------------------------
  Gather Motion 3:1  (slice1; segments: 3)
    Output: t1_anti.a, t1_anti.b
@@ -3068,8 +3071,8 @@ select t1_anti.a, t1_anti.b from t1_anti left join 
t2_anti on t1_anti.a = t2_ant
 ---+---
  3 |  
  4 |  
- 1 |  
  2 |  
+ 1 |  
 (4 rows)
 
 abort;
@@ -3098,7 +3101,7 @@ insert into t_distinct_0 select * from t_distinct_0;
 analyze t_distinct_0;
 explain(costs off)
 select distinct a from t_distinct_0;
-                         QUERY PLAN                         
+                         QUERY PLAN
 ------------------------------------------------------------
  Gather Motion 3:1  (slice1; segments: 3)
    ->  HashAggregate
@@ -3232,8 +3235,6 @@ select distinct a, b from t_distinct_0;
 drop table if exists t_distinct_1;
 NOTICE:  table "t_distinct_1" does not exist, skipping
 create table t_distinct_1(a int, b int) using ao_column;
-NOTICE:  Table doesn't have 'DISTRIBUTED BY' clause -- Using column named 'a' 
as the Apache Cloudberry data distribution key for this table.
-HINT:  The 'DISTRIBUTED BY' clause determines the distribution of data. Make 
sure column(s) chosen are the optimal data distribution key to minimize skew.
 insert into t_distinct_1 select * from t_distinct_0;
 analyze t_distinct_1;
 set enable_parallel = off;
@@ -3520,10 +3521,7 @@ WHERE e.salary > (
 -- Test https://github.com/apache/cloudberry/issues/1376
 --
 create table t1(a int, b int);
-NOTICE:  Table doesn't have 'DISTRIBUTED BY' clause -- Using column named 'a' 
as the Apache Cloudberry data distribution key for this table.
-HINT:  The 'DISTRIBUTED BY' clause determines the distribution of data. Make 
sure column(s) chosen are the optimal data distribution key to minimize skew.
 create table t2 (like t1);
-NOTICE:  table doesn't have 'DISTRIBUTED BY' clause, defaulting to 
distribution columns from LIKE table
 set gp_cte_sharing = on;
 explain(locus, costs off) with x as
   (select a, count(*) as b from t1 group by a union all
@@ -3571,8 +3569,184 @@ explain(locus, costs off) with x as
 reset gp_cte_sharing;
 reset enable_parallel;
 reset min_parallel_table_scan_size;
+--
+-- Parallel Hash Full/Right Join
+--
+begin;
+create table pj_t1(id int, v int) with(parallel_workers=2) distributed by (id);
+create table pj_t2(id int, v int) with(parallel_workers=2) distributed by (id);
+create table pj_t3(id int, v int) with(parallel_workers=0) distributed by (id);
+-- pj_t1 is 3x larger than pj_t2 so the planner hashes the smaller pj_t2
+-- and probes with pj_t1, producing a genuine Parallel Hash Right Join plan.
+insert into pj_t1 select i, i from generate_series(1,30000)i;
+insert into pj_t2 select i, i from generate_series(25001,35000)i;
+insert into pj_t3 select i, i from generate_series(1,10000)i;
+analyze pj_t1;
+analyze pj_t2;
+analyze pj_t3;
+set local enable_parallel = on;
+set local min_parallel_table_scan_size = 0;
+-- 12_P_12_10: Parallel Hash Full Join: HashedWorkers FULL JOIN HashedWorkers 
-> HashedOJ(parallel)
+explain(costs off, locus)
+select count(*) from pj_t1 full join pj_t2 using (id);
+                        QUERY PLAN                        
+----------------------------------------------------------
+ Finalize Aggregate
+   Locus: Entry
+   ->  Gather Motion 6:1  (slice1; segments: 6)
+         Locus: Entry
+         ->  Partial Aggregate
+               Locus: HashedOJ
+               Parallel Workers: 2
+               ->  Parallel Hash Full Join
+                     Locus: HashedOJ
+                     Parallel Workers: 2
+                     Hash Cond: (pj_t1.id = pj_t2.id)
+                     ->  Parallel Seq Scan on pj_t1
+                           Locus: HashedWorkers
+                           Parallel Workers: 2
+                     ->  Parallel Hash
+                           Locus: Hashed
+                           ->  Parallel Seq Scan on pj_t2
+                                 Locus: HashedWorkers
+                                 Parallel Workers: 2
+ Optimizer: Postgres query optimizer
+(20 rows)
+
+-- correctness: parallel result matches non-parallel
+set local enable_parallel = off;
+select count(*) from pj_t1 full join pj_t2 using (id);
+ count 
+-------
+ 35000
+(1 row)
+
+set local enable_parallel = on;
+select count(*) from pj_t1 full join pj_t2 using (id);
+ count 
+-------
+ 35000
+(1 row)
+
+-- Parallel Hash Right Join: pj_t1 (30K) is larger, so the planner hashes the 
smaller pj_t2
+-- (10K) as the build side and probes with pj_t1; result locus 
HashedWorkers(parallel)
+explain(costs off, locus)
+select count(*) from pj_t1 right join pj_t2 using (id);
+                        QUERY PLAN                        
+----------------------------------------------------------
+ Finalize Aggregate
+   Locus: Entry
+   ->  Gather Motion 6:1  (slice1; segments: 6)
+         Locus: Entry
+         ->  Partial Aggregate
+               Locus: HashedWorkers
+               Parallel Workers: 2
+               ->  Parallel Hash Right Join
+                     Locus: HashedWorkers
+                     Parallel Workers: 2
+                     Hash Cond: (pj_t1.id = pj_t2.id)
+                     ->  Parallel Seq Scan on pj_t1
+                           Locus: HashedWorkers
+                           Parallel Workers: 2
+                     ->  Parallel Hash
+                           Locus: Hashed
+                           ->  Parallel Seq Scan on pj_t2
+                                 Locus: HashedWorkers
+                                 Parallel Workers: 2
+ Optimizer: Postgres query optimizer
+(20 rows)
+
+-- correctness: parallel result matches non-parallel
+set local enable_parallel = off;
+select count(*) from pj_t1 right join pj_t2 using (id);
+ count 
+-------
+ 10000
+(1 row)
+
+set local enable_parallel = on;
+select count(*) from pj_t1 right join pj_t2 using (id);
+ count 
+-------
+ 10000
+(1 row)
+
+-- Locus propagation: HashedOJ(parallel) followed by INNER JOIN with 
Hashed(serial)
+-- The full join result (HashedOJ,parallel=2) is joined with pj_t3 
(Hashed,serial)
+explain(costs off, locus)
+select count(*) from (pj_t1 full join pj_t2 using (id)) fj inner join pj_t3 
using (id);
+                                QUERY PLAN                                 
+---------------------------------------------------------------------------
+ Finalize Aggregate
+   Locus: Entry
+   ->  Gather Motion 3:1  (slice1; segments: 3)
+         Locus: Entry
+         ->  Partial Aggregate
+               Locus: HashedOJ
+               ->  Hash Join
+                     Locus: HashedOJ
+                     Hash Cond: (COALESCE(pj_t1.id, pj_t2.id) = pj_t3.id)
+                     ->  Hash Full Join
+                           Locus: HashedOJ
+                           Hash Cond: (pj_t1.id = pj_t2.id)
+                           ->  Seq Scan on pj_t1
+                                 Locus: Hashed
+                           ->  Hash
+                                 Locus: Hashed
+                                 ->  Seq Scan on pj_t2
+                                       Locus: Hashed
+                     ->  Hash
+                           Locus: Replicated
+                           ->  Broadcast Motion 3:3  (slice2; segments: 3)
+                                 Locus: Replicated
+                                 ->  Seq Scan on pj_t3
+                                       Locus: Hashed
+ Optimizer: Postgres query optimizer
+(25 rows)
+
+-- Locus propagation: HashedOJ(parallel) followed by FULL JOIN with 
Hashed(serial)
+explain(costs off, locus)
+select count(*) from (pj_t1 full join pj_t2 using (id)) fj full join pj_t3 
using (id);
+                                QUERY PLAN                                
+--------------------------------------------------------------------------
+ Finalize Aggregate
+   Locus: Entry
+   ->  Gather Motion 3:1  (slice1; segments: 3)
+         Locus: Entry
+         ->  Partial Aggregate
+               Locus: HashedOJ
+               ->  Hash Full Join
+                     Locus: HashedOJ
+                     Hash Cond: (COALESCE(pj_t1.id, pj_t2.id) = pj_t3.id)
+                     ->  Redistribute Motion 3:3  (slice2; segments: 3)
+                           Locus: Hashed
+                           Hash Key: COALESCE(pj_t1.id, pj_t2.id)
+                           ->  Hash Full Join
+                                 Locus: HashedOJ
+                                 Hash Cond: (pj_t1.id = pj_t2.id)
+                                 ->  Seq Scan on pj_t1
+                                       Locus: Hashed
+                                 ->  Hash
+                                       Locus: Hashed
+                                       ->  Seq Scan on pj_t2
+                                             Locus: Hashed
+                     ->  Hash
+                           Locus: Hashed
+                           ->  Seq Scan on pj_t3
+                                 Locus: Hashed
+ Optimizer: Postgres query optimizer
+(26 rows)
+
+abort;
 -- start_ignore
 drop schema test_parallel cascade;
+NOTICE:  drop cascades to 6 other objects
+DETAIL:  drop cascades to table t_distinct_0
+drop cascades to table t_distinct_1
+drop cascades to table departments
+drop cascades to table employees
+drop cascades to table t1
+drop cascades to table t2
 -- end_ignore
 reset gp_appendonly_insert_files;
 reset force_parallel_mode;
diff --git a/src/test/regress/expected/join_hash.out 
b/src/test/regress/expected/join_hash.out
index 28f4558ec04..e5f74c18d28 100644
--- a/src/test/regress/expected/join_hash.out
+++ b/src/test/regress/expected/join_hash.out
@@ -10,6 +10,9 @@ set allow_system_table_mods=on;
 set local min_parallel_table_scan_size = 0;
 set local parallel_setup_cost = 0;
 set local enable_hashjoin = on;
+-- CBDB: disable CBDB parallel for these PG-originated tests; parallel full 
join
+-- is tested separately in cbdb_parallel.sql.
+set local enable_parallel = off;
 -- Extract bucket and batch counts from an explain analyze plan.  In
 -- general we can't make assertions about how many batches (or
 -- buckets) will be required because it can vary, but we can in some
@@ -58,12 +61,16 @@ $$;
 -- estimated size.
 create table simple as
   select generate_series(1, 60000) AS id, 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa';
+NOTICE:  Table doesn't have 'DISTRIBUTED BY' clause -- Using column(s) named 
'id' as the Apache Cloudberry data distribution key for this table.
+HINT:  The 'DISTRIBUTED BY' clause determines the distribution of data. Make 
sure column(s) chosen are the optimal data distribution key to minimize skew.
 alter table simple set (parallel_workers = 2);
 analyze simple;
 -- Make a relation whose size we will under-estimate.  We want stats
 -- to say 1000 rows, but actually there are 20,000 rows.
 create table bigger_than_it_looks as
   select generate_series(1, 60000) as id, 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa';
+NOTICE:  Table doesn't have 'DISTRIBUTED BY' clause -- Using column(s) named 
'id' as the Apache Cloudberry data distribution key for this table.
+HINT:  The 'DISTRIBUTED BY' clause determines the distribution of data. Make 
sure column(s) chosen are the optimal data distribution key to minimize skew.
 alter table bigger_than_it_looks set (autovacuum_enabled = 'false');
 WARNING:  autovacuum is not supported in Cloudberry
 alter table bigger_than_it_looks set (parallel_workers = 2);
@@ -73,6 +80,8 @@ update pg_class set reltuples = 1000 where relname = 
'bigger_than_it_looks';
 -- kind of skew that breaks our batching scheme.  We want stats to say
 -- 2 rows, but actually there are 20,000 rows with the same key.
 create table extremely_skewed (id int, t text);
+NOTICE:  Table doesn't have 'DISTRIBUTED BY' clause -- Using column named 'id' 
as the Apache Cloudberry data distribution key for this table.
+HINT:  The 'DISTRIBUTED BY' clause determines the distribution of data. Make 
sure column(s) chosen are the optimal data distribution key to minimize skew.
 alter table extremely_skewed set (autovacuum_enabled = 'false');
 WARNING:  autovacuum is not supported in Cloudberry
 alter table extremely_skewed set (parallel_workers = 2);
@@ -85,6 +94,8 @@ update pg_class
   where relname = 'extremely_skewed';
 -- Make a relation with a couple of enormous tuples.
 create table wide as select generate_series(1, 2) as id, rpad('', 320000, 'x') 
as t;
+NOTICE:  Table doesn't have 'DISTRIBUTED BY' clause -- Using column(s) named 
'id' as the Apache Cloudberry data distribution key for this table.
+HINT:  The 'DISTRIBUTED BY' clause determines the distribution of data. Make 
sure column(s) chosen are the optimal data distribution key to minimize skew.
 alter table wide set (parallel_workers = 2);
 ANALYZE wide;
 -- The "optimal" case: the hash table fits in memory; we plan for 1
@@ -319,7 +330,7 @@ $$);
 select count(*) from simple r full outer join simple s using (id);
  count 
 -------
- 20000
+ 60000
 (1 row)
 
 rollback to settings;
@@ -574,9 +585,13 @@ rollback to settings;
 -- Exercise rescans.  We'll turn off parallel_leader_participation so
 -- that we can check that instrumentation comes back correctly.
 create table join_foo as select generate_series(1, 3) as id, 'xxxxx'::text as 
t;
+NOTICE:  Table doesn't have 'DISTRIBUTED BY' clause -- Using column(s) named 
'id' as the Apache Cloudberry data distribution key for this table.
+HINT:  The 'DISTRIBUTED BY' clause determines the distribution of data. Make 
sure column(s) chosen are the optimal data distribution key to minimize skew.
 analyze join_foo;
 alter table join_foo set (parallel_workers = 0);
 create table join_bar as select generate_series(1, 20000) as id, 'xxxxx'::text 
as t;
+NOTICE:  Table doesn't have 'DISTRIBUTED BY' clause -- Using column(s) named 
'id' as the Apache Cloudberry data distribution key for this table.
+HINT:  The 'DISTRIBUTED BY' clause determines the distribution of data. Make 
sure column(s) chosen are the optimal data distribution key to minimize skew.
 analyze join_bar;
 alter table join_bar set (parallel_workers = 2);
 -- multi-batch with rescan, parallel-oblivious
@@ -854,23 +869,23 @@ savepoint settings;
 set local max_parallel_workers_per_gather = 2;
 explain (costs off)
      select  count(*) from simple r full outer join simple s using (id);
-                         QUERY PLAN                          
--------------------------------------------------------------
+                     QUERY PLAN                     
+----------------------------------------------------
  Finalize Aggregate
-   ->  Gather
-         Workers Planned: 2
+   ->  Gather Motion 3:1  (slice1; segments: 3)
          ->  Partial Aggregate
-               ->  Parallel Hash Full Join
+               ->  Hash Full Join
                      Hash Cond: (r.id = s.id)
-                     ->  Parallel Seq Scan on simple r
-                     ->  Parallel Hash
-                           ->  Parallel Seq Scan on simple s
+                     ->  Seq Scan on simple r
+                     ->  Hash
+                           ->  Seq Scan on simple s
+ Optimizer: Postgres query optimizer
 (9 rows)
 
 select  count(*) from simple r full outer join simple s using (id);
  count 
 -------
- 20000
+ 60000
 (1 row)
 
 rollback to settings;
@@ -935,23 +950,25 @@ savepoint settings;
 set local max_parallel_workers_per_gather = 2;
 explain (costs off)
      select  count(*) from simple r full outer join simple s on (r.id = 0 - 
s.id);
-                         QUERY PLAN                          
--------------------------------------------------------------
+                               QUERY PLAN                               
+------------------------------------------------------------------------
  Finalize Aggregate
-   ->  Gather
-         Workers Planned: 2
+   ->  Gather Motion 3:1  (slice1; segments: 3)
          ->  Partial Aggregate
-               ->  Parallel Hash Full Join
+               ->  Hash Full Join
                      Hash Cond: ((0 - s.id) = r.id)
-                     ->  Parallel Seq Scan on simple s
-                     ->  Parallel Hash
-                           ->  Parallel Seq Scan on simple r
-(9 rows)
+                     ->  Redistribute Motion 3:3  (slice2; segments: 3)
+                           Hash Key: (0 - s.id)
+                           ->  Seq Scan on simple s
+                     ->  Hash
+                           ->  Seq Scan on simple r
+ Optimizer: Postgres query optimizer
+(11 rows)
 
 select  count(*) from simple r full outer join simple s on (r.id = 0 - s.id);
- count 
--------
- 40000
+ count  
+--------
+ 120000
 (1 row)
 
 rollback to settings;
@@ -1013,7 +1030,11 @@ rollback to settings;
 savepoint settings;
 set max_parallel_workers_per_gather = 0;
 create table join_hash_t_small(a int);
+NOTICE:  Table doesn't have 'DISTRIBUTED BY' clause -- Using column named 'a' 
as the Apache Cloudberry data distribution key for this table.
+HINT:  The 'DISTRIBUTED BY' clause determines the distribution of data. Make 
sure column(s) chosen are the optimal data distribution key to minimize skew.
 create table join_hash_t_big(b int);
+NOTICE:  Table doesn't have 'DISTRIBUTED BY' clause -- Using column named 'b' 
as the Apache Cloudberry data distribution key for this table.
+HINT:  The 'DISTRIBUTED BY' clause determines the distribution of data. Make 
sure column(s) chosen are the optimal data distribution key to minimize skew.
 insert into join_hash_t_small select i%100 from generate_series(0, 3000)i;
 insert into join_hash_t_big select i%100000 from generate_series(1, 100000)i ;
 analyze join_hash_t_small;
@@ -1031,17 +1052,25 @@ explain (costs off) select * from join_hash_t_small, 
join_hash_t_big where a = b
 (7 rows)
 
 rollback to settings;
+rollback;
 -- Hash join reuses the HOT status bit to indicate match status. This can only
 -- be guaranteed to produce correct results if all the hash join tuple match
 -- bits are reset before reuse. This is done upon loading them into the
 -- hashtable.
+begin;
 SAVEPOINT settings;
+-- CBDB: disable CBDB parallel; the serial full join match-bit test is what 
matters here.
+SET enable_parallel = off;
 SET enable_parallel_hash = on;
 SET min_parallel_table_scan_size = 0;
 SET parallel_setup_cost = 0;
 SET parallel_tuple_cost = 0;
 CREATE TABLE hjtest_matchbits_t1(id int);
+NOTICE:  Table doesn't have 'DISTRIBUTED BY' clause -- Using column named 'id' 
as the Apache Cloudberry data distribution key for this table.
+HINT:  The 'DISTRIBUTED BY' clause determines the distribution of data. Make 
sure column(s) chosen are the optimal data distribution key to minimize skew.
 CREATE TABLE hjtest_matchbits_t2(id int);
+NOTICE:  Table doesn't have 'DISTRIBUTED BY' clause -- Using column named 'id' 
as the Apache Cloudberry data distribution key for this table.
+HINT:  The 'DISTRIBUTED BY' clause determines the distribution of data. Make 
sure column(s) chosen are the optimal data distribution key to minimize skew.
 INSERT INTO hjtest_matchbits_t1 VALUES (1);
 INSERT INTO hjtest_matchbits_t2 VALUES (2);
 -- Update should create a HOT tuple. If this status bit isn't cleared, we won't
@@ -1064,8 +1093,8 @@ SET enable_parallel_hash = off;
 SELECT * FROM hjtest_matchbits_t1 t1 FULL JOIN hjtest_matchbits_t2 t2 ON t1.id 
= t2.id;
  id | id 
 ----+----
-  1 |   
     |  2
+  1 |   
 (2 rows)
 
 ROLLBACK TO settings;
@@ -1085,7 +1114,11 @@ BEGIN;
 SET LOCAL enable_sort = OFF; -- avoid mergejoins
 SET LOCAL from_collapse_limit = 1; -- allows easy changing of join order
 CREATE TABLE hjtest_1 (a text, b int, id int, c bool);
+NOTICE:  Table doesn't have 'DISTRIBUTED BY' clause -- Using column named 'a' 
as the Apache Cloudberry data distribution key for this table.
+HINT:  The 'DISTRIBUTED BY' clause determines the distribution of data. Make 
sure column(s) chosen are the optimal data distribution key to minimize skew.
 CREATE TABLE hjtest_2 (a bool, id int, b text, c int);
+NOTICE:  Table doesn't have 'DISTRIBUTED BY' clause -- Using column named 'a' 
as the Apache Cloudberry data distribution key for this table.
+HINT:  The 'DISTRIBUTED BY' clause determines the distribution of data. Make 
sure column(s) chosen are the optimal data distribution key to minimize skew.
 INSERT INTO hjtest_1(a, b, id, c) VALUES ('text', 2, 1, false); -- matches
 INSERT INTO hjtest_1(a, b, id, c) VALUES ('text', 1, 2, false); -- fails id 
join condition
 INSERT INTO hjtest_1(a, b, id, c) VALUES ('text', 20, 1, false); -- fails < 50
@@ -1142,8 +1175,8 @@ WHERE
          SubPlan 2
            ->  Result
                  Output: (hjtest_1.b * 5)
+ Settings: enable_parallel = 'on', enable_sort = 'off', from_collapse_limit = 
'1', optimizer = 'off'
  Optimizer: Postgres query optimizer
- Settings: enable_sort=off, from_collapse_limit=1
 (38 rows)
 
 SELECT hjtest_1.a a1, hjtest_2.a a2,hjtest_1.tableoid::regclass t1, 
hjtest_2.tableoid::regclass t2
@@ -1206,8 +1239,8 @@ WHERE
          SubPlan 3
            ->  Result
                  Output: (hjtest_2.c * 5)
+ Settings: enable_parallel = 'on', enable_sort = 'off', from_collapse_limit = 
'1', optimizer = 'off'
  Optimizer: Postgres query optimizer
- Settings: enable_sort=off, from_collapse_limit=1
 (38 rows)
 
 SELECT hjtest_1.a a1, hjtest_2.a a2,hjtest_1.tableoid::regclass t1, 
hjtest_2.tableoid::regclass t2
diff --git a/src/test/regress/expected/join_hash_optimizer.out 
b/src/test/regress/expected/join_hash_optimizer.out
index 053d0ef4898..1835bfa4f31 100644
--- a/src/test/regress/expected/join_hash_optimizer.out
+++ b/src/test/regress/expected/join_hash_optimizer.out
@@ -10,6 +10,9 @@ set allow_system_table_mods=on;
 set local min_parallel_table_scan_size = 0;
 set local parallel_setup_cost = 0;
 set local enable_hashjoin = on;
+-- CBDB: disable CBDB parallel for these PG-originated tests; parallel full 
join
+-- is tested separately in cbdb_parallel.sql.
+set local enable_parallel = off;
 -- Extract bucket and batch counts from an explain analyze plan.  In
 -- general we can't make assertions about how many batches (or
 -- buckets) will be required because it can vary, but we can in some
@@ -115,7 +118,7 @@ explain (costs off)
                            ->  Redistribute Motion 3:3  (slice3; segments: 3)
                                  Hash Key: s.id
                                  ->  Seq Scan on simple s
- Optimizer: Pivotal Optimizer (GPORCA)
+ Optimizer: GPORCA
 (13 rows)
 
 select count(*) from simple r join simple s using (id);
@@ -156,7 +159,7 @@ explain (costs off)
                            ->  Redistribute Motion 3:3  (slice3; segments: 3)
                                  Hash Key: s.id
                                  ->  Seq Scan on simple s
- Optimizer: Pivotal Optimizer (GPORCA)
+ Optimizer: GPORCA
 (13 rows)
 
 select count(*) from simple r join simple s using (id);
@@ -197,7 +200,7 @@ explain (costs off)
                            ->  Redistribute Motion 3:3  (slice3; segments: 3)
                                  Hash Key: s.id
                                  ->  Seq Scan on simple s
- Optimizer: Pivotal Optimizer (GPORCA)
+ Optimizer: GPORCA
 (13 rows)
 
 select count(*) from simple r join simple s using (id);
@@ -241,7 +244,7 @@ explain (costs off)
                            ->  Redistribute Motion 3:3  (slice3; segments: 3)
                                  Hash Key: s.id
                                  ->  Seq Scan on simple s
- Optimizer: Pivotal Optimizer (GPORCA)
+ Optimizer: GPORCA
 (13 rows)
 
 select count(*) from simple r join simple s using (id);
@@ -283,7 +286,7 @@ explain (costs off)
                            ->  Redistribute Motion 3:3  (slice3; segments: 3)
                                  Hash Key: s.id
                                  ->  Seq Scan on simple s
- Optimizer: Pivotal Optimizer (GPORCA)
+ Optimizer: GPORCA
 (13 rows)
 
 select count(*) from simple r join simple s using (id);
@@ -325,7 +328,7 @@ explain (costs off)
                            ->  Redistribute Motion 3:3  (slice3; segments: 3)
                                  Hash Key: s.id
                                  ->  Seq Scan on simple s
- Optimizer: Pivotal Optimizer (GPORCA)
+ Optimizer: GPORCA
 (13 rows)
 
 select count(*) from simple r join simple s using (id);
@@ -344,6 +347,13 @@ $$);
  t                    | f
 (1 row)
 
+-- parallel full multi-batch hash join
+select count(*) from simple r full outer join simple s using (id);
+ count 
+-------
+ 60000
+(1 row)
+
 rollback to settings;
 -- The "bad" case: during execution we need to increase number of
 -- batches; in this case we plan for 1 batch, and increase at least a
@@ -356,8 +366,8 @@ set local work_mem = '128kB';
 set local statement_mem = '1000kB'; -- GPDB uses statement_mem instead of 
work_mem
 explain (costs off)
   select count(*) FROM simple r JOIN bigger_than_it_looks s USING (id);
-                                  QUERY PLAN                                  
-------------------------------------------------------------------------------
+                                QUERY PLAN                                 
+---------------------------------------------------------------------------
  Finalize Aggregate
    ->  Gather Motion 3:1  (slice1; segments: 3)
          ->  Partial Aggregate
@@ -367,8 +377,8 @@ explain (costs off)
                      ->  Hash
                            ->  Broadcast Motion 3:3  (slice2; segments: 3)
                                  ->  Seq Scan on bigger_than_it_looks s
- Optimizer: Pivotal Optimizer (GPORCA)
-(13 rows)
+ Optimizer: GPORCA
+(10 rows)
 
 select count(*) FROM simple r JOIN bigger_than_it_looks s USING (id);
  count 
@@ -395,8 +405,8 @@ set local statement_mem = '1000kB'; -- GPDB uses 
statement_mem instead of work_m
 set local enable_parallel_hash = off;
 explain (costs off)
   select count(*) from simple r join bigger_than_it_looks s using (id);
-                                  QUERY PLAN                                  
-------------------------------------------------------------------------------
+                                QUERY PLAN                                 
+---------------------------------------------------------------------------
  Finalize Aggregate
    ->  Gather Motion 3:1  (slice1; segments: 3)
          ->  Partial Aggregate
@@ -406,8 +416,8 @@ explain (costs off)
                      ->  Hash
                            ->  Broadcast Motion 3:3  (slice2; segments: 3)
                                  ->  Seq Scan on bigger_than_it_looks s
- Optimizer: Pivotal Optimizer (GPORCA)
-(13 rows)
+ Optimizer: GPORCA
+(10 rows)
 
 select count(*) from simple r join bigger_than_it_looks s using (id);
  count 
@@ -434,8 +444,8 @@ set local statement_mem = '1000kB'; -- GPDB uses 
statement_mem instead of work_m
 set local enable_parallel_hash = on;
 explain (costs off)
   select count(*) from simple r join bigger_than_it_looks s using (id);
-                                  QUERY PLAN                                  
-------------------------------------------------------------------------------
+                                QUERY PLAN                                 
+---------------------------------------------------------------------------
  Finalize Aggregate
    ->  Gather Motion 3:1  (slice1; segments: 3)
          ->  Partial Aggregate
@@ -445,8 +455,8 @@ explain (costs off)
                      ->  Hash
                            ->  Broadcast Motion 3:3  (slice2; segments: 3)
                                  ->  Seq Scan on bigger_than_it_looks s
- Optimizer: Pivotal Optimizer (GPORCA)
-(13 rows)
+ Optimizer: GPORCA
+(10 rows)
 
 select count(*) from simple r join bigger_than_it_looks s using (id);
  count 
@@ -490,7 +500,7 @@ HINT:  For non-partitioned tables, run analyze 
<table_name>(<column_list>). For
                      ->  Hash
                            ->  Broadcast Motion 3:3  (slice2; segments: 3)
                                  ->  Seq Scan on extremely_skewed s
- Optimizer: Pivotal Optimizer (GPORCA)
+ Optimizer: GPORCA
 (10 rows)
 
 select count(*) from simple r join extremely_skewed s using (id);
@@ -534,7 +544,7 @@ HINT:  For non-partitioned tables, run analyze 
<table_name>(<column_list>). For
                      ->  Hash
                            ->  Broadcast Motion 3:3  (slice2; segments: 3)
                                  ->  Seq Scan on extremely_skewed s
- Optimizer: Pivotal Optimizer (GPORCA)
+ Optimizer: GPORCA
 (10 rows)
 
 select count(*) from simple r join extremely_skewed s using (id);
@@ -578,7 +588,7 @@ HINT:  For non-partitioned tables, run analyze 
<table_name>(<column_list>). For
                      ->  Hash
                            ->  Broadcast Motion 3:3  (slice2; segments: 3)
                                  ->  Seq Scan on extremely_skewed s
- Optimizer: Pivotal Optimizer (GPORCA)
+ Optimizer: GPORCA
 (10 rows)
 
 select count(*) from simple r join extremely_skewed s using (id);
@@ -643,8 +653,8 @@ explain (costs off)
   select count(*) from join_foo
     left join (select b1.id, b1.t from join_bar b1 join join_bar b2 using 
(id)) ss
     on join_foo.id < ss.id + 1 and join_foo.id > ss.id - 1;
-                                                 QUERY PLAN                    
                             
-------------------------------------------------------------------------------------------------------------
+                                           QUERY PLAN                          
                 
+------------------------------------------------------------------------------------------------
  Finalize Aggregate
    ->  Gather Motion 3:1  (slice1; segments: 3)
          ->  Partial Aggregate
@@ -662,7 +672,7 @@ explain (costs off)
                                              ->  Redistribute Motion 3:3  
(slice4; segments: 3)
                                                    Hash Key: b2.id
                                                    ->  Seq Scan on join_bar b2
- Optimizer: Pivotal Optimizer (GPORCA)
+ Optimizer: GPORCA
 (18 rows)
 
 select count(*) from join_foo
@@ -701,8 +711,8 @@ explain (costs off)
   select count(*) from join_foo
     left join (select b1.id, b1.t from join_bar b1 join join_bar b2 using 
(id)) ss
     on join_foo.id < ss.id + 1 and join_foo.id > ss.id - 1;
-                                                 QUERY PLAN                    
                             
-------------------------------------------------------------------------------------------------------------
+                                           QUERY PLAN                          
                 
+------------------------------------------------------------------------------------------------
  Finalize Aggregate
    ->  Gather Motion 3:1  (slice1; segments: 3)
          ->  Partial Aggregate
@@ -720,7 +730,7 @@ explain (costs off)
                                              ->  Redistribute Motion 3:3  
(slice4; segments: 3)
                                                    Hash Key: b2.id
                                                    ->  Seq Scan on join_bar b2
- Optimizer: Pivotal Optimizer (GPORCA)
+ Optimizer: GPORCA
 (18 rows)
 
 select count(*) from join_foo
@@ -760,8 +770,8 @@ explain (costs off)
   select count(*) from join_foo
     left join (select b1.id, b1.t from join_bar b1 join join_bar b2 using 
(id)) ss
     on join_foo.id < ss.id + 1 and join_foo.id > ss.id - 1;
-                                                 QUERY PLAN                    
                             
-------------------------------------------------------------------------------------------------------------
+                                           QUERY PLAN                          
                 
+------------------------------------------------------------------------------------------------
  Finalize Aggregate
    ->  Gather Motion 3:1  (slice1; segments: 3)
          ->  Partial Aggregate
@@ -779,7 +789,7 @@ explain (costs off)
                                              ->  Redistribute Motion 3:3  
(slice4; segments: 3)
                                                    Hash Key: b2.id
                                                    ->  Seq Scan on join_bar b2
- Optimizer: Pivotal Optimizer (GPORCA)
+ Optimizer: GPORCA
 (18 rows)
 
 select count(*) from join_foo
@@ -818,8 +828,8 @@ explain (costs off)
   select count(*) from join_foo
     left join (select b1.id, b1.t from join_bar b1 join join_bar b2 using 
(id)) ss
     on join_foo.id < ss.id + 1 and join_foo.id > ss.id - 1;
-                                                 QUERY PLAN                    
                             
-------------------------------------------------------------------------------------------------------------
+                                           QUERY PLAN                          
                 
+------------------------------------------------------------------------------------------------
  Finalize Aggregate
    ->  Gather Motion 3:1  (slice1; segments: 3)
          ->  Partial Aggregate
@@ -837,7 +847,7 @@ explain (costs off)
                                              ->  Redistribute Motion 3:3  
(slice4; segments: 3)
                                                    Hash Key: b2.id
                                                    ->  Seq Scan on join_bar b2
- Optimizer: Pivotal Optimizer (GPORCA)
+ Optimizer: GPORCA
 (18 rows)
 
 select count(*) from join_foo
@@ -891,8 +901,9 @@ select  count(*) from simple r full outer join simple s 
using (id);
 (1 row)
 
 rollback to settings;
--- parallelism not possible with parallel-oblivious outer hash join
+-- parallelism not possible with parallel-oblivious full hash join
 savepoint settings;
+set enable_parallel_hash = off;
 set local max_parallel_workers_per_gather = 2;
 explain (costs off)
      select  count(*) from simple r full outer join simple s using (id);
@@ -920,7 +931,36 @@ select  count(*) from simple r full outer join simple s 
using (id);
 (1 row)
 
 rollback to settings;
--- An full outer join where every record is not matched.
+-- parallelism is possible with parallel-aware full hash join
+savepoint settings;
+set local max_parallel_workers_per_gather = 2;
+explain (costs off)
+     select  count(*) from simple r full outer join simple s using (id);
+                                  QUERY PLAN                                  
+------------------------------------------------------------------------------
+ Finalize Aggregate
+   ->  Gather Motion 3:1  (slice1; segments: 3)
+         ->  Partial Aggregate
+               ->  Hash Full Join
+                     Hash Cond: (r.id = s.id)
+                     ->  Redistribute Motion 3:3  (slice2; segments: 3)
+                           Hash Key: r.id
+                           ->  Seq Scan on simple r
+                     ->  Hash
+                           ->  Redistribute Motion 3:3  (slice3; segments: 3)
+                                 Hash Key: s.id
+                                 ->  Seq Scan on simple s
+ Optimizer: GPORCA
+(13 rows)
+
+select  count(*) from simple r full outer join simple s using (id);
+ count 
+-------
+ 60000
+(1 row)
+
+rollback to settings;
+-- A full outer join where every record is not matched.
 -- non-parallel
 savepoint settings;
 set local max_parallel_workers_per_gather = 0;
@@ -950,7 +990,37 @@ select  count(*) from simple r full outer join simple s on 
(r.id = 0 - s.id);
 (1 row)
 
 rollback to settings;
--- parallelism not possible with parallel-oblivious outer hash join
+-- parallelism not possible with parallel-oblivious full hash join
+savepoint settings;
+set enable_parallel_hash = off;
+set local max_parallel_workers_per_gather = 2;
+explain (costs off)
+     select  count(*) from simple r full outer join simple s on (r.id = 0 - s.id);
+                                  QUERY PLAN                                  
+------------------------------------------------------------------------------
+ Finalize Aggregate
+   ->  Gather Motion 3:1  (slice1; segments: 3)
+         ->  Partial Aggregate
+               ->  Hash Full Join
+                     Hash Cond: (r.id = (0 - s.id))
+                     ->  Redistribute Motion 3:3  (slice2; segments: 3)
+                           Hash Key: r.id
+                           ->  Seq Scan on simple r
+                     ->  Hash
+                           ->  Redistribute Motion 3:3  (slice3; segments: 3)
+                                 Hash Key: (0 - s.id)
+                                 ->  Seq Scan on simple s
+ Optimizer: GPORCA
+(13 rows)
+
+select  count(*) from simple r full outer join simple s on (r.id = 0 - s.id);
+ count  
+--------
+ 120000
+(1 row)
+
+rollback to settings;
+-- parallelism is possible with parallel-aware full hash join
 savepoint settings;
 set local max_parallel_workers_per_gather = 2;
 explain (costs off)
@@ -1012,7 +1082,7 @@ explain (costs off)
                            ->  Redistribute Motion 3:3  (slice3; segments: 3)
                                  Hash Key: wide_1.id
                                  ->  Seq Scan on wide wide_1
- Optimizer: Pivotal Optimizer (GPORCA)
+ Optimizer: GPORCA
 (13 rows)
 
 select length(max(s.t))
@@ -1060,11 +1130,57 @@ explain (costs off) select * from join_hash_t_small, 
join_hash_t_big where a = b
          ->  Seq Scan on join_hash_t_big
          ->  Hash
                ->  Seq Scan on join_hash_t_small
- Optimizer: Pivotal Optimizer (GPORCA)
+ Optimizer: GPORCA
 (7 rows)
 
 rollback to settings;
 rollback;
+-- Hash join reuses the HOT status bit to indicate match status. This can only
+-- be guaranteed to produce correct results if all the hash join tuple match
+-- bits are reset before reuse. This is done upon loading them into the
+-- hashtable.
+begin;
+SAVEPOINT settings;
+-- CBDB: disable CBDB parallel; the serial full join match-bit test is what matters here.
+SET enable_parallel = off;
+SET enable_parallel_hash = on;
+SET min_parallel_table_scan_size = 0;
+SET parallel_setup_cost = 0;
+SET parallel_tuple_cost = 0;
+CREATE TABLE hjtest_matchbits_t1(id int);
+NOTICE:  Table doesn't have 'DISTRIBUTED BY' clause -- Using column named 'id' 
as the Apache Cloudberry data distribution key for this table.
+HINT:  The 'DISTRIBUTED BY' clause determines the distribution of data. Make 
sure column(s) chosen are the optimal data distribution key to minimize skew.
+CREATE TABLE hjtest_matchbits_t2(id int);
+NOTICE:  Table doesn't have 'DISTRIBUTED BY' clause -- Using column named 'id' 
as the Apache Cloudberry data distribution key for this table.
+HINT:  The 'DISTRIBUTED BY' clause determines the distribution of data. Make 
sure column(s) chosen are the optimal data distribution key to minimize skew.
+INSERT INTO hjtest_matchbits_t1 VALUES (1);
+INSERT INTO hjtest_matchbits_t2 VALUES (2);
+-- Update should create a HOT tuple. If this status bit isn't cleared, we won't
+-- correctly emit the NULL-extended unmatching tuple in full hash join.
+UPDATE hjtest_matchbits_t2 set id = 2;
+SELECT * FROM hjtest_matchbits_t1 t1 FULL JOIN hjtest_matchbits_t2 t2 ON t1.id 
= t2.id
+  ORDER BY t1.id;
+ id | id 
+----+----
+  1 |   
+    |  2
+(2 rows)
+
+-- Test serial full hash join.
+-- Resetting parallel_setup_cost should force a serial plan.
+-- Just to be safe, however, set enable_parallel_hash to off, as parallel full
+-- hash joins are only supported with shared hashtables.
+RESET parallel_setup_cost;
+SET enable_parallel_hash = off;
+SELECT * FROM hjtest_matchbits_t1 t1 FULL JOIN hjtest_matchbits_t2 t2 ON t1.id 
= t2.id;
+ id | id 
+----+----
+    |  2
+  1 |   
+(2 rows)
+
+ROLLBACK TO settings;
+rollback;
 -- Verify that hash key expressions reference the correct
 -- nodes. Hashjoin's hashkeys need to reference its outer plan, Hash's
 -- need to reference Hash's outer plan (which is below HashJoin's
@@ -1154,9 +1270,9 @@ WHERE
                                    Filter: (((hjtest_1.b * 5)) < 50)
                                    ->  Result
                                          Output: (hjtest_1.b * 5)
- Settings: enable_sort = 'off', from_collapse_limit = '1'
- Optimizer: Pivotal Optimizer (GPORCA)
-(49 rows)
+ Settings: enable_parallel = 'on', enable_sort = 'off', from_collapse_limit = 
'1', optimizer = 'on'
+ Optimizer: GPORCA
+(51 rows)
 
 SELECT hjtest_1.a a1, hjtest_2.a a2,hjtest_1.tableoid::regclass t1, 
hjtest_2.tableoid::regclass t2
 FROM hjtest_1, hjtest_2
@@ -1231,9 +1347,9 @@ WHERE
                                    Filter: (((hjtest_1.b * 5)) < 50)
                                    ->  Result
                                          Output: (hjtest_1.b * 5)
- Settings: enable_sort = 'off', from_collapse_limit = '1'
- Optimizer: Pivotal Optimizer (GPORCA)
-(49 rows)
+ Settings: enable_parallel = 'on', enable_sort = 'off', from_collapse_limit = 
'1', optimizer = 'on'
+ Optimizer: GPORCA
+(51 rows)
 
 SELECT hjtest_1.a a1, hjtest_2.a a2,hjtest_1.tableoid::regclass t1, 
hjtest_2.tableoid::regclass t2
 FROM hjtest_2, hjtest_1
diff --git a/src/test/regress/sql/cbdb_parallel.sql 
b/src/test/regress/sql/cbdb_parallel.sql
index f9d01dd8a00..08e7aa198f9 100644
--- a/src/test/regress/sql/cbdb_parallel.sql
+++ b/src/test/regress/sql/cbdb_parallel.sql
@@ -1149,6 +1149,56 @@ reset gp_cte_sharing;
 reset enable_parallel;
 reset min_parallel_table_scan_size;
 
+--
+-- Parallel Hash Full/Right Join
+--
+begin;
+create table pj_t1(id int, v int) with(parallel_workers=2) distributed by (id);
+create table pj_t2(id int, v int) with(parallel_workers=2) distributed by (id);
+create table pj_t3(id int, v int) with(parallel_workers=0) distributed by (id);
+
+-- pj_t1 is 3x larger than pj_t2 so the planner hashes the smaller pj_t2
+-- and probes with pj_t1, producing a genuine Parallel Hash Right Join plan.
+insert into pj_t1 select i, i from generate_series(1,30000)i;
+insert into pj_t2 select i, i from generate_series(25001,35000)i;
+insert into pj_t3 select i, i from generate_series(1,10000)i;
+analyze pj_t1;
+analyze pj_t2;
+analyze pj_t3;
+
+set local enable_parallel = on;
+set local min_parallel_table_scan_size = 0;
+
+-- 12_P_12_10: Parallel Hash Full Join: HashedWorkers FULL JOIN HashedWorkers -> HashedOJ(parallel)
+explain(costs off, locus)
+select count(*) from pj_t1 full join pj_t2 using (id);
+-- correctness: parallel result matches non-parallel
+set local enable_parallel = off;
+select count(*) from pj_t1 full join pj_t2 using (id);
+set local enable_parallel = on;
+select count(*) from pj_t1 full join pj_t2 using (id);
+
+-- Parallel Hash Right Join: pj_t1 (30K) is larger, so the planner hashes the smaller pj_t2
+-- (10K) as the build side and probes with pj_t1; result locus HashedWorkers(parallel)
+explain(costs off, locus)
+select count(*) from pj_t1 right join pj_t2 using (id);
+-- correctness: parallel result matches non-parallel
+set local enable_parallel = off;
+select count(*) from pj_t1 right join pj_t2 using (id);
+set local enable_parallel = on;
+select count(*) from pj_t1 right join pj_t2 using (id);
+
+-- Locus propagation: HashedOJ(parallel) followed by INNER JOIN with Hashed(serial)
+-- The full join result (HashedOJ,parallel=2) is joined with pj_t3 (Hashed,serial)
+explain(costs off, locus)
+select count(*) from (pj_t1 full join pj_t2 using (id)) fj inner join pj_t3 using (id);
+
+-- Locus propagation: HashedOJ(parallel) followed by FULL JOIN with Hashed(serial)
+explain(costs off, locus)
+select count(*) from (pj_t1 full join pj_t2 using (id)) fj full join pj_t3 using (id);
+
+abort;
+
 -- start_ignore
 drop schema test_parallel cascade;
 -- end_ignore
diff --git a/src/test/regress/sql/join_hash.sql 
b/src/test/regress/sql/join_hash.sql
index 0115489a6b9..2978e155ecd 100644
--- a/src/test/regress/sql/join_hash.sql
+++ b/src/test/regress/sql/join_hash.sql
@@ -13,6 +13,9 @@ set allow_system_table_mods=on;
 set local min_parallel_table_scan_size = 0;
 set local parallel_setup_cost = 0;
 set local enable_hashjoin = on;
+-- CBDB: disable CBDB parallel for these PG-originated tests; parallel full join
+-- is tested separately in cbdb_parallel.sql.
 
 -- Extract bucket and batch counts from an explain analyze plan.  In
 -- general we can't make assertions about how many batches (or
@@ -543,7 +546,10 @@ rollback;
 -- be guaranteed to produce correct results if all the hash join tuple match
 -- bits are reset before reuse. This is done upon loading them into the
 -- hashtable.
+begin;
 SAVEPOINT settings;
+-- CBDB: disable CBDB parallel; the serial full join match-bit test is what 
matters here.
+SET enable_parallel = off;
 SET enable_parallel_hash = on;
 SET min_parallel_table_scan_size = 0;
 SET parallel_setup_cost = 0;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to