This is an automated email from the ASF dual-hosted git repository. avamingli pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/cloudberry.git
commit d971e822804ebb121ed164fc4c72a862ed33e8f7 Author: Zhang Mingli <[email protected]> AuthorDate: Tue Mar 24 16:33:27 2026 +0800 tests: add Parallel Hash Full/Right Join regression cases cbdb_parallel.sql: add a new test block covering: - Parallel Hash Full Join (HashedWorkers FULL JOIN HashedWorkers produces HashedOJ with parallel_workers=2) - Parallel Hash Right Join (pj_t1 is 3x larger than pj_t2, so the planner hashes the smaller pj_t2 and probes with pj_t1; result locus HashedWorkers) - Correctness checks: count(*) matches serial execution - Locus propagation: HashedOJ(parallel) followed by INNER JOIN produces HashedOJ; followed by FULL JOIN produces HashedOJ join_hash.sql/out: CBDB-specific adaptations for the upstream parallel full join test -- disable parallel mode for tests that require serial plans, fix SAVEPOINT inside a parallel worker context, and update expected output to match CBDB plan shapes. --- contrib/pax_storage/expected/cbdb_parallel.out | 183 +++++++---- src/test/regress/expected/cbdb_parallel.out | 370 ++++++++++++++++------ src/test/regress/expected/join_hash.out | 83 +++-- src/test/regress/expected/join_hash_optimizer.out | 204 +++++++++--- src/test/regress/sql/cbdb_parallel.sql | 50 +++ src/test/regress/sql/join_hash.sql | 6 + 6 files changed, 668 insertions(+), 228 deletions(-) diff --git a/contrib/pax_storage/expected/cbdb_parallel.out b/contrib/pax_storage/expected/cbdb_parallel.out index db583090026..ec6ceba7e3c 100644 --- a/contrib/pax_storage/expected/cbdb_parallel.out +++ b/contrib/pax_storage/expected/cbdb_parallel.out @@ -41,13 +41,29 @@ set gp_appendonly_insert_files = 4; begin; set local enable_parallel = on; create table test_131_ao1(x int, y int) using ao_row with(parallel_workers=2); +NOTICE: Table doesn't have 'DISTRIBUTED BY' clause -- Using column named 'x' as the Apache Cloudberry data distribution key for this table. +HINT: The 'DISTRIBUTED BY' clause determines the distribution of data. Make sure column(s) chosen are the optimal data distribution key to minimize skew. create table test_131_ao2(x int, y int) using ao_row with(parallel_workers=2); +NOTICE: Table doesn't have 'DISTRIBUTED BY' clause -- Using column named 'x' as the Apache Cloudberry data distribution key for this table. +HINT: The 'DISTRIBUTED BY' clause determines the distribution of data. Make sure column(s) chosen are the optimal data distribution key to minimize skew. create table test_131_ao3(x int, y int) using ao_row with(parallel_workers=0); +NOTICE: Table doesn't have 'DISTRIBUTED BY' clause -- Using column named 'x' as the Apache Cloudberry data distribution key for this table. +HINT: The 'DISTRIBUTED BY' clause determines the distribution of data. Make sure column(s) chosen are the optimal data distribution key to minimize skew. create table test_131_ao4(x int, y int) using ao_row with(parallel_workers=0); +NOTICE: Table doesn't have 'DISTRIBUTED BY' clause -- Using column named 'x' as the Apache Cloudberry data distribution key for this table. +HINT: The 'DISTRIBUTED BY' clause determines the distribution of data. Make sure column(s) chosen are the optimal data distribution key to minimize skew. create table test_131_aoco1(x int, y int) using ao_column with(parallel_workers=2); +NOTICE: Table doesn't have 'DISTRIBUTED BY' clause -- Using column named 'x' as the Apache Cloudberry data distribution key for this table. +HINT: The 'DISTRIBUTED BY' clause determines the distribution of data. Make sure column(s) chosen are the optimal data distribution key to minimize skew. create table test_131_aoco2(x int, y int) using ao_column with(parallel_workers=2); +NOTICE: Table doesn't have 'DISTRIBUTED BY' clause -- Using column named 'x' as the Apache Cloudberry data distribution key for this table. +HINT: The 'DISTRIBUTED BY' clause determines the distribution of data. Make sure column(s) chosen are the optimal data distribution key to minimize skew. create table test_131_aoco3(x int, y int) using ao_column with(parallel_workers=0); +NOTICE: Table doesn't have 'DISTRIBUTED BY' clause -- Using column named 'x' as the Apache Cloudberry data distribution key for this table. +HINT: The 'DISTRIBUTED BY' clause determines the distribution of data. Make sure column(s) chosen are the optimal data distribution key to minimize skew. create table test_131_aoco4(x int, y int) using ao_column with(parallel_workers=0); +NOTICE: Table doesn't have 'DISTRIBUTED BY' clause -- Using column named 'x' as the Apache Cloudberry data distribution key for this table. +HINT: The 'DISTRIBUTED BY' clause determines the distribution of data. Make sure column(s) chosen are the optimal data distribution key to minimize skew. select relname, reloptions from pg_catalog.pg_class where relname like 'test_131_ao%'; relname | reloptions ----------------+---------------------- @@ -155,8 +171,14 @@ explain(locus, costs off) select count(*) from test_131_aoco3, test_131_aoco4 wh abort; create table ao1(x int, y int); +NOTICE: Table doesn't have 'DISTRIBUTED BY' clause -- Using column named 'x' as the Apache Cloudberry data distribution key for this table. +HINT: The 'DISTRIBUTED BY' clause determines the distribution of data. Make sure column(s) chosen are the optimal data distribution key to minimize skew. create table ao2(x int, y int); +NOTICE: Table doesn't have 'DISTRIBUTED BY' clause -- Using column named 'x' as the Apache Cloudberry data distribution key for this table. +HINT: The 'DISTRIBUTED BY' clause determines the distribution of data. Make sure column(s) chosen are the optimal data distribution key to minimize skew. create table aocs1(x int, y int); +NOTICE: Table doesn't have 'DISTRIBUTED BY' clause -- Using column named 'x' as the Apache Cloudberry data distribution key for this table. +HINT: The 'DISTRIBUTED BY' clause determines the distribution of data. Make sure column(s) chosen are the optimal data distribution key to minimize skew. begin; -- encourage use of parallel plans set local min_parallel_table_scan_size = 0; @@ -367,6 +389,8 @@ abort; begin; set local max_parallel_workers_per_gather = 2; create table t1(a int, b int) with(parallel_workers=2); +NOTICE: Table doesn't have 'DISTRIBUTED BY' clause -- Using column named 'a' as the Apache Cloudberry data distribution key for this table. +HINT: The 'DISTRIBUTED BY' clause determines the distribution of data. Make sure column(s) chosen are the optimal data distribution key to minimize skew. create table rt1(a int, b int) with(parallel_workers=2) distributed replicated; create table rt2(a int, b int) distributed replicated; create table rt3(a int, b int) distributed replicated; @@ -599,6 +623,8 @@ select * from rt1 join t1 on rt1.a = t1.b join rt2 on rt2.a = t1.b; 5 | 6 | 4 | 5 | 5 | 6 8 | 9 | 7 | 8 | 8 | 9 9 | 10 | 8 | 9 | 9 | 10 + 1 | 2 | 1 | 1 | 1 | 2 + 2 | 3 | 1 | 2 | 2 | 3 5 | 6 | 5 | 5 | 5 | 6 6 | 7 | 6 | 6 | 6 | 7 9 | 10 | 9 | 9 | 9 | 10 @@ -606,8 +632,6 @@ select * from rt1 join t1 on rt1.a = t1.b join rt2 on rt2.a = t1.b; 6 | 7 | 5 | 6 | 6 | 7 7 | 8 | 6 | 7 | 7 | 8 10 | 11 | 9 | 10 | 10 | 11 - 1 | 2 | 1 | 1 | 1 | 2 - 2 | 3 | 1 | 2 | 2 | 3 (19 rows) -- parallel hash join @@ -650,13 +674,6 @@ explain(locus, costs off) select * from rt1 join t1 on rt1.a = t1.b join rt2 on select * from rt1 join t1 on rt1.a = t1.b join rt2 on rt2.a = t1.b; a | b | a | b | a | b ----+----+----+----+----+---- - 5 | 6 | 5 | 5 | 5 | 6 - 6 | 7 | 5 | 6 | 6 | 7 - 6 | 7 | 6 | 6 | 6 | 7 - 7 | 8 | 6 | 7 | 7 | 8 - 9 | 10 | 9 | 9 | 9 | 10 - 10 | 11 | 9 | 10 | 10 | 11 - 10 | 11 | 10 | 10 | 10 | 11 2 | 3 | 2 | 2 | 2 | 3 3 | 4 | 2 | 3 | 3 | 4 3 | 4 | 3 | 3 | 3 | 4 @@ -669,6 +686,13 @@ select * from rt1 join t1 on rt1.a = t1.b join rt2 on rt2.a = t1.b; 9 | 10 | 8 | 9 | 9 | 10 1 | 2 | 1 | 1 | 1 | 2 2 | 3 | 1 | 2 | 2 | 3 + 5 | 6 | 5 | 5 | 5 | 6 + 6 | 7 | 5 | 6 | 6 | 7 + 6 | 7 | 6 | 6 | 6 | 7 + 7 | 8 | 6 | 7 | 7 | 8 + 9 | 10 | 9 | 9 | 9 | 10 + 10 | 11 | 9 | 10 | 10 | 11 + 10 | 11 | 10 | 10 | 10 | 11 (19 rows) -- @@ -702,6 +726,8 @@ explain(locus, costs off) select * from rt1 join t1 on rt1.a = t1.b join rt3 on select * from rt1 join t1 on rt1.a = t1.b join rt3 on rt3.a = t1.b; a | b | a | b | a | b ----+----+----+----+----+---- + 1 | 2 | 1 | 1 | 1 | 2 + 2 | 3 | 1 | 2 | 2 | 3 2 | 3 | 2 | 2 | 2 | 3 3 | 4 | 3 | 3 | 3 | 4 4 | 5 | 4 | 4 | 4 | 5 @@ -712,8 +738,6 @@ select * from rt1 join t1 on rt1.a = t1.b join rt3 on rt3.a = t1.b; 5 | 6 | 4 | 5 | 5 | 6 8 | 9 | 7 | 8 | 8 | 9 9 | 10 | 8 | 9 | 9 | 10 - 1 | 2 | 1 | 1 | 1 | 2 - 2 | 3 | 1 | 2 | 2 | 3 5 | 6 | 5 | 5 | 5 | 6 6 | 7 | 6 | 6 | 6 | 7 9 | 10 | 9 | 9 | 9 | 10 @@ -779,6 +803,8 @@ select * from rt1 join t1 on rt1.a = t1.b join rt3 on rt3.a = t1.b; (19 rows) create table t2(a int, b int) with(parallel_workers=0); +NOTICE: Table doesn't have 'DISTRIBUTED BY' clause -- Using column named 'a' as the Apache Cloudberry data distribution key for this table. +HINT: The 'DISTRIBUTED BY' clause determines the distribution of data. Make sure column(s) chosen are the optimal data distribution key to minimize skew. create table rt4(a int, b int) with(parallel_workers=2) distributed replicated; insert into t2 select i, i+1 from generate_series(1, 10) i; insert into rt4 select i, i+1 from generate_series(1, 10000) i; @@ -788,16 +814,16 @@ set local enable_parallel = off; select * from rt4 join t2 using(b); b | a | a ----+----+---- - 2 | 1 | 1 - 6 | 5 | 5 - 7 | 6 | 6 - 10 | 9 | 9 - 11 | 10 | 10 3 | 2 | 2 4 | 3 | 3 5 | 4 | 4 8 | 7 | 7 9 | 8 | 8 + 2 | 1 | 1 + 6 | 5 | 5 + 7 | 6 | 6 + 10 | 9 | 9 + 11 | 10 | 10 (10 rows) set local enable_parallel = on; @@ -828,19 +854,21 @@ explain(locus, costs off) select * from rt4 join t2 using(b); select * from rt4 join t2 using(b); b | a | a ----+----+---- - 2 | 1 | 1 + 6 | 5 | 5 + 7 | 6 | 6 + 10 | 9 | 9 + 11 | 10 | 10 3 | 2 | 2 4 | 3 | 3 5 | 4 | 4 8 | 7 | 7 9 | 8 | 8 - 6 | 5 | 5 - 7 | 6 | 6 - 10 | 9 | 9 - 11 | 10 | 10 + 2 | 1 | 1 (10 rows) create table t3(a int, b int) with(parallel_workers=2); +NOTICE: Table doesn't have 'DISTRIBUTED BY' clause -- Using column named 'a' as the Apache Cloudberry data distribution key for this table. +HINT: The 'DISTRIBUTED BY' clause determines the distribution of data. Make sure column(s) chosen are the optimal data distribution key to minimize skew. insert into t3 select i, i+1 from generate_series(1, 9000) i; analyze t3; set local enable_parallel = off; @@ -919,10 +947,10 @@ explain(locus, costs off) select * from t_replica_workers_2 join t_random_worker select * from t_replica_workers_2 join t_random_workers_0 using(a); a | b | b ---+---+--- - 2 | 3 | 3 - 3 | 4 | 4 1 | 2 | 2 + 2 | 3 | 3 4 | 5 | 5 + 3 | 4 | 4 5 | 6 | 6 (5 rows) @@ -931,11 +959,11 @@ set local enable_parallel=false; select * from t_replica_workers_2 join t_random_workers_0 using(a); a | b | b ---+---+--- - 2 | 3 | 3 3 | 4 | 4 - 1 | 2 | 2 - 4 | 5 | 5 5 | 6 | 6 + 4 | 5 | 5 + 1 | 2 | 2 + 2 | 3 | 3 (5 rows) abort; @@ -976,11 +1004,11 @@ explain(locus, costs off) select * from t_replica_workers_2 right join t_random_ select * from t_replica_workers_2 right join t_random_workers_2 using(a); a | b | b ---+---+--- - 5 | 6 | 6 1 | 2 | 2 2 | 3 | 3 3 | 4 | 4 4 | 5 | 5 + 5 | 6 | 6 (5 rows) -- non parallel results @@ -1028,14 +1056,14 @@ explain(locus, costs off) select * from t_replica_workers_2 join t_random_worker Locus: Strewn Parallel Workers: 2 Optimizer: Postgres query optimizer -(16 rows) +(15 rows) select * from t_replica_workers_2 join t_random_workers_2 using(a); a | b | b ---+---+--- - 2 | 3 | 3 1 | 2 | 2 3 | 4 | 4 + 2 | 3 | 3 4 | 5 | 5 5 | 6 | 6 (5 rows) @@ -1045,9 +1073,9 @@ set local enable_parallel=false; select * from t_replica_workers_2 join t_random_workers_2 using(a); a | b | b ---+---+--- - 2 | 3 | 3 1 | 2 | 2 3 | 4 | 4 + 2 | 3 | 3 4 | 5 | 5 5 | 6 | 6 (5 rows) @@ -1059,7 +1087,11 @@ abort; -- begin; create table t1(a int, b int) with(parallel_workers=3); +NOTICE: Table doesn't have 'DISTRIBUTED BY' clause -- Using column named 'a' as the Apache Cloudberry data distribution key for this table. +HINT: The 'DISTRIBUTED BY' clause determines the distribution of data. Make sure column(s) chosen are the optimal data distribution key to minimize skew. create table t2(b int, a int) with(parallel_workers=2); +NOTICE: Table doesn't have 'DISTRIBUTED BY' clause -- Using column named 'b' as the Apache Cloudberry data distribution key for this table. +HINT: The 'DISTRIBUTED BY' clause determines the distribution of data. Make sure column(s) chosen are the optimal data distribution key to minimize skew. insert into t1 select i, i+1 from generate_series(1, 10) i; insert into t2 select i, i+1 from generate_series(1, 5) i; analyze t1; @@ -1071,17 +1103,17 @@ explain(costs off) select * from t1 right join t2 on t1.b = t2.a; QUERY PLAN ------------------------------------------------------------------ Gather Motion 9:1 (slice1; segments: 9) - -> Parallel Hash Left Join - Hash Cond: (t2.a = t1.b) - -> Redistribute Motion 6:9 (slice2; segments: 6) - Hash Key: t2.a + -> Parallel Hash Right Join + Hash Cond: (t1.b = t2.a) + -> Redistribute Motion 9:9 (slice2; segments: 9) + Hash Key: t1.b Hash Module: 3 - -> Parallel Seq Scan on t2 + -> Parallel Seq Scan on t1 -> Parallel Hash - -> Redistribute Motion 9:9 (slice3; segments: 9) - Hash Key: t1.b + -> Redistribute Motion 6:9 (slice3; segments: 6) + Hash Key: t2.a Hash Module: 3 - -> Parallel Seq Scan on t1 + -> Parallel Seq Scan on t2 Optimizer: Postgres query optimizer (13 rows) @@ -1091,7 +1123,11 @@ abort; -- begin; create table t1(a int, b int) with(parallel_workers=2); +NOTICE: Table doesn't have 'DISTRIBUTED BY' clause -- Using column named 'a' as the Apache Cloudberry data distribution key for this table. +HINT: The 'DISTRIBUTED BY' clause determines the distribution of data. Make sure column(s) chosen are the optimal data distribution key to minimize skew. create table t2(a int, b int) with(parallel_workers=2); +NOTICE: Table doesn't have 'DISTRIBUTED BY' clause -- Using column named 'a' as the Apache Cloudberry data distribution key for this table. +HINT: The 'DISTRIBUTED BY' clause determines the distribution of data. Make sure column(s) chosen are the optimal data distribution key to minimize skew. insert into t1 select i%10, i from generate_series(1, 5) i; insert into t1 values (100000); insert into t2 select i%10, i from generate_series(1, 100000) i; @@ -1100,34 +1136,34 @@ analyze t2; set local enable_parallel = on; -- parallel hash join with shared table, SinglQE as outer partial path. explain(locus, costs off) select * from (select count(*) as a from t2) t2 left join t1 on t1.a = t2.a; - QUERY PLAN ------------------------------------------------------------------- + QUERY PLAN +------------------------------------------------------------------------ Gather Motion 6:1 (slice1; segments: 6) Locus: Entry - -> Parallel Hash Left Join - Locus: Hashed + -> Parallel Hash Right Join + Locus: HashedWorkers Parallel Workers: 2 - Hash Cond: ((count(*)) = t1.a) - -> Redistribute Motion 1:6 (slice2; segments: 1) - Locus: Hashed + Hash Cond: (t1.a = (count(*))) + -> Parallel Seq Scan on t1 + Locus: HashedWorkers Parallel Workers: 2 - Hash Key: (count(*)) - Hash Module: 3 - -> Finalize Aggregate - Locus: SingleQE - -> Gather Motion 6:1 (slice3; segments: 6) - Locus: SingleQE - -> Partial Aggregate - Locus: HashedWorkers - Parallel Workers: 2 - -> Parallel Seq Scan on t2 - Locus: HashedWorkers - Parallel Workers: 2 -> Parallel Hash Locus: Hashed - -> Parallel Seq Scan on t1 - Locus: HashedWorkers + -> Redistribute Motion 1:6 (slice2; segments: 1) + Locus: Hashed Parallel Workers: 2 + Hash Key: (count(*)) + Hash Module: 3 + -> Finalize Aggregate + Locus: SingleQE + -> Gather Motion 6:1 (slice3; segments: 6) + Locus: SingleQE + -> Partial Aggregate + Locus: HashedWorkers + Parallel Workers: 2 + -> Parallel Seq Scan on t2 + Locus: HashedWorkers + Parallel Workers: 2 Optimizer: Postgres query optimizer (27 rows) @@ -1323,12 +1359,18 @@ begin; create table rt1(a int, b int) distributed replicated; create table rt2(a int, b int) with (parallel_workers = 0) distributed replicated; create table t1(a int, b int); +NOTICE: Table doesn't have 'DISTRIBUTED BY' clause -- Using column named 'a' as the Apache Cloudberry data distribution key for this table. +HINT: The 'DISTRIBUTED BY' clause determines the distribution of data. Make sure column(s) chosen are the optimal data distribution key to minimize skew. create table t2(a int, b int) with (parallel_workers = 0); +NOTICE: Table doesn't have 'DISTRIBUTED BY' clause -- Using column named 'a' as the Apache Cloudberry data distribution key for this table. +HINT: The 'DISTRIBUTED BY' clause determines the distribution of data. Make sure column(s) chosen are the optimal data distribution key to minimize skew. insert into t1 select i, i+1 from generate_series(1, 10000) i; insert into t2 select i, i+1 from generate_series(1, 10000) i; insert into rt1 select i, i+1 from generate_series(1, 10000) i; insert into rt2 select i, i+1 from generate_series(1, 10000) i; CREATE TABLE sq1 AS SELECT a, b FROM t1 WHERE gp_segment_id = 0; +NOTICE: Table doesn't have 'DISTRIBUTED BY' clause -- Using column(s) named 'a' as the Apache Cloudberry data distribution key for this table. +HINT: The 'DISTRIBUTED BY' clause determines the distribution of data. Make sure column(s) chosen are the optimal data distribution key to minimize skew. set local optimizer=off; set local enable_parallel=on; set local min_parallel_table_scan_size to 0; @@ -1385,7 +1427,7 @@ explain (locus, costs off) select * from rt1 union all select * from t1; -> Result Locus: Strewn Parallel Workers: 2 - One-Time Filter: (gp_execution_segment() = 1) + One-Time Filter: (gp_execution_segment() = 0) -> Parallel Seq Scan on rt1 Locus: SegmentGeneralWorkers Parallel Workers: 2 @@ -1409,7 +1451,7 @@ explain (locus, costs off) select * from rt1 union all select * from t2; -> Result Locus: Strewn Parallel Workers: 2 - One-Time Filter: (gp_execution_segment() = 1) + One-Time Filter: (gp_execution_segment() = 0) -> Parallel Seq Scan on rt1 Locus: SegmentGeneralWorkers Parallel Workers: 2 @@ -1482,6 +1524,8 @@ abort; -- begin; create table t1(c1 int, c2 int) with(parallel_workers=2); +NOTICE: Table doesn't have 'DISTRIBUTED BY' clause -- Using column named 'c1' as the Apache Cloudberry data distribution key for this table. +HINT: The 'DISTRIBUTED BY' clause determines the distribution of data. Make sure column(s) chosen are the optimal data distribution key to minimize skew. insert into t1 select i, i+1 from generate_series(1, 100000) i; analyze t1; set local optimizer = off; @@ -1549,6 +1593,8 @@ abort; -- begin; create table t1(c1 int, c2 int) with(parallel_workers=2); +NOTICE: Table doesn't have 'DISTRIBUTED BY' clause -- Using column named 'c1' as the Apache Cloudberry data distribution key for this table. +HINT: The 'DISTRIBUTED BY' clause determines the distribution of data. Make sure column(s) chosen are the optimal data distribution key to minimize skew. insert into t1 select i, i+1 from generate_series(1, 100000) i; analyze t1; set local optimizer = off; @@ -1768,6 +1814,8 @@ set local optimizer = off; set local enable_parallel = on; -- ao table create table ao (a INT, b INT) using ao_row; +NOTICE: Table doesn't have 'DISTRIBUTED BY' clause -- Using column named 'a' as the Apache Cloudberry data distribution key for this table. +HINT: The 'DISTRIBUTED BY' clause determines the distribution of data. Make sure column(s) chosen are the optimal data distribution key to minimize skew. insert into ao select i as a, i as b from generate_series(1, 100) AS i; alter table ao set (parallel_workers = 2); explain(costs off) select count(*) from ao; @@ -1789,6 +1837,8 @@ select count(*) from ao; alter table ao reset (parallel_workers); -- aocs table create table aocs (a INT, b INT) using ao_column; +NOTICE: Table doesn't have 'DISTRIBUTED BY' clause -- Using column named 'a' as the Apache Cloudberry data distribution key for this table. +HINT: The 'DISTRIBUTED BY' clause determines the distribution of data. Make sure column(s) chosen are the optimal data distribution key to minimize skew. insert into aocs select i as a, i as b from generate_series(1, 100) AS i; alter table aocs set (parallel_workers = 2); explain(costs off) select count(*) from aocs; @@ -1862,9 +1912,14 @@ select * from abort; begin; create table pagg_tab (a int, b int, c text, d int) partition by list(c); +NOTICE: Table doesn't have 'DISTRIBUTED BY' clause -- Using column named 'a' as the Apache Cloudberry data distribution key for this table. +HINT: The 'DISTRIBUTED BY' clause determines the distribution of data. Make sure column(s) chosen are the optimal data distribution key to minimize skew. create table pagg_tab_p1 partition of pagg_tab for values in ('0000', '0001', '0002', '0003', '0004'); +NOTICE: table has parent, setting distribution columns to match parent table create table pagg_tab_p2 partition of pagg_tab for values in ('0005', '0006', '0007', '0008'); +NOTICE: table has parent, setting distribution columns to match parent table create table pagg_tab_p3 partition of pagg_tab for values in ('0009', '0010', '0011'); +NOTICE: table has parent, setting distribution columns to match parent table insert into pagg_tab select i % 20, i % 30, to_char(i % 12, 'FM0000'), i % 30 from generate_series(0, 2999) i; analyze pagg_tab; set local enable_parallel to off; @@ -1939,7 +1994,11 @@ abort; -- begin; create table t1(a int, b int) with(parallel_workers=3); +NOTICE: Table doesn't have 'DISTRIBUTED BY' clause -- Using column named 'a' as the Apache Cloudberry data distribution key for this table. +HINT: The 'DISTRIBUTED BY' clause determines the distribution of data. Make sure column(s) chosen are the optimal data distribution key to minimize skew. create table t2(b int, a int) with(parallel_workers=2); +NOTICE: Table doesn't have 'DISTRIBUTED BY' clause -- Using column named 'b' as the Apache Cloudberry data distribution key for this table. +HINT: The 'DISTRIBUTED BY' clause determines the distribution of data. Make sure column(s) chosen are the optimal data distribution key to minimize skew. insert into t1 select i, i+1 from generate_series(1, 10) i; insert into t2 select i, i+1 from generate_series(1, 5) i; analyze t1; @@ -2329,6 +2388,8 @@ abort; -- prepare, execute locus is null begin; create table t1(c1 int, c2 int); +NOTICE: Table doesn't have 'DISTRIBUTED BY' clause -- Using column named 'c1' as the Apache Cloudberry data distribution key for this table. +HINT: The 'DISTRIBUTED BY' clause determines the distribution of data. Make sure column(s) chosen are the optimal data distribution key to minimize skew. analyze t1; prepare t1_count(integer) as select count(*) from t1; explain(locus, costs off) execute t1_count(1); diff --git a/src/test/regress/expected/cbdb_parallel.out b/src/test/regress/expected/cbdb_parallel.out index 35e90eebfa1..af975de50f4 100644 --- a/src/test/regress/expected/cbdb_parallel.out +++ b/src/test/regress/expected/cbdb_parallel.out @@ -112,8 +112,8 @@ set local enable_parallel_dedup_semi_reverse_join = on; set local enable_parallel_dedup_semi_join = on; explain (costs off) select sum(foo.a) from foo where exists (select 1 from bar where foo.a = bar.b); - QUERY PLAN ------------------------------------------------------------------------------------------------- + QUERY PLAN +----------------------------------------------------------------------------------------------- Finalize Aggregate -> Gather Motion 6:1 (slice1; segments: 6) -> Partial Aggregate @@ -1032,6 +1032,15 @@ explain(locus, costs off) select * from rt1 join t1 on rt1.a = t1.b join rt2 on select * from rt1 join t1 on rt1.a = t1.b join rt2 on rt2.a = t1.b; a | b | a | b | a | b ----+----+----+----+----+---- + 1 | 2 | 1 | 1 | 1 | 2 + 2 | 3 | 1 | 2 | 2 | 3 + 5 | 6 | 5 | 5 | 5 | 6 + 6 | 7 | 6 | 6 | 6 | 7 + 9 | 10 | 9 | 9 | 9 | 10 + 10 | 11 | 10 | 10 | 10 | 11 + 6 | 7 | 5 | 6 | 6 | 7 + 7 | 8 | 6 | 7 | 7 | 8 + 10 | 11 | 9 | 10 | 10 | 11 2 | 3 | 2 | 2 | 2 | 3 3 | 4 | 3 | 3 | 3 | 4 4 | 5 | 4 | 4 | 4 | 5 @@ -1042,15 +1051,6 @@ select * from rt1 join t1 on rt1.a = t1.b join rt2 on rt2.a = t1.b; 5 | 6 | 4 | 5 | 5 | 6 8 | 9 | 7 | 8 | 8 | 9 9 | 10 | 8 | 9 | 9 | 10 - 5 | 6 | 5 | 5 | 5 | 6 - 6 | 7 | 6 | 6 | 6 | 7 - 9 | 10 | 9 | 9 | 9 | 10 - 10 | 11 | 10 | 10 | 10 | 11 - 6 | 7 | 5 | 6 | 6 | 7 - 7 | 8 | 6 | 7 | 7 | 8 - 10 | 11 | 9 | 10 | 10 | 11 - 1 | 2 | 1 | 1 | 1 | 2 - 2 | 3 | 1 | 2 | 2 | 3 (19 rows) -- parallel hash join @@ -1093,13 +1093,8 @@ explain(locus, costs off) select * from rt1 join t1 on rt1.a = t1.b join rt2 on select * from rt1 join t1 on rt1.a = t1.b join rt2 on rt2.a = t1.b; a | b | a | b | a | b ----+----+----+----+----+---- - 5 | 6 | 5 | 5 | 5 | 6 - 6 | 7 | 5 | 6 | 6 | 7 - 6 | 7 | 6 | 6 | 6 | 7 - 7 | 8 | 6 | 7 | 7 | 8 - 9 | 10 | 9 | 9 | 9 | 10 - 10 | 11 | 9 | 10 | 10 | 11 - 10 | 11 | 10 | 10 | 10 | 11 + 1 | 2 | 1 | 1 | 1 | 2 + 2 | 3 | 1 | 2 | 2 | 3 2 | 3 | 2 | 2 | 2 | 3 3 | 4 | 2 | 3 | 3 | 4 3 | 4 | 3 | 3 | 3 | 4 @@ -1110,8 +1105,13 @@ select * from rt1 join t1 on rt1.a = t1.b join rt2 on rt2.a = t1.b; 8 | 9 | 7 | 8 | 8 | 9 8 | 9 | 8 | 8 | 8 | 9 9 | 10 | 8 | 9 | 9 | 10 - 1 | 2 | 1 | 1 | 1 | 2 - 2 | 3 | 1 | 2 | 2 | 3 + 5 | 6 | 5 | 5 | 5 | 6 + 6 | 7 | 5 | 6 | 6 | 7 + 6 | 7 | 6 | 6 | 6 | 7 + 7 | 8 | 6 | 7 | 7 | 8 + 9 | 10 | 9 | 9 | 9 | 10 + 10 | 11 | 9 | 10 | 10 | 11 + 10 | 11 | 10 | 10 | 10 | 11 (19 rows) -- @@ -1145,6 +1145,8 @@ explain(locus, costs off) select * from rt1 join t1 on rt1.a = t1.b join rt3 on select * from rt1 join t1 on rt1.a = t1.b join rt3 on rt3.a = t1.b; a | b | a | b | a | b ----+----+----+----+----+---- + 1 | 2 | 1 | 1 | 1 | 2 + 2 | 3 | 1 | 2 | 2 | 3 2 | 3 | 2 | 2 | 2 | 3 3 | 4 | 3 | 3 | 3 | 4 4 | 5 | 4 | 4 | 4 | 5 @@ -1155,8 +1157,6 @@ select * from rt1 join t1 on rt1.a = t1.b join rt3 on rt3.a = t1.b; 5 | 6 | 4 | 5 | 5 | 6 8 | 9 | 7 | 8 | 8 | 9 9 | 10 | 8 | 9 | 9 | 10 - 1 | 2 | 1 | 1 | 1 | 2 - 2 | 3 | 1 | 2 | 2 | 3 5 | 6 | 5 | 5 | 5 | 6 6 | 7 | 6 | 6 | 6 | 7 9 | 10 | 9 | 9 | 9 | 10 @@ -1201,14 +1201,11 @@ select * from rt1 join t1 on rt1.a = t1.b join rt3 on rt3.a = t1.b; a | b | a | b | a | b ----+----+----+----+----+---- 1 | 2 | 1 | 1 | 1 | 2 - 2 | 3 | 1 | 2 | 2 | 3 5 | 6 | 5 | 5 | 5 | 6 6 | 7 | 6 | 6 | 6 | 7 9 | 10 | 9 | 9 | 9 | 10 10 | 11 | 10 | 10 | 10 | 11 - 6 | 7 | 5 | 6 | 6 | 7 - 7 | 8 | 6 | 7 | 7 | 8 - 10 | 11 | 9 | 10 | 10 | 11 + 2 | 3 | 1 | 2 | 2 | 3 2 | 3 | 2 | 2 | 2 | 3 3 | 4 | 3 | 3 | 3 | 4 4 | 5 | 4 | 4 | 4 | 5 @@ -1219,6 +1216,9 @@ select * from rt1 join t1 on rt1.a = t1.b join rt3 on rt3.a = t1.b; 5 | 6 | 4 | 5 | 5 | 6 8 | 9 | 7 | 8 | 8 | 9 9 | 10 | 8 | 9 | 9 | 10 + 6 | 7 | 5 | 6 | 6 | 7 + 7 | 8 | 6 | 7 | 7 | 8 + 10 | 11 | 9 | 10 | 10 | 11 (19 rows) create table t2(a int, b int) with(parallel_workers=0); @@ -1271,12 +1271,12 @@ explain(locus, costs off) select * from rt4 join t2 using(b); select * from rt4 join t2 using(b); b | a | a ----+----+---- - 2 | 1 | 1 3 | 2 | 2 4 | 3 | 3 5 | 4 | 4 8 | 7 | 7 9 | 8 | 8 + 2 | 1 | 1 6 | 5 | 5 7 | 6 | 6 10 | 9 | 9 @@ -1362,9 +1362,9 @@ explain(locus, costs off) select * from t_replica_workers_2 join t_random_worker select * from t_replica_workers_2 join t_random_workers_0 using(a); a | b | b ---+---+--- - 2 | 3 | 3 - 3 | 4 | 4 1 | 2 | 2 + 3 | 4 | 4 + 2 | 3 | 3 4 | 5 | 5 5 | 6 | 6 (5 rows) @@ -1374,9 +1374,9 @@ set local enable_parallel=false; select * from t_replica_workers_2 join t_random_workers_0 using(a); a | b | b ---+---+--- - 2 | 3 | 3 - 3 | 4 | 4 1 | 2 | 2 + 3 | 4 | 4 + 2 | 3 | 3 4 | 5 | 5 5 | 6 | 6 (5 rows) @@ -1419,9 +1419,9 @@ explain(locus, costs off) select * from t_replica_workers_2 right join t_random_ select * from t_replica_workers_2 right join t_random_workers_2 using(a); a | b | b ---+---+--- + 2 | 3 | 3 5 | 6 | 6 1 | 2 | 2 - 2 | 3 | 3 3 | 4 | 4 4 | 5 | 5 (5 rows) @@ -1431,11 +1431,11 @@ set local enable_parallel=false; select * from t_replica_workers_2 right join t_random_workers_2 using(a); a | b | b ---+---+--- + 5 | 6 | 6 1 | 2 | 2 - 2 | 3 | 3 3 | 4 | 4 4 | 5 | 5 - 5 | 6 | 6 + 2 | 3 | 3 (5 rows) abort; @@ -1471,13 +1471,13 @@ explain(locus, costs off) select * from t_replica_workers_2 join t_random_worker Locus: Strewn Parallel Workers: 2 Optimizer: Postgres query optimizer -(16 rows) +(15 rows) select * from t_replica_workers_2 join t_random_workers_2 using(a); a | b | b ---+---+--- - 2 | 3 | 3 1 | 2 | 2 + 2 | 3 | 3 3 | 4 | 4 4 | 5 | 5 5 | 6 | 6 @@ -1488,11 +1488,11 @@ set local enable_parallel=false; select * from t_replica_workers_2 join t_random_workers_2 using(a); a | b | b ---+---+--- - 2 | 3 | 3 - 1 | 2 | 2 3 | 4 | 4 4 | 5 | 5 5 | 6 | 6 + 1 | 2 | 2 + 2 | 3 | 3 (5 rows) abort; @@ -1510,28 +1510,28 @@ analyze t1; analyze rt1; set local enable_parallel = on; explain(locus, costs off) select * from (select count(*) as a from t1) t1 left join rt1 on rt1.a = t1.a; - QUERY PLAN ------------------------------------------------------- - Parallel Hash Left Join + QUERY PLAN +------------------------------------------------------------ + Parallel Hash Right Join Locus: Entry - Hash Cond: ((count(*)) = rt1.a) - -> Finalize Aggregate + Hash Cond: (rt1.a = (count(*))) + -> Gather Motion 2:1 (slice1; segments: 2) Locus: Entry - -> Gather Motion 6:1 (slice1; segments: 6) - Locus: Entry - -> Partial Aggregate - Locus: HashedWorkers - Parallel Workers: 2 - -> Parallel Seq Scan on t1 - Locus: HashedWorkers - Parallel Workers: 2 + -> Parallel Seq Scan on rt1 + Locus: SegmentGeneralWorkers + Parallel Workers: 2 -> Parallel Hash Locus: Entry - -> Gather Motion 2:1 (slice2; segments: 2) + -> Finalize Aggregate Locus: Entry - -> Parallel Seq Scan on rt1 - Locus: SegmentGeneralWorkers - Parallel Workers: 2 + -> Gather Motion 6:1 (slice2; segments: 6) + Locus: Entry + -> Partial Aggregate + Locus: HashedWorkers + Parallel Workers: 2 + -> Parallel Seq Scan on t1 + Locus: HashedWorkers + Parallel Workers: 2 Optimizer: Postgres query optimizer (21 rows) @@ -1661,17 +1661,17 @@ explain(costs off) select * from t1 right join t2 on t1.b = t2.a; QUERY PLAN ------------------------------------------------------------------ Gather Motion 9:1 (slice1; segments: 9) - -> Parallel Hash Left Join - Hash Cond: (t2.a = t1.b) - -> Redistribute Motion 6:9 (slice2; segments: 6) - Hash Key: t2.a + -> Parallel Hash Right Join + Hash Cond: (t1.b = t2.a) + -> Redistribute Motion 9:9 (slice2; segments: 9) + Hash Key: t1.b Hash Module: 3 - -> Parallel Seq Scan on t2 + -> Parallel Seq Scan on t1 -> Parallel Hash - -> Redistribute Motion 9:9 (slice3; segments: 9) - Hash Key: t1.b + -> Redistribute Motion 6:9 (slice3; segments: 6) + Hash Key: t2.a Hash Module: 3 - -> Parallel Seq Scan on t1 + -> Parallel Seq Scan on t2 Optimizer: Postgres query optimizer (13 rows) @@ -1690,34 +1690,34 @@ analyze t2; set local enable_parallel = on; -- parallel hash join with shared table, SinglQE as outer partial path. explain(locus, costs off) select * from (select count(*) as a from t2) t2 left join t1 on t1.a = t2.a; - QUERY PLAN ------------------------------------------------------------------- + QUERY PLAN +------------------------------------------------------------------------ Gather Motion 6:1 (slice1; segments: 6) Locus: Entry - -> Parallel Hash Left Join - Locus: Hashed + -> Parallel Hash Right Join + Locus: HashedWorkers Parallel Workers: 2 - Hash Cond: ((count(*)) = t1.a) - -> Redistribute Motion 1:6 (slice2; segments: 1) - Locus: Hashed + Hash Cond: (t1.a = (count(*))) + -> Parallel Seq Scan on t1 + Locus: HashedWorkers Parallel Workers: 2 - Hash Key: (count(*)) - Hash Module: 3 - -> Finalize Aggregate - Locus: SingleQE - -> Gather Motion 6:1 (slice3; segments: 6) - Locus: SingleQE - -> Partial Aggregate - Locus: HashedWorkers - Parallel Workers: 2 - -> Parallel Seq Scan on t2 - Locus: HashedWorkers - Parallel Workers: 2 -> Parallel Hash Locus: Hashed - -> Parallel Seq Scan on t1 - Locus: HashedWorkers + -> Redistribute Motion 1:6 (slice2; segments: 1) + Locus: Hashed Parallel Workers: 2 + Hash Key: (count(*)) + Hash Module: 3 + -> Finalize Aggregate + Locus: SingleQE + -> Gather Motion 6:1 (slice3; segments: 6) + Locus: SingleQE + -> Partial Aggregate + Locus: HashedWorkers + Parallel Workers: 2 + -> Parallel Seq Scan on t2 + Locus: HashedWorkers + Parallel Workers: 2 Optimizer: Postgres query optimizer (27 rows) @@ -1975,7 +1975,7 @@ explain (locus, costs off) select * from rt1 union all select * from t1; -> Result Locus: Strewn Parallel Workers: 3 - One-Time Filter: (gp_execution_segment() = 0) + One-Time Filter: (gp_execution_segment() = 1) -> Parallel Seq Scan on rt1 Locus: SegmentGeneralWorkers Parallel Workers: 3 @@ -1999,7 +1999,7 @@ explain (locus, costs off) select * from rt1 union all select * from t2; -> Result Locus: Strewn Parallel Workers: 3 - One-Time Filter: (gp_execution_segment() = 0) + One-Time Filter: (gp_execution_segment() = 1) -> Parallel Seq Scan on rt1 Locus: SegmentGeneralWorkers Parallel Workers: 3 @@ -2296,8 +2296,8 @@ analyze t1; analyze t2; analyze t3_null; explain(costs off) select sum(t1.c1) from t1 where c1 not in (select c2 from t2); - QUERY PLAN ------------------------------------------------------------------------------------- + QUERY PLAN +----------------------------------------------------------------------------------- Finalize Aggregate -> Gather Motion 6:1 (slice1; segments: 6) -> Partial Aggregate @@ -2317,8 +2317,8 @@ select sum(t1.c1) from t1 where c1 not in (select c2 from t2); (1 row) explain(costs off) select * from t1 where c1 not in (select c2 from t3_null); - QUERY PLAN ------------------------------------------------------------------------- + QUERY PLAN +----------------------------------------------------------------------- Gather Motion 6:1 (slice1; segments: 6) -> Parallel Hash Left Anti Semi (Not-In) Join Hash Cond: (t1.c1 = t3_null.c2) @@ -2457,8 +2457,11 @@ abort; begin; create table pagg_tab (a int, b int, c text, d int) partition by list(c); create table pagg_tab_p1 partition of pagg_tab for values in ('0000', '0001', '0002', '0003', '0004'); +NOTICE: table has parent, setting distribution columns to match parent table create table pagg_tab_p2 partition of pagg_tab for values in ('0005', '0006', '0007', '0008'); +NOTICE: table has parent, setting distribution columns to match parent table create table pagg_tab_p3 partition of pagg_tab for values in ('0009', '0010', '0011'); +NOTICE: table has parent, setting distribution columns to match parent table insert into pagg_tab select i % 20, i % 30, to_char(i % 12, 'FM0000'), i % 30 from generate_series(0, 2999) i; analyze pagg_tab; set local enable_parallel to off; @@ -2972,7 +2975,7 @@ create table t2_anti(a int, b int) with(parallel_workers=2) distributed by (b); insert into t2_anti values(generate_series(5, 10)); explain(costs off, verbose) select t1_anti.a, t1_anti.b from t1_anti left join t2_anti on t1_anti.a = t2_anti.a where t2_anti.a is null; - QUERY PLAN + QUERY PLAN ------------------------------------------------------------------ Gather Motion 3:1 (slice1; segments: 3) Output: t1_anti.a, t1_anti.b @@ -3068,8 +3071,8 @@ select t1_anti.a, t1_anti.b from t1_anti left join t2_anti on t1_anti.a = t2_ant ---+--- 3 | 4 | - 1 | 2 | + 1 | (4 rows) abort; @@ -3098,7 +3101,7 @@ insert into t_distinct_0 select * from t_distinct_0; analyze t_distinct_0; explain(costs off) select distinct a from t_distinct_0; - QUERY PLAN + QUERY PLAN ------------------------------------------------------------ Gather Motion 3:1 (slice1; segments: 3) -> HashAggregate @@ -3232,8 +3235,6 @@ select distinct a, b from t_distinct_0; drop table if exists t_distinct_1; NOTICE: table "t_distinct_1" does not exist, skipping create table t_distinct_1(a int, b int) using ao_column; -NOTICE: Table doesn't have 'DISTRIBUTED BY' clause -- Using column named 'a' as the Apache Cloudberry data distribution key for this table. -HINT: The 'DISTRIBUTED BY' clause determines the distribution of data. Make sure column(s) chosen are the optimal data distribution key to minimize skew. insert into t_distinct_1 select * from t_distinct_0; analyze t_distinct_1; set enable_parallel = off; @@ -3520,10 +3521,7 @@ WHERE e.salary > ( -- Test https://github.com/apache/cloudberry/issues/1376 -- create table t1(a int, b int); -NOTICE: Table doesn't have 'DISTRIBUTED BY' clause -- Using column named 'a' as the Apache Cloudberry data distribution key for this table. -HINT: The 'DISTRIBUTED BY' clause determines the distribution of data. Make sure column(s) chosen are the optimal data distribution key to minimize skew. create table t2 (like t1); -NOTICE: table doesn't have 'DISTRIBUTED BY' clause, defaulting to distribution columns from LIKE table set gp_cte_sharing = on; explain(locus, costs off) with x as (select a, count(*) as b from t1 group by a union all @@ -3571,8 +3569,184 @@ explain(locus, costs off) with x as reset gp_cte_sharing; reset enable_parallel; reset min_parallel_table_scan_size; +-- +-- Parallel Hash Full/Right Join +-- +begin; +create table pj_t1(id int, v int) with(parallel_workers=2) distributed by (id); +create table pj_t2(id int, v int) with(parallel_workers=2) distributed by (id); +create table pj_t3(id int, v int) with(parallel_workers=0) distributed by (id); +-- pj_t1 is 3x larger than pj_t2 so the planner hashes the smaller pj_t2 +-- and probes with pj_t1, producing a genuine Parallel Hash Right Join plan. +insert into pj_t1 select i, i from generate_series(1,30000)i; +insert into pj_t2 select i, i from generate_series(25001,35000)i; +insert into pj_t3 select i, i from generate_series(1,10000)i; +analyze pj_t1; +analyze pj_t2; +analyze pj_t3; +set local enable_parallel = on; +set local min_parallel_table_scan_size = 0; +-- 12_P_12_10: Parallel Hash Full Join: HashedWorkers FULL JOIN HashedWorkers -> HashedOJ(parallel) +explain(costs off, locus) +select count(*) from pj_t1 full join pj_t2 using (id); + QUERY PLAN +---------------------------------------------------------- + Finalize Aggregate + Locus: Entry + -> Gather Motion 6:1 (slice1; segments: 6) + Locus: Entry + -> Partial Aggregate + Locus: HashedOJ + Parallel Workers: 2 + -> Parallel Hash Full Join + Locus: HashedOJ + Parallel Workers: 2 + Hash Cond: (pj_t1.id = pj_t2.id) + -> Parallel Seq Scan on pj_t1 + Locus: HashedWorkers + Parallel Workers: 2 + -> Parallel Hash + Locus: Hashed + -> Parallel Seq Scan on pj_t2 + Locus: HashedWorkers + Parallel Workers: 2 + Optimizer: Postgres query optimizer +(20 rows) + +-- correctness: parallel result matches non-parallel +set local enable_parallel = off; +select count(*) from pj_t1 full join pj_t2 using (id); + count +------- + 35000 +(1 row) + +set local enable_parallel = on; +select count(*) from pj_t1 full join pj_t2 using (id); + count +------- + 35000 +(1 row) + +-- Parallel Hash Right Join: pj_t1 (30K) is larger, so the planner hashes the smaller pj_t2 +-- (10K) as the build side and probes with pj_t1; result locus HashedWorkers(parallel) +explain(costs off, locus) +select count(*) from pj_t1 right join pj_t2 using (id); + QUERY PLAN +---------------------------------------------------------- + Finalize Aggregate + Locus: Entry + -> Gather Motion 6:1 (slice1; segments: 6) + Locus: Entry + -> Partial Aggregate + Locus: HashedWorkers + Parallel Workers: 2 + -> Parallel Hash Right Join + Locus: HashedWorkers + Parallel Workers: 2 + Hash Cond: (pj_t1.id = pj_t2.id) + -> Parallel Seq Scan on pj_t1 + Locus: HashedWorkers + Parallel Workers: 2 + -> Parallel Hash + Locus: Hashed + -> Parallel Seq Scan on pj_t2 + Locus: HashedWorkers + Parallel Workers: 2 + Optimizer: Postgres query optimizer +(20 rows) + +-- correctness: parallel result matches non-parallel +set local enable_parallel = off; +select count(*) from pj_t1 right join pj_t2 using (id); + count +------- + 10000 +(1 row) + +set local enable_parallel = on; +select count(*) from pj_t1 right join pj_t2 using (id); + count +------- + 10000 +(1 row) + +-- Locus propagation: HashedOJ(parallel) followed by INNER JOIN with Hashed(serial) +-- The full join result (HashedOJ,parallel=2) is joined with pj_t3 (Hashed,serial) +explain(costs off, locus) +select count(*) from (pj_t1 full join pj_t2 using (id)) fj inner join pj_t3 using (id); + QUERY PLAN +--------------------------------------------------------------------------- + Finalize Aggregate + Locus: Entry + -> Gather Motion 3:1 (slice1; segments: 3) + Locus: Entry + -> Partial Aggregate + Locus: HashedOJ + -> Hash Join + Locus: HashedOJ + Hash Cond: (COALESCE(pj_t1.id, pj_t2.id) = pj_t3.id) + -> Hash Full Join + Locus: HashedOJ + Hash Cond: (pj_t1.id = pj_t2.id) + -> Seq Scan on pj_t1 + Locus: Hashed + -> Hash + Locus: Hashed + -> Seq Scan on pj_t2 + Locus: Hashed + -> Hash + Locus: Replicated + -> Broadcast Motion 3:3 (slice2; segments: 3) + Locus: Replicated + -> Seq Scan on pj_t3 + Locus: Hashed + Optimizer: Postgres query optimizer +(25 rows) + +-- Locus propagation: HashedOJ(parallel) followed by FULL JOIN with Hashed(serial) +explain(costs off, locus) +select count(*) from (pj_t1 full join pj_t2 using (id)) fj full join pj_t3 using (id); + QUERY PLAN +-------------------------------------------------------------------------- + Finalize Aggregate + Locus: Entry + -> Gather Motion 3:1 (slice1; segments: 3) + Locus: Entry + -> Partial Aggregate + Locus: HashedOJ + -> Hash Full Join + Locus: HashedOJ + Hash Cond: (COALESCE(pj_t1.id, pj_t2.id) = pj_t3.id) + -> Redistribute Motion 3:3 (slice2; segments: 3) + Locus: Hashed + Hash Key: COALESCE(pj_t1.id, pj_t2.id) + -> Hash Full Join + Locus: HashedOJ + Hash Cond: (pj_t1.id = pj_t2.id) + -> Seq Scan on pj_t1 + Locus: Hashed + -> Hash + Locus: Hashed + -> Seq Scan on pj_t2 + Locus: Hashed + -> Hash + Locus: Hashed + -> Seq Scan on pj_t3 + Locus: Hashed + Optimizer: Postgres query optimizer +(26 rows) + +abort; -- start_ignore drop schema test_parallel cascade; +NOTICE: drop cascades to 6 other objects +DETAIL: drop cascades to table t_distinct_0 +drop cascades to table t_distinct_1 +drop cascades to table departments +drop cascades to table employees +drop cascades to table t1 +drop cascades to table t2 -- end_ignore reset gp_appendonly_insert_files; reset force_parallel_mode; diff --git a/src/test/regress/expected/join_hash.out b/src/test/regress/expected/join_hash.out index 28f4558ec04..e5f74c18d28 100644 --- a/src/test/regress/expected/join_hash.out +++ b/src/test/regress/expected/join_hash.out @@ -10,6 +10,9 @@ set allow_system_table_mods=on; set local min_parallel_table_scan_size = 0; set local parallel_setup_cost = 0; set local enable_hashjoin = on; +-- CBDB: disable CBDB parallel for these PG-originated tests; parallel full join +-- is tested separately in cbdb_parallel.sql. +set local enable_parallel = off; -- Extract bucket and batch counts from an explain analyze plan. In -- general we can't make assertions about how many batches (or -- buckets) will be required because it can vary, but we can in some @@ -58,12 +61,16 @@ $$; -- estimated size. create table simple as select generate_series(1, 60000) AS id, 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa'; +NOTICE: Table doesn't have 'DISTRIBUTED BY' clause -- Using column(s) named 'id' as the Apache Cloudberry data distribution key for this table. +HINT: The 'DISTRIBUTED BY' clause determines the distribution of data. Make sure column(s) chosen are the optimal data distribution key to minimize skew. alter table simple set (parallel_workers = 2); analyze simple; -- Make a relation whose size we will under-estimate. We want stats -- to say 1000 rows, but actually there are 20,000 rows. create table bigger_than_it_looks as select generate_series(1, 60000) as id, 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa'; +NOTICE: Table doesn't have 'DISTRIBUTED BY' clause -- Using column(s) named 'id' as the Apache Cloudberry data distribution key for this table. +HINT: The 'DISTRIBUTED BY' clause determines the distribution of data. Make sure column(s) chosen are the optimal data distribution key to minimize skew. alter table bigger_than_it_looks set (autovacuum_enabled = 'false'); WARNING: autovacuum is not supported in Cloudberry alter table bigger_than_it_looks set (parallel_workers = 2); @@ -73,6 +80,8 @@ update pg_class set reltuples = 1000 where relname = 'bigger_than_it_looks'; -- kind of skew that breaks our batching scheme. We want stats to say -- 2 rows, but actually there are 20,000 rows with the same key. create table extremely_skewed (id int, t text); +NOTICE: Table doesn't have 'DISTRIBUTED BY' clause -- Using column named 'id' as the Apache Cloudberry data distribution key for this table. +HINT: The 'DISTRIBUTED BY' clause determines the distribution of data. Make sure column(s) chosen are the optimal data distribution key to minimize skew. alter table extremely_skewed set (autovacuum_enabled = 'false'); WARNING: autovacuum is not supported in Cloudberry alter table extremely_skewed set (parallel_workers = 2); @@ -85,6 +94,8 @@ update pg_class where relname = 'extremely_skewed'; -- Make a relation with a couple of enormous tuples. create table wide as select generate_series(1, 2) as id, rpad('', 320000, 'x') as t; +NOTICE: Table doesn't have 'DISTRIBUTED BY' clause -- Using column(s) named 'id' as the Apache Cloudberry data distribution key for this table. +HINT: The 'DISTRIBUTED BY' clause determines the distribution of data. Make sure column(s) chosen are the optimal data distribution key to minimize skew. alter table wide set (parallel_workers = 2); ANALYZE wide; -- The "optimal" case: the hash table fits in memory; we plan for 1 @@ -319,7 +330,7 @@ $$); select count(*) from simple r full outer join simple s using (id); count ------- - 20000 + 60000 (1 row) rollback to settings; @@ -574,9 +585,13 @@ rollback to settings; -- Exercise rescans. We'll turn off parallel_leader_participation so -- that we can check that instrumentation comes back correctly. create table join_foo as select generate_series(1, 3) as id, 'xxxxx'::text as t; +NOTICE: Table doesn't have 'DISTRIBUTED BY' clause -- Using column(s) named 'id' as the Apache Cloudberry data distribution key for this table. +HINT: The 'DISTRIBUTED BY' clause determines the distribution of data. Make sure column(s) chosen are the optimal data distribution key to minimize skew. analyze join_foo; alter table join_foo set (parallel_workers = 0); create table join_bar as select generate_series(1, 20000) as id, 'xxxxx'::text as t; +NOTICE: Table doesn't have 'DISTRIBUTED BY' clause -- Using column(s) named 'id' as the Apache Cloudberry data distribution key for this table. +HINT: The 'DISTRIBUTED BY' clause determines the distribution of data. Make sure column(s) chosen are the optimal data distribution key to minimize skew. analyze join_bar; alter table join_bar set (parallel_workers = 2); -- multi-batch with rescan, parallel-oblivious @@ -854,23 +869,23 @@ savepoint settings; set local max_parallel_workers_per_gather = 2; explain (costs off) select count(*) from simple r full outer join simple s using (id); - QUERY PLAN -------------------------------------------------------------- + QUERY PLAN +---------------------------------------------------- Finalize Aggregate - -> Gather - Workers Planned: 2 + -> Gather Motion 3:1 (slice1; segments: 3) -> Partial Aggregate - -> Parallel Hash Full Join + -> Hash Full Join Hash Cond: (r.id = s.id) - -> Parallel Seq Scan on simple r - -> Parallel Hash - -> Parallel Seq Scan on simple s + -> Seq Scan on simple r + -> Hash + -> Seq Scan on simple s + Optimizer: Postgres query optimizer (9 rows) select count(*) from simple r full outer join simple s using (id); count ------- - 20000 + 60000 (1 row) rollback to settings; @@ -935,23 +950,25 @@ savepoint settings; set local max_parallel_workers_per_gather = 2; explain (costs off) select count(*) from simple r full outer join simple s on (r.id = 0 - s.id); - QUERY PLAN -------------------------------------------------------------- + QUERY PLAN +------------------------------------------------------------------------ Finalize Aggregate - -> Gather - Workers Planned: 2 + -> Gather Motion 3:1 (slice1; segments: 3) -> Partial Aggregate - -> Parallel Hash Full Join + -> Hash Full Join Hash Cond: ((0 - s.id) = r.id) - -> Parallel Seq Scan on simple s - -> Parallel Hash - -> Parallel Seq Scan on simple r -(9 rows) + -> Redistribute Motion 3:3 (slice2; segments: 3) + Hash Key: (0 - s.id) + -> Seq Scan on simple s + -> Hash + -> Seq Scan on simple r + Optimizer: Postgres query optimizer +(11 rows) select count(*) from simple r full outer join simple s on (r.id = 0 - s.id); - count -------- - 40000 + count +-------- + 120000 (1 row) rollback to settings; @@ -1013,7 +1030,11 @@ rollback to settings; savepoint settings; set max_parallel_workers_per_gather = 0; create table join_hash_t_small(a int); +NOTICE: Table doesn't have 'DISTRIBUTED BY' clause -- Using column named 'a' as the Apache Cloudberry data distribution key for this table. +HINT: The 'DISTRIBUTED BY' clause determines the distribution of data. Make sure column(s) chosen are the optimal data distribution key to minimize skew. create table join_hash_t_big(b int); +NOTICE: Table doesn't have 'DISTRIBUTED BY' clause -- Using column named 'b' as the Apache Cloudberry data distribution key for this table. +HINT: The 'DISTRIBUTED BY' clause determines the distribution of data. Make sure column(s) chosen are the optimal data distribution key to minimize skew. insert into join_hash_t_small select i%100 from generate_series(0, 3000)i; insert into join_hash_t_big select i%100000 from generate_series(1, 100000)i ; analyze join_hash_t_small; @@ -1031,17 +1052,25 @@ explain (costs off) select * from join_hash_t_small, join_hash_t_big where a = b (7 rows) rollback to settings; +rollback; -- Hash join reuses the HOT status bit to indicate match status. This can only -- be guaranteed to produce correct results if all the hash join tuple match -- bits are reset before reuse. This is done upon loading them into the -- hashtable. +begin; SAVEPOINT settings; +-- CBDB: disable CBDB parallel; the serial full join match-bit test is what matters here. +SET enable_parallel = off; SET enable_parallel_hash = on; SET min_parallel_table_scan_size = 0; SET parallel_setup_cost = 0; SET parallel_tuple_cost = 0; CREATE TABLE hjtest_matchbits_t1(id int); +NOTICE: Table doesn't have 'DISTRIBUTED BY' clause -- Using column named 'id' as the Apache Cloudberry data distribution key for this table. +HINT: The 'DISTRIBUTED BY' clause determines the distribution of data. Make sure column(s) chosen are the optimal data distribution key to minimize skew. CREATE TABLE hjtest_matchbits_t2(id int); +NOTICE: Table doesn't have 'DISTRIBUTED BY' clause -- Using column named 'id' as the Apache Cloudberry data distribution key for this table. +HINT: The 'DISTRIBUTED BY' clause determines the distribution of data. Make sure column(s) chosen are the optimal data distribution key to minimize skew. INSERT INTO hjtest_matchbits_t1 VALUES (1); INSERT INTO hjtest_matchbits_t2 VALUES (2); -- Update should create a HOT tuple. If this status bit isn't cleared, we won't @@ -1064,8 +1093,8 @@ SET enable_parallel_hash = off; SELECT * FROM hjtest_matchbits_t1 t1 FULL JOIN hjtest_matchbits_t2 t2 ON t1.id = t2.id; id | id ----+---- - 1 | | 2 + 1 | (2 rows) ROLLBACK TO settings; @@ -1085,7 +1114,11 @@ BEGIN; SET LOCAL enable_sort = OFF; -- avoid mergejoins SET LOCAL from_collapse_limit = 1; -- allows easy changing of join order CREATE TABLE hjtest_1 (a text, b int, id int, c bool); +NOTICE: Table doesn't have 'DISTRIBUTED BY' clause -- Using column named 'a' as the Apache Cloudberry data distribution key for this table. +HINT: The 'DISTRIBUTED BY' clause determines the distribution of data. Make sure column(s) chosen are the optimal data distribution key to minimize skew. CREATE TABLE hjtest_2 (a bool, id int, b text, c int); +NOTICE: Table doesn't have 'DISTRIBUTED BY' clause -- Using column named 'a' as the Apache Cloudberry data distribution key for this table. +HINT: The 'DISTRIBUTED BY' clause determines the distribution of data. Make sure column(s) chosen are the optimal data distribution key to minimize skew. INSERT INTO hjtest_1(a, b, id, c) VALUES ('text', 2, 1, false); -- matches INSERT INTO hjtest_1(a, b, id, c) VALUES ('text', 1, 2, false); -- fails id join condition INSERT INTO hjtest_1(a, b, id, c) VALUES ('text', 20, 1, false); -- fails < 50 @@ -1142,8 +1175,8 @@ WHERE SubPlan 2 -> Result Output: (hjtest_1.b * 5) + Settings: enable_parallel = 'on', enable_sort = 'off', from_collapse_limit = '1', optimizer = 'off' Optimizer: Postgres query optimizer - Settings: enable_sort=off, from_collapse_limit=1 (38 rows) SELECT hjtest_1.a a1, hjtest_2.a a2,hjtest_1.tableoid::regclass t1, hjtest_2.tableoid::regclass t2 @@ -1206,8 +1239,8 @@ WHERE SubPlan 3 -> Result Output: (hjtest_2.c * 5) + Settings: enable_parallel = 'on', enable_sort = 'off', from_collapse_limit = '1', optimizer = 'off' Optimizer: Postgres query optimizer - Settings: enable_sort=off, from_collapse_limit=1 (38 rows) SELECT hjtest_1.a a1, hjtest_2.a a2,hjtest_1.tableoid::regclass t1, hjtest_2.tableoid::regclass t2 diff --git a/src/test/regress/expected/join_hash_optimizer.out b/src/test/regress/expected/join_hash_optimizer.out index 053d0ef4898..1835bfa4f31 100644 --- a/src/test/regress/expected/join_hash_optimizer.out +++ b/src/test/regress/expected/join_hash_optimizer.out @@ -10,6 +10,9 @@ set allow_system_table_mods=on; set local min_parallel_table_scan_size = 0; set local parallel_setup_cost = 0; set local enable_hashjoin = on; +-- CBDB: disable CBDB parallel for these PG-originated tests; parallel full join +-- is tested separately in cbdb_parallel.sql. +set local enable_parallel = off; -- Extract bucket and batch counts from an explain analyze plan. In -- general we can't make assertions about how many batches (or -- buckets) will be required because it can vary, but we can in some @@ -115,7 +118,7 @@ explain (costs off) -> Redistribute Motion 3:3 (slice3; segments: 3) Hash Key: s.id -> Seq Scan on simple s - Optimizer: Pivotal Optimizer (GPORCA) + Optimizer: GPORCA (13 rows) select count(*) from simple r join simple s using (id); @@ -156,7 +159,7 @@ explain (costs off) -> Redistribute Motion 3:3 (slice3; segments: 3) Hash Key: s.id -> Seq Scan on simple s - Optimizer: Pivotal Optimizer (GPORCA) + Optimizer: GPORCA (13 rows) select count(*) from simple r join simple s using (id); @@ -197,7 +200,7 @@ explain (costs off) -> Redistribute Motion 3:3 (slice3; segments: 3) Hash Key: s.id -> Seq Scan on simple s - Optimizer: Pivotal Optimizer (GPORCA) + Optimizer: GPORCA (13 rows) select count(*) from simple r join simple s using (id); @@ -241,7 +244,7 @@ explain (costs off) -> Redistribute Motion 3:3 (slice3; segments: 3) Hash Key: s.id -> Seq Scan on simple s - Optimizer: Pivotal Optimizer (GPORCA) + Optimizer: GPORCA (13 rows) select count(*) from simple r join simple s using (id); @@ -283,7 +286,7 @@ explain (costs off) -> Redistribute Motion 3:3 (slice3; segments: 3) Hash Key: s.id -> Seq Scan on simple s - Optimizer: Pivotal Optimizer (GPORCA) + Optimizer: GPORCA (13 rows) select count(*) from simple r join simple s using (id); @@ -325,7 +328,7 @@ explain (costs off) -> Redistribute Motion 3:3 (slice3; segments: 3) Hash Key: s.id -> Seq Scan on simple s - Optimizer: Pivotal Optimizer (GPORCA) + Optimizer: GPORCA (13 rows) select count(*) from simple r join simple s using (id); @@ -344,6 +347,13 @@ $$); t | f (1 row) +-- parallel full multi-batch hash join +select count(*) from simple r full outer join simple s using (id); + count +------- + 60000 +(1 row) + rollback to settings; -- The "bad" case: during execution we need to increase number of -- batches; in this case we plan for 1 batch, and increase at least a @@ -356,8 +366,8 @@ set local work_mem = '128kB'; set local statement_mem = '1000kB'; -- GPDB uses statement_mem instead of work_mem explain (costs off) select count(*) FROM simple r JOIN bigger_than_it_looks s USING (id); - QUERY PLAN ------------------------------------------------------------------------------- + QUERY PLAN +--------------------------------------------------------------------------- Finalize Aggregate -> Gather Motion 3:1 (slice1; segments: 3) -> Partial Aggregate @@ -367,8 +377,8 @@ explain (costs off) -> Hash -> Broadcast Motion 3:3 (slice2; segments: 3) -> Seq Scan on bigger_than_it_looks s - Optimizer: Pivotal Optimizer (GPORCA) -(13 rows) + Optimizer: GPORCA +(10 rows) select count(*) FROM simple r JOIN bigger_than_it_looks s USING (id); count @@ -395,8 +405,8 @@ set local statement_mem = '1000kB'; -- GPDB uses statement_mem instead of work_m set local enable_parallel_hash = off; explain (costs off) select count(*) from simple r join bigger_than_it_looks s using (id); - QUERY PLAN ------------------------------------------------------------------------------- + QUERY PLAN +--------------------------------------------------------------------------- Finalize Aggregate -> Gather Motion 3:1 (slice1; segments: 3) -> Partial Aggregate @@ -406,8 +416,8 @@ explain (costs off) -> Hash -> Broadcast Motion 3:3 (slice2; segments: 3) -> Seq Scan on bigger_than_it_looks s - Optimizer: Pivotal Optimizer (GPORCA) -(13 rows) + Optimizer: GPORCA +(10 rows) select count(*) from simple r join bigger_than_it_looks s using (id); count @@ -434,8 +444,8 @@ set local statement_mem = '1000kB'; -- GPDB uses statement_mem instead of work_m set local enable_parallel_hash = on; explain (costs off) select count(*) from simple r join bigger_than_it_looks s using (id); - QUERY PLAN ------------------------------------------------------------------------------- + QUERY PLAN +--------------------------------------------------------------------------- Finalize Aggregate -> Gather Motion 3:1 (slice1; segments: 3) -> Partial Aggregate @@ -445,8 +455,8 @@ explain (costs off) -> Hash -> Broadcast Motion 3:3 (slice2; segments: 3) -> Seq Scan on bigger_than_it_looks s - Optimizer: Pivotal Optimizer (GPORCA) -(13 rows) + Optimizer: GPORCA +(10 rows) select count(*) from simple r join bigger_than_it_looks s using (id); count @@ -490,7 +500,7 @@ HINT: For non-partitioned tables, run analyze <table_name>(<column_list>). For -> Hash -> Broadcast Motion 3:3 (slice2; segments: 3) -> Seq Scan on extremely_skewed s - Optimizer: Pivotal Optimizer (GPORCA) + Optimizer: GPORCA (10 rows) select count(*) from simple r join extremely_skewed s using (id); @@ -534,7 +544,7 @@ HINT: For non-partitioned tables, run analyze <table_name>(<column_list>). For -> Hash -> Broadcast Motion 3:3 (slice2; segments: 3) -> Seq Scan on extremely_skewed s - Optimizer: Pivotal Optimizer (GPORCA) + Optimizer: GPORCA (10 rows) select count(*) from simple r join extremely_skewed s using (id); @@ -578,7 +588,7 @@ HINT: For non-partitioned tables, run analyze <table_name>(<column_list>). For -> Hash -> Broadcast Motion 3:3 (slice2; segments: 3) -> Seq Scan on extremely_skewed s - Optimizer: Pivotal Optimizer (GPORCA) + Optimizer: GPORCA (10 rows) select count(*) from simple r join extremely_skewed s using (id); @@ -643,8 +653,8 @@ explain (costs off) select count(*) from join_foo left join (select b1.id, b1.t from join_bar b1 join join_bar b2 using (id)) ss on join_foo.id < ss.id + 1 and join_foo.id > ss.id - 1; - QUERY PLAN ------------------------------------------------------------------------------------------------------------- + QUERY PLAN +------------------------------------------------------------------------------------------------ Finalize Aggregate -> Gather Motion 3:1 (slice1; segments: 3) -> Partial Aggregate @@ -662,7 +672,7 @@ explain (costs off) -> Redistribute Motion 3:3 (slice4; segments: 3) Hash Key: b2.id -> Seq Scan on join_bar b2 - Optimizer: Pivotal Optimizer (GPORCA) + Optimizer: GPORCA (18 rows) select count(*) from join_foo @@ -701,8 +711,8 @@ explain (costs off) select count(*) from join_foo left join (select b1.id, b1.t from join_bar b1 join join_bar b2 using (id)) ss on join_foo.id < ss.id + 1 and join_foo.id > ss.id - 1; - QUERY PLAN ------------------------------------------------------------------------------------------------------------- + QUERY PLAN +------------------------------------------------------------------------------------------------ Finalize Aggregate -> Gather Motion 3:1 (slice1; segments: 3) -> Partial Aggregate @@ -720,7 +730,7 @@ explain (costs off) -> Redistribute Motion 3:3 (slice4; segments: 3) Hash Key: b2.id -> Seq Scan on join_bar b2 - Optimizer: Pivotal Optimizer (GPORCA) + Optimizer: GPORCA (18 rows) select count(*) from join_foo @@ -760,8 +770,8 @@ explain (costs off) select count(*) from join_foo left join (select b1.id, b1.t from join_bar b1 join join_bar b2 using (id)) ss on join_foo.id < ss.id + 1 and join_foo.id > ss.id - 1; - QUERY PLAN ------------------------------------------------------------------------------------------------------------- + QUERY PLAN +------------------------------------------------------------------------------------------------ Finalize Aggregate -> Gather Motion 3:1 (slice1; segments: 3) -> Partial Aggregate @@ -779,7 +789,7 @@ explain (costs off) -> Redistribute Motion 3:3 (slice4; segments: 3) Hash Key: b2.id -> Seq Scan on join_bar b2 - Optimizer: Pivotal Optimizer (GPORCA) + Optimizer: GPORCA (18 rows) select count(*) from join_foo @@ -818,8 +828,8 @@ explain (costs off) select count(*) from join_foo left join (select b1.id, b1.t from join_bar b1 join join_bar b2 using (id)) ss on join_foo.id < ss.id + 1 and join_foo.id > ss.id - 1; - QUERY PLAN ------------------------------------------------------------------------------------------------------------- + QUERY PLAN +------------------------------------------------------------------------------------------------ Finalize Aggregate -> Gather Motion 3:1 (slice1; segments: 3) -> Partial Aggregate @@ -837,7 +847,7 @@ explain (costs off) -> Redistribute Motion 3:3 (slice4; segments: 3) Hash Key: b2.id -> Seq Scan on join_bar b2 - Optimizer: Pivotal Optimizer (GPORCA) + Optimizer: GPORCA (18 rows) select count(*) from join_foo @@ -891,8 +901,9 @@ select count(*) from simple r full outer join simple s using (id); (1 row) rollback to settings; --- parallelism not possible with parallel-oblivious outer hash join +-- parallelism not possible with parallel-oblivious full hash join savepoint settings; +set enable_parallel_hash = off; set local max_parallel_workers_per_gather = 2; explain (costs off) select count(*) from simple r full outer join simple s using (id); @@ -920,7 +931,36 @@ select count(*) from simple r full outer join simple s using (id); (1 row) rollback to settings; --- An full outer join where every record is not matched. +-- parallelism is possible with parallel-aware full hash join +savepoint settings; +set local max_parallel_workers_per_gather = 2; +explain (costs off) + select count(*) from simple r full outer join simple s using (id); + QUERY PLAN +------------------------------------------------------------------------------ + Finalize Aggregate + -> Gather Motion 3:1 (slice1; segments: 3) + -> Partial Aggregate + -> Hash Full Join + Hash Cond: (r.id = s.id) + -> Redistribute Motion 3:3 (slice2; segments: 3) + Hash Key: r.id + -> Seq Scan on simple r + -> Hash + -> Redistribute Motion 3:3 (slice3; segments: 3) + Hash Key: s.id + -> Seq Scan on simple s + Optimizer: GPORCA +(13 rows) + +select count(*) from simple r full outer join simple s using (id); + count +------- + 60000 +(1 row) + +rollback to settings; +-- A full outer join where every record is not matched. -- non-parallel savepoint settings; set local max_parallel_workers_per_gather = 0; @@ -950,7 +990,37 @@ select count(*) from simple r full outer join simple s on (r.id = 0 - s.id); (1 row) rollback to settings; --- parallelism not possible with parallel-oblivious outer hash join +-- parallelism not possible with parallel-oblivious full hash join +savepoint settings; +set enable_parallel_hash = off; +set local max_parallel_workers_per_gather = 2; +explain (costs off) + select count(*) from simple r full outer join simple s on (r.id = 0 - s.id); + QUERY PLAN +------------------------------------------------------------------------------ + Finalize Aggregate + -> Gather Motion 3:1 (slice1; segments: 3) + -> Partial Aggregate + -> Hash Full Join + Hash Cond: (r.id = (0 - s.id)) + -> Redistribute Motion 3:3 (slice2; segments: 3) + Hash Key: r.id + -> Seq Scan on simple r + -> Hash + -> Redistribute Motion 3:3 (slice3; segments: 3) + Hash Key: (0 - s.id) + -> Seq Scan on simple s + Optimizer: GPORCA +(13 rows) + +select count(*) from simple r full outer join simple s on (r.id = 0 - s.id); + count +-------- + 120000 +(1 row) + +rollback to settings; +-- parallelism is possible with parallel-aware full hash join savepoint settings; set local max_parallel_workers_per_gather = 2; explain (costs off) @@ -1012,7 +1082,7 @@ explain (costs off) -> Redistribute Motion 3:3 (slice3; segments: 3) Hash Key: wide_1.id -> Seq Scan on wide wide_1 - Optimizer: Pivotal Optimizer (GPORCA) + Optimizer: GPORCA (13 rows) select length(max(s.t)) @@ -1060,11 +1130,57 @@ explain (costs off) select * from join_hash_t_small, join_hash_t_big where a = b -> Seq Scan on join_hash_t_big -> Hash -> Seq Scan on join_hash_t_small - Optimizer: Pivotal Optimizer (GPORCA) + Optimizer: GPORCA (7 rows) rollback to settings; rollback; +-- Hash join reuses the HOT status bit to indicate match status. This can only +-- be guaranteed to produce correct results if all the hash join tuple match +-- bits are reset before reuse. This is done upon loading them into the +-- hashtable. +begin; +SAVEPOINT settings; +-- CBDB: disable CBDB parallel; the serial full join match-bit test is what matters here. +SET enable_parallel = off; +SET enable_parallel_hash = on; +SET min_parallel_table_scan_size = 0; +SET parallel_setup_cost = 0; +SET parallel_tuple_cost = 0; +CREATE TABLE hjtest_matchbits_t1(id int); +NOTICE: Table doesn't have 'DISTRIBUTED BY' clause -- Using column named 'id' as the Apache Cloudberry data distribution key for this table. +HINT: The 'DISTRIBUTED BY' clause determines the distribution of data. Make sure column(s) chosen are the optimal data distribution key to minimize skew. +CREATE TABLE hjtest_matchbits_t2(id int); +NOTICE: Table doesn't have 'DISTRIBUTED BY' clause -- Using column named 'id' as the Apache Cloudberry data distribution key for this table. +HINT: The 'DISTRIBUTED BY' clause determines the distribution of data. Make sure column(s) chosen are the optimal data distribution key to minimize skew. +INSERT INTO hjtest_matchbits_t1 VALUES (1); +INSERT INTO hjtest_matchbits_t2 VALUES (2); +-- Update should create a HOT tuple. If this status bit isn't cleared, we won't +-- correctly emit the NULL-extended unmatching tuple in full hash join. +UPDATE hjtest_matchbits_t2 set id = 2; +SELECT * FROM hjtest_matchbits_t1 t1 FULL JOIN hjtest_matchbits_t2 t2 ON t1.id = t2.id + ORDER BY t1.id; + id | id +----+---- + 1 | + | 2 +(2 rows) + +-- Test serial full hash join. +-- Resetting parallel_setup_cost should force a serial plan. +-- Just to be safe, however, set enable_parallel_hash to off, as parallel full +-- hash joins are only supported with shared hashtables. +RESET parallel_setup_cost; +SET enable_parallel_hash = off; +SELECT * FROM hjtest_matchbits_t1 t1 FULL JOIN hjtest_matchbits_t2 t2 ON t1.id = t2.id; + id | id +----+---- + | 2 + 1 | +(2 rows) + +ROLLBACK TO settings; +rollback; -- Verify that hash key expressions reference the correct -- nodes. Hashjoin's hashkeys need to reference its outer plan, Hash's -- need to reference Hash's outer plan (which is below HashJoin's @@ -1154,9 +1270,9 @@ WHERE Filter: (((hjtest_1.b * 5)) < 50) -> Result Output: (hjtest_1.b * 5) - Settings: enable_sort = 'off', from_collapse_limit = '1' - Optimizer: Pivotal Optimizer (GPORCA) -(49 rows) + Settings: enable_parallel = 'on', enable_sort = 'off', from_collapse_limit = '1', optimizer = 'on' + Optimizer: GPORCA +(51 rows) SELECT hjtest_1.a a1, hjtest_2.a a2,hjtest_1.tableoid::regclass t1, hjtest_2.tableoid::regclass t2 FROM hjtest_1, hjtest_2 @@ -1231,9 +1347,9 @@ WHERE Filter: (((hjtest_1.b * 5)) < 50) -> Result Output: (hjtest_1.b * 5) - Settings: enable_sort = 'off', from_collapse_limit = '1' - Optimizer: Pivotal Optimizer (GPORCA) -(49 rows) + Settings: enable_parallel = 'on', enable_sort = 'off', from_collapse_limit = '1', optimizer = 'on' + Optimizer: GPORCA +(51 rows) SELECT hjtest_1.a a1, hjtest_2.a a2,hjtest_1.tableoid::regclass t1, hjtest_2.tableoid::regclass t2 FROM hjtest_2, hjtest_1 diff --git a/src/test/regress/sql/cbdb_parallel.sql b/src/test/regress/sql/cbdb_parallel.sql index f9d01dd8a00..08e7aa198f9 100644 --- a/src/test/regress/sql/cbdb_parallel.sql +++ b/src/test/regress/sql/cbdb_parallel.sql @@ -1149,6 +1149,56 @@ reset gp_cte_sharing; reset enable_parallel; reset min_parallel_table_scan_size; +-- +-- Parallel Hash Full/Right Join +-- +begin; +create table pj_t1(id int, v int) with(parallel_workers=2) distributed by (id); +create table pj_t2(id int, v int) with(parallel_workers=2) distributed by (id); +create table pj_t3(id int, v int) with(parallel_workers=0) distributed by (id); + +-- pj_t1 is 3x larger than pj_t2 so the planner hashes the smaller pj_t2 +-- and probes with pj_t1, producing a genuine Parallel Hash Right Join plan. +insert into pj_t1 select i, i from generate_series(1,30000)i; +insert into pj_t2 select i, i from generate_series(25001,35000)i; +insert into pj_t3 select i, i from generate_series(1,10000)i; +analyze pj_t1; +analyze pj_t2; +analyze pj_t3; + +set local enable_parallel = on; +set local min_parallel_table_scan_size = 0; + +-- 12_P_12_10: Parallel Hash Full Join: HashedWorkers FULL JOIN HashedWorkers -> HashedOJ(parallel) +explain(costs off, locus) +select count(*) from pj_t1 full join pj_t2 using (id); +-- correctness: parallel result matches non-parallel +set local enable_parallel = off; +select count(*) from pj_t1 full join pj_t2 using (id); +set local enable_parallel = on; +select count(*) from pj_t1 full join pj_t2 using (id); + +-- Parallel Hash Right Join: pj_t1 (30K) is larger, so the planner hashes the smaller pj_t2 +-- (10K) as the build side and probes with pj_t1; result locus HashedWorkers(parallel) +explain(costs off, locus) +select count(*) from pj_t1 right join pj_t2 using (id); +-- correctness: parallel result matches non-parallel +set local enable_parallel = off; +select count(*) from pj_t1 right join pj_t2 using (id); +set local enable_parallel = on; +select count(*) from pj_t1 right join pj_t2 using (id); + +-- Locus propagation: HashedOJ(parallel) followed by INNER JOIN with Hashed(serial) +-- The full join result (HashedOJ,parallel=2) is joined with pj_t3 (Hashed,serial) +explain(costs off, locus) +select count(*) from (pj_t1 full join pj_t2 using (id)) fj inner join pj_t3 using (id); + +-- Locus propagation: HashedOJ(parallel) followed by FULL JOIN with Hashed(serial) +explain(costs off, locus) +select count(*) from (pj_t1 full join pj_t2 using (id)) fj full join pj_t3 using (id); + +abort; + -- start_ignore drop schema test_parallel cascade; -- end_ignore diff --git a/src/test/regress/sql/join_hash.sql b/src/test/regress/sql/join_hash.sql index 0115489a6b9..2978e155ecd 100644 --- a/src/test/regress/sql/join_hash.sql +++ b/src/test/regress/sql/join_hash.sql @@ -13,6 +13,9 @@ set allow_system_table_mods=on; set local min_parallel_table_scan_size = 0; set local parallel_setup_cost = 0; set local enable_hashjoin = on; +-- CBDB: disable CBDB parallel for these PG-originated tests; parallel full join +-- is tested separately in cbdb_parallel.sql. +set local enable_parallel = off; -- Extract bucket and batch counts from an explain analyze plan. In -- general we can't make assertions about how many batches (or @@ -543,7 +546,10 @@ rollback; -- be guaranteed to produce correct results if all the hash join tuple match -- bits are reset before reuse. This is done upon loading them into the -- hashtable. +begin; SAVEPOINT settings; +-- CBDB: disable CBDB parallel; the serial full join match-bit test is what matters here. +SET enable_parallel = off; SET enable_parallel_hash = on; SET min_parallel_table_scan_size = 0; SET parallel_setup_cost = 0; --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
