This is an automated email from the ASF dual-hosted git repository.
avamingli pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/cloudberry.git
The following commit(s) were added to refs/heads/main by this push:
new f61fbd94438 Give UNION ALL more opportunities for parallel plans in MPP.
f61fbd94438 is described below
commit f61fbd94438099e9f091494a2e24af37132c95c1
Author: Zhang Mingli <[email protected]>
AuthorDate: Wed Aug 13 22:58:58 2025 +0800
Give UNION ALL more opportunities for parallel plans in MPP.
Previously, CBDB had to disable parallel execution for UNION ALL queries
whose subplans contain Motion nodes, because of a subtle but critical
correctness issue.
A Parallel Append worker marks a subnode as completed once it has
finished it, and the other workers then skip that subnode. This is
normally harmless, but it becomes critical in an MPP database, where
Motion nodes are ubiquitous: a skipped subnode can be a slice containing
a Motion, which that worker then never executes.
This limitation forced us to disable parallel plans for most UNION ALL
queries involving distributed tables, missing significant optimization
opportunities.
As a result, we fell back to serial execution:
    explain(costs off) select b, count(*) from t1 group by b union all
    select b, count(*) from t2 group by b;
                                QUERY PLAN
    ------------------------------------------------------------------
     Gather Motion 3:1 (slice1; segments: 3)
       -> Append
             -> HashAggregate
                   Group Key: t1.b
                   -> Redistribute Motion 3:3 (slice2; segments: 3)
                         Hash Key: t1.b
                         -> Seq Scan on t1
             -> HashAggregate
                   Group Key: t2.b
                   -> Redistribute Motion 3:3 (slice3; segments: 3)
                         Hash Key: t2.b
                         -> Seq Scan on t2
     Optimizer: Postgres query optimizer
    (13 rows)
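The skipping behavior described above can be illustrated with a toy model. This is only a sketch under stated assumptions, not CBDB executor code: `AppendTrace`, `run_parallel_aware`, `run_parallel_oblivious`, and `count_missed` are hypothetical names, workers are simulated one after another, and the schedule is the extreme case in which the first worker finishes every subplan before the second one starts.

```c
#include <stdbool.h>

#define NUM_SUBPLANS 2
#define NUM_WORKERS  2

/* Toy model (hypothetical): visited[w][s] is true if worker w ran subplan s. */
typedef struct
{
	bool visited[NUM_WORKERS][NUM_SUBPLANS];
} AppendTrace;

/*
 * Parallel-aware model: workers share a "finished" array. Once any worker
 * completes a subplan it is marked finished and every later worker skips it.
 */
static void
run_parallel_aware(AppendTrace *trace)
{
	bool finished[NUM_SUBPLANS] = {false};

	for (int w = 0; w < NUM_WORKERS; w++)
	{
		for (int s = 0; s < NUM_SUBPLANS; s++)
		{
			if (finished[s])
				continue;		/* another worker already completed it */
			trace->visited[w][s] = true;
			finished[s] = true;
		}
	}
}

/* Parallel-oblivious model: every worker runs every subplan in order. */
static void
run_parallel_oblivious(AppendTrace *trace)
{
	for (int w = 0; w < NUM_WORKERS; w++)
		for (int s = 0; s < NUM_SUBPLANS; s++)
			trace->visited[w][s] = true;
}

/* Count (worker, subplan) pairs that were never executed. */
static int
count_missed(const AppendTrace *trace)
{
	int missed = 0;

	for (int w = 0; w < NUM_WORKERS; w++)
		for (int s = 0; s < NUM_SUBPLANS; s++)
			if (!trace->visited[w][s])
				missed++;
	return missed;
}
```

Under this schedule the parallel-aware model leaves the second worker with nothing to run, so `count_missed` reports 2 for it and 0 for the parallel-oblivious model. If a skipped subplan sits on top of a Motion, that worker never drives the Motion's slice, which is the hazard the commit message describes.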
This commit first attempts a parallel-aware Append when it is safe to do
so, and adds a robust fallback path: when a Motion hazard is detected,
the planner switches to a parallel-oblivious Append. This works because,
while Parallel Append might skip slices containing Motions, a regular
Append does not have this problem: it reliably executes all of its
subnodes, whether or not they contain Motion nodes. Moreover, since
CBDB's Motion nodes in a parallel plan handle tuples individually, no
coordination between workers is needed when processing them.
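The selection rule in the paragraph above can be sketched in isolation. This is a simplified model, not the planner code: `ToyAppendPath`, `AppendChoice`, and `choose_partial_append` are hypothetical names, the two fields mirror the `parallel_safe` and `motionHazard` flags tested in the diff, and the sketch assumes that with `enable_parallel_append` off the candidate path is already parallel-oblivious.

```c
#include <stdbool.h>

/* Hypothetical stand-in for the flags the planner checks on an Append path. */
typedef struct
{
	bool parallel_safe;
	bool motion_hazard;
} ToyAppendPath;

typedef enum
{
	APPEND_NONE,				/* no partial Append path is added */
	APPEND_PARALLEL_AWARE,
	APPEND_PARALLEL_OBLIVIOUS
} AppendChoice;

/*
 * Decide which partial Append path to add.
 *
 * aware:     candidate built as a parallel-aware Append
 * oblivious: candidate built as a parallel-oblivious Append over the same
 *            partial subpaths
 */
static AppendChoice
choose_partial_append(bool enable_parallel_append,
					  ToyAppendPath aware,
					  ToyAppendPath oblivious)
{
	if (enable_parallel_append)
	{
		/* Prefer the parallel-aware Append when no Motion hazard was seen. */
		if (aware.parallel_safe && !aware.motion_hazard)
			return APPEND_PARALLEL_AWARE;

		/* Second pass: fall back to a parallel-oblivious Append. */
		return oblivious.parallel_safe ? APPEND_PARALLEL_OBLIVIOUS : APPEND_NONE;
	}

	/* Parallel-aware mode disabled: only the oblivious path is considered. */
	return oblivious.parallel_safe ? APPEND_PARALLEL_OBLIVIOUS : APPEND_NONE;
}
```

For a UNION ALL over distributed tables, the parallel-aware candidate typically carries a Motion hazard, so this model returns `APPEND_PARALLEL_OBLIVIOUS` as long as the fallback path is parallel safe.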
With this approach, each subplan of the UNION ALL can run with its own
degree of parallelism. In the example below, the same query now runs with
2 workers for t1 and 3 workers for t2:
    explain(costs off) select b, count(*) from t1 group by b union all
    select b, count(*) from t2 group by b;
                                QUERY PLAN
    ------------------------------------------------------------------
     Gather Motion 9:1 (slice1; segments: 9)
       -> Append
             -> HashAggregate
                   Group Key: t1.b
                   -> Redistribute Motion 6:9 (slice2; segments: 6)
                         Hash Key: t1.b
                         Hash Module: 3
                         -> Parallel Seq Scan on t1
             -> HashAggregate
                   Group Key: t2.b
                   -> Redistribute Motion 9:9 (slice3; segments: 9)
                         Hash Key: t2.b
                         Hash Module: 3
                         -> Parallel Seq Scan on t2
     Optimizer: Postgres query optimizer
    (15 rows)
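The sender and receiver counts in the plan above appear to follow from multiplying the 3 segments by the worker count of each slice. The sketch below reproduces that arithmetic. It is an inference from the plans shown in this commit rather than a documented rule, and `MotionShape`, `slice_processes`, and `motion_shape` are hypothetical names.

```c
/* Hypothetical helper type: the "senders:receivers" pair shown on a Motion. */
typedef struct
{
	int senders;
	int receivers;
} MotionShape;

/*
 * Processes running a slice: one per segment for a serial slice
 * (workers <= 0), otherwise segments * workers.
 */
static int
slice_processes(int segments, int workers)
{
	int per_segment = workers > 0 ? workers : 1;

	return segments * per_segment;
}

/* Shape of a Motion between a sending slice and a receiving slice. */
static MotionShape
motion_shape(int segments, int sender_workers, int receiver_workers)
{
	MotionShape shape;

	shape.senders = slice_processes(segments, sender_workers);
	shape.receivers = slice_processes(segments, receiver_workers);
	return shape;
}
```

With 3 segments, `motion_shape(3, 2, 3)` gives 6 senders and 9 receivers, matching `Redistribute Motion 6:9` for t1, and `motion_shape(3, 3, 3)` gives 9:9 for t2. The receiving slice running at 9 processes suggests the Append takes the larger of its subplans' worker counts, though the commit does not state this.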
This change lets UNION ALL queries benefit from parallel execution even
when they contain Motion nodes, while maintaining correctness and
allowing a different degree of parallelism in each part of the query.
It is particularly valuable for complex workloads such as the TPC-DS
queries, where UNION ALL operations are common.
Authored-by: Zhang Mingli [email protected]
---
src/backend/optimizer/path/allpaths.c | 34 ++++-
src/test/regress/expected/window_parallel.out | 174 ++++++++++++++++++++++++++
src/test/regress/sql/window_parallel.sql | 46 +++++++
3 files changed, 252 insertions(+), 2 deletions(-)
diff --git a/src/backend/optimizer/path/allpaths.c b/src/backend/optimizer/path/allpaths.c
index 4832f25d1ac..c02fcd4ea73 100644
--- a/src/backend/optimizer/path/allpaths.c
+++ b/src/backend/optimizer/path/allpaths.c
@@ -1839,8 +1839,38 @@ add_paths_to_append_rel(PlannerInfo *root, RelOptInfo *rel,
 			 * estimate.
 			 */
 			partial_rows = appendpath->path.rows;
-			/* Add the path if subpath has not Motion.*/
-			if (appendpath->path.parallel_safe && appendpath->path.motionHazard == false)
+
+			if (enable_parallel_append)
+			{
+				/* Add the path if subpath didn't encounter motion hazard.*/
+				if (appendpath->path.parallel_safe && (appendpath->path.motionHazard == false))
+					add_partial_path(rel, (Path *)appendpath);
+				else
+				{
+					/*
+					 * CBDB_PARALLEL:
+					 * When a parallel-aware Append is dropped due to motion hazard,
+					 * we attempt a second pass using parallel-oblivious Append.
+					 *
+					 * This approach is feasible in CBDB because:
+					 * 1. All Motions in a parallel plan handle tuples individually
+					 * 2. Parallel Append might miss executing slices containing Motions,
+					 *    whereas regular Append does not have this problem
+					 *
+					 * This behavior is conceptually similar to UPSTREAM's Append node
+					 * with partial paths implementation.
+					 */
+					appendpath = create_append_path(root, rel, NIL, partial_subpaths,
+													NIL, NULL, parallel_workers,
+													false /*enable_parallel_append*/,
+													-1);
+					partial_rows = appendpath->path.rows;
+
+					if (appendpath->path.parallel_safe)
+						add_partial_path(rel, (Path *)appendpath);
+				}
+			}
+			else if (appendpath->path.parallel_safe)
 				add_partial_path(rel, (Path *)appendpath);
 		}
 	}
diff --git a/src/test/regress/expected/window_parallel.out b/src/test/regress/expected/window_parallel.out
index c9e226b7355..55fd0e820e0 100644
--- a/src/test/regress/expected/window_parallel.out
+++ b/src/test/regress/expected/window_parallel.out
@@ -861,6 +861,180 @@ select sum(salary) over (order by enroll_date range between '1 year'::interval p
--
-- End of test of Parallel process of Window Functions.
--
+--
+-- Test Parallel UNION ALL
+--
+create table t1(a int, b int) with(parallel_workers=2);
+NOTICE: Table doesn't have 'DISTRIBUTED BY' clause -- Using column named 'a' as the Apache Cloudberry data distribution key for this table.
+HINT: The 'DISTRIBUTED BY' clause determines the distribution of data. Make sure column(s) chosen are the optimal data distribution key to minimize skew.
+create table t2(a int, b int) with(parallel_workers=2);
+NOTICE: Table doesn't have 'DISTRIBUTED BY' clause -- Using column named 'a' as the Apache Cloudberry data distribution key for this table.
+HINT: The 'DISTRIBUTED BY' clause determines the distribution of data. Make sure column(s) chosen are the optimal data distribution key to minimize skew.
+insert into t1 select i, i from generate_series(1, 10000) i;
+insert into t1 select i, i from generate_series(1, 10000) i;
+analyze t1;
+analyze t2;
+begin;
+set local enable_parallel = on;
+set local enable_parallel_append = on;
+set local min_parallel_table_scan_size = 0;
+-- If parallel-aware append encounters a motion hazard, fall back to parallel-oblivious append.
+explain(costs off, verbose)
+select b, count(*) from t1 group by b union all select b, count(*) from t2 group by b;
+ QUERY PLAN
+------------------------------------------------------------------------------------------------------------------------
+ Gather Motion 6:1 (slice1; segments: 6)
+ Output: t1.b, (count(*))
+ -> Append
+ -> HashAggregate
+ Output: t1.b, count(*)
+ Group Key: t1.b
+ -> Redistribute Motion 6:6 (slice2; segments: 6)
+ Output: t1.b
+ Hash Key: t1.b
+ Hash Module: 3
+ -> Parallel Seq Scan on window_parallel.t1
+ Output: t1.b
+ -> HashAggregate
+ Output: t2.b, count(*)
+ Group Key: t2.b
+ -> Redistribute Motion 6:6 (slice3; segments: 6)
+ Output: t2.b
+ Hash Key: t2.b
+ Hash Module: 3
+ -> Parallel Seq Scan on window_parallel.t2
+ Output: t2.b
+ Settings: enable_parallel = 'on', enable_parallel_append = 'on', min_parallel_table_scan_size = '0', optimizer = 'off'
+ Optimizer: Postgres query optimizer
+(23 rows)
+
+set local enable_parallel_append = off;
+-- Naturally, use parallel-oblivious append directly when parallel-aware mode is disabled.
+explain(costs off, verbose)
+select b, count(*) from t1 group by b union all select b, count(*) from t2 group by b;
+ QUERY PLAN
+-------------------------------------------------------------------------------------------------------------------------
+ Gather Motion 6:1 (slice1; segments: 6)
+ Output: t1.b, (count(*))
+ -> Append
+ -> HashAggregate
+ Output: t1.b, count(*)
+ Group Key: t1.b
+ -> Redistribute Motion 6:6 (slice2; segments: 6)
+ Output: t1.b
+ Hash Key: t1.b
+ Hash Module: 3
+ -> Parallel Seq Scan on window_parallel.t1
+ Output: t1.b
+ -> HashAggregate
+ Output: t2.b, count(*)
+ Group Key: t2.b
+ -> Redistribute Motion 6:6 (slice3; segments: 6)
+ Output: t2.b
+ Hash Key: t2.b
+ Hash Module: 3
+ -> Parallel Seq Scan on window_parallel.t2
+ Output: t2.b
+ Settings: enable_parallel = 'on', enable_parallel_append = 'off', min_parallel_table_scan_size = '0', optimizer = 'off'
+ Optimizer: Postgres query optimizer
+(23 rows)
+
+-- Ensure compatibility between different paths when using parallel workers
+set local enable_parallel_append = on;
+set max_parallel_workers_per_gather = 3;
+alter table t2 set(parallel_workers=3);
+explain(costs off, verbose)
+select b, count(*) from t1 group by b union all select b, count(*) from t2 group by b;
+ QUERY PLAN
+------------------------------------------------------------------------------------------------------------------------
+ Gather Motion 9:1 (slice1; segments: 9)
+ Output: t1.b, (count(*))
+ -> Append
+ -> HashAggregate
+ Output: t1.b, count(*)
+ Group Key: t1.b
+ -> Redistribute Motion 6:9 (slice2; segments: 6)
+ Output: t1.b
+ Hash Key: t1.b
+ Hash Module: 3
+ -> Parallel Seq Scan on window_parallel.t1
+ Output: t1.b
+ -> HashAggregate
+ Output: t2.b, count(*)
+ Group Key: t2.b
+ -> Redistribute Motion 9:9 (slice3; segments: 9)
+ Output: t2.b
+ Hash Key: t2.b
+ Hash Module: 3
+ -> Parallel Seq Scan on window_parallel.t2
+ Output: t2.b
+ Settings: enable_parallel = 'on', enable_parallel_append = 'on', min_parallel_table_scan_size = '0', optimizer = 'off'
+ Optimizer: Postgres query optimizer
+(23 rows)
+
+-- Could not drive a parallel plan if no partial paths are avaliable
+alter table t2 set(parallel_workers=0);
+-- parallel-aware
+explain(costs off, verbose)
+select b, count(*) from t1 group by b union all select b, count(*) from t2 group by b;
+ QUERY PLAN
+------------------------------------------------------------------------------------------------------------------------
+ Gather Motion 3:1 (slice1; segments: 3)
+ Output: t1.b, (count(*))
+ -> Append
+ -> HashAggregate
+ Output: t1.b, count(*)
+ Group Key: t1.b
+ -> Redistribute Motion 3:3 (slice2; segments: 3)
+ Output: t1.b
+ Hash Key: t1.b
+ -> Seq Scan on window_parallel.t1
+ Output: t1.b
+ -> HashAggregate
+ Output: t2.b, count(*)
+ Group Key: t2.b
+ -> Redistribute Motion 3:3 (slice3; segments: 3)
+ Output: t2.b
+ Hash Key: t2.b
+ -> Seq Scan on window_parallel.t2
+ Output: t2.b
+ Settings: enable_parallel = 'on', enable_parallel_append = 'on', min_parallel_table_scan_size = '0', optimizer = 'off'
+ Optimizer: Postgres query optimizer
+(21 rows)
+
+set local enable_parallel_append = off;
+-- Also applies to parallel-oblivious
+explain(costs off, verbose)
+select b, count(*) from t1 group by b union all select b, count(*) from t2 group by b;
+ QUERY PLAN
+-------------------------------------------------------------------------------------------------------------------------
+ Gather Motion 3:1 (slice1; segments: 3)
+ Output: t1.b, (count(*))
+ -> Append
+ -> HashAggregate
+ Output: t1.b, count(*)
+ Group Key: t1.b
+ -> Redistribute Motion 3:3 (slice2; segments: 3)
+ Output: t1.b
+ Hash Key: t1.b
+ -> Seq Scan on window_parallel.t1
+ Output: t1.b
+ -> HashAggregate
+ Output: t2.b, count(*)
+ Group Key: t2.b
+ -> Redistribute Motion 3:3 (slice3; segments: 3)
+ Output: t2.b
+ Hash Key: t2.b
+ -> Seq Scan on window_parallel.t2
+ Output: t2.b
+ Settings: enable_parallel = 'on', enable_parallel_append = 'off', min_parallel_table_scan_size = '0', optimizer = 'off'
+ Optimizer: Postgres query optimizer
+(21 rows)
+
+abort;
+--
+-- End of test Parallel UNION ALL
+--
-- start_ignore
drop schema window_parallel cascade;
NOTICE: drop cascades to table empsalary
diff --git a/src/test/regress/sql/window_parallel.sql b/src/test/regress/sql/window_parallel.sql
index 9ba4f1b9fc4..c6cd5e197cc 100644
--- a/src/test/regress/sql/window_parallel.sql
+++ b/src/test/regress/sql/window_parallel.sql
@@ -213,6 +213,52 @@ select sum(salary) over (order by enroll_date range between '1 year'::interval p
--
-- End of test of Parallel process of Window Functions.
--
+
+--
+-- Test Parallel UNION ALL
+--
+create table t1(a int, b int) with(parallel_workers=2);
+create table t2(a int, b int) with(parallel_workers=2);
+insert into t1 select i, i from generate_series(1, 10000) i;
+insert into t1 select i, i from generate_series(1, 10000) i;
+analyze t1;
+analyze t2;
+
+begin;
+set local enable_parallel = on;
+set local enable_parallel_append = on;
+set local min_parallel_table_scan_size = 0;
+
+-- If parallel-aware append encounters a motion hazard, fall back to parallel-oblivious append.
+explain(costs off, verbose)
+select b, count(*) from t1 group by b union all select b, count(*) from t2 group by b;
+
+set local enable_parallel_append = off;
+-- Naturally, use parallel-oblivious append directly when parallel-aware mode is disabled.
+explain(costs off, verbose)
+select b, count(*) from t1 group by b union all select b, count(*) from t2 group by b;
+
+-- Ensure compatibility between different paths when using parallel workers
+set local enable_parallel_append = on;
+set max_parallel_workers_per_gather = 3;
+alter table t2 set(parallel_workers=3);
+explain(costs off, verbose)
+select b, count(*) from t1 group by b union all select b, count(*) from t2 group by b;
+
+-- Could not drive a parallel plan if no partial paths are avaliable
+alter table t2 set(parallel_workers=0);
+-- parallel-aware
+explain(costs off, verbose)
+select b, count(*) from t1 group by b union all select b, count(*) from t2 group by b;
+set local enable_parallel_append = off;
+-- Also applies to parallel-oblivious
+explain(costs off, verbose)
+select b, count(*) from t1 group by b union all select b, count(*) from t2 group by b;
+abort;
+
+--
+-- End of test Parallel UNION ALL
+--
-- start_ignore
drop schema window_parallel cascade;
-- end_ignore