Andy Fan <zhihuifan1...@163.com> writes:

Hi,
After some coding on this subject, I think it is better to redefine the
problem and the solution.

Problem:
--------

SubPlans are commonly ineffective, *AND* I recently found they are hard
to make work with the parallel framework. e.g.

create table bigt (a int, b int, c int);
insert into bigt select i, i, i from generate_series(1, 1000000)i;
analyze bigt;

q1: select * from bigt o where b = 1
    and c > (select avg(c) from bigt i where c = o.c);

We get this plan:

                 QUERY PLAN
-------------------------------------------
 Seq Scan on bigt o
   Filter: ((b = 1) AND (c > (SubPlan 1)))
   SubPlan 1
     ->  Aggregate
           ->  Seq Scan on bigt i
                 Filter: (c = o.c)
(6 rows)

Here we can see there is no parallelism at all. However, if we split
query q1 into queries q2 and q3, both of them can be parallelized.

q2: explain (costs off) select * from bigt o where b = 1 and c > 2;
              QUERY PLAN
---------------------------------------
 Gather
   Workers Planned: 2
   ->  Parallel Seq Scan on bigt o
         Filter: ((c > 2) AND (b = 1))
(4 rows)

q3: explain (costs off) select avg(c) from bigt o where c = 2;
                QUERY PLAN
-----------------------------------------
 Aggregate
   ->  Gather
         Workers Planned: 2
         ->  Parallel Seq Scan on bigt o
               Filter: (c = 2)
(5 rows)

Analysis
--------

The major reason q1 can't be parallelized is that the subplan is
parameterized. The comment from add_partial_path:

 * We don't generate parameterized partial paths for several reasons. Most
 * importantly, they're not safe to execute, because there's nothing to
 * make sure that a parallel scan within the parameterized portion of the
 * plan is running with the same value in every worker at the same time.

The comment from max_parallel_hazard_walker:

 * We can't pass Params to workers at the moment either .. unless
 * they are listed in safe_param_ids, meaning they could be
 * either generated within workers or can be computed by the leader and
 * then their value can be passed to workers.

Solutions
---------

There are two foundations for this solution in my mind:

1.
It is not safe to execute a partial parameterized plan with different
parameter values, as we have already handled and documented. But this
doesn't apply to a parameterized *complete* plan: in that case each
worker runs a complete plan, so it always generates the same result no
matter whether it runs in a parallel worker or in the leader.

2. A subplan is never a partial plan. In make_subplan:

    best_path = get_cheapest_fractional_path(final_rel, tuple_fraction);

    plan = create_plan(subroot, best_path);

    /* And convert to SubPlan or InitPlan format. */
    result = build_subplan(root, plan, best_path, subroot,
                           plan_params, subLinkType, subLinkId,
                           testexpr, NIL, isTopQual);

get_cheapest_fractional_path never reads rel->partial_pathlist, so I
think it is safe to ignore the PARAM_EXEC check
(max_parallel_hazard_context.safe_param_ids) for subplans. See attached
patch 1.

Benefit:
--------

After this patch, we can get the below plan -- the correlated subplan is
parallelized.

explain (costs off) select * from bigt o where b = 1
  and c > (select avg(c) from bigt i where c = o.c);
                       QUERY PLAN
------------------------------------------------------
 Seq Scan on bigt o
   Filter: ((b = 1) AND ((c)::numeric > (SubPlan 1)))
   SubPlan 1
     ->  Aggregate
           ->  Gather
                 Workers Planned: 2
                 ->  Parallel Seq Scan on bigt i
                       Filter: (c = o.c)
(8 rows)

Continuing the test to show the impact of this patch: after removing the
"Gather" from the SubPlan, we get the below plan -- a scan with a
parallel-safe SubPlan is itself parallelized.

create table t (a int, b int);

explain (costs off) select * from bigt o where b = 1
  and c > (select avg(a) from t i where b = o.c);
                         QUERY PLAN
------------------------------------------------------------
 Gather
   Workers Planned: 2
   ->  Parallel Seq Scan on bigt o
         Filter: ((b = 1) AND ((c)::numeric > (SubPlan 1)))
         SubPlan 1
           ->  Aggregate
                 ->  Seq Scan on t i
                       Filter: (b = o.c)
(8 rows)

incremental_sort.sql shows another impact of this patch: it is helpful
for parallel sort.
Query:

select distinct
  unique1,
  (select t.unique1 from tenk1 where tenk1.unique1 = t.unique1)
from tenk1 t, generate_series(1, 1000);

From (master):

                                        QUERY PLAN
------------------------------------------------------------------------------------------
 Unique
   Output: t.unique1, ((SubPlan 1))
   ->  Sort
         Output: t.unique1, ((SubPlan 1))
         Sort Key: t.unique1, ((SubPlan 1))
         ->  Gather
               Output: t.unique1, (SubPlan 1)
               Workers Planned: 2
               ->  Nested Loop
                     Output: t.unique1
                     ->  Parallel Index Only Scan using tenk1_unique1 on public.tenk1 t
                           Output: t.unique1
                     ->  Function Scan on pg_catalog.generate_series
                           Output: generate_series.generate_series
                           Function Call: generate_series(1, 1000)
               SubPlan 1
                 ->  Index Only Scan using tenk1_unique1 on public.tenk1
                       Output: t.unique1
                       Index Cond: (tenk1.unique1 = t.unique1)
(19 rows)

To (patched):

                                           QUERY PLAN
------------------------------------------------------------------------------------------------
 Unique
   Output: t.unique1, ((SubPlan 1))
   ->  Gather Merge                            * Gather Merge at last *
         Output: t.unique1, ((SubPlan 1))
         Workers Planned: 2
         ->  Unique
               Output: t.unique1, ((SubPlan 1))
               ->  Sort                        * Sort in worker *
                     Output: t.unique1, ((SubPlan 1))
                     Sort Key: t.unique1, ((SubPlan 1))
                     ->  Nested Loop           * SubPlan in worker *
                           Output: t.unique1, (SubPlan 1)
                           ->  Parallel Index Only Scan using tenk1_unique1 on public.tenk1 t
                                 Output: t.unique1
                           ->  Function Scan on pg_catalog.generate_series
                                 Output: generate_series.generate_series
                                 Function Call: generate_series(1, 1000)
                           SubPlan 1
                             ->  Index Only Scan using tenk1_unique1 on public.tenk1
                                   Output: t.unique1
                                   Index Cond: (tenk1.unique1 = t.unique1)
(21 rows)

The execution time for the above query also decreased from 13351.928 ms
to 4814.043 ms, i.e. by 64%. The major difference is:

(1) master: the correlated subquery is parallel-unsafe, so it runs in
    the leader only, and then we sort.
(2) patched: the correlated subquery is parallel-safe, so it runs in
    the workers (Nested Loop), then we *sort in the parallel workers*,
    and then run a "Gather Merge".
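As a quick sanity check of the quoted figure (illustrative arithmetic only, not part of the patch), the 64% reduction follows directly from the two timings above:

```python
# Sanity-check the speedup quoted above from the two measured timings.
master_ms = 13351.928    # execution time on master
patched_ms = 4814.043    # execution time with the patch

reduction = (master_ms - patched_ms) / master_ms
print(f"reduction: {reduction:.0%}")  # prints "reduction: 64%"
```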
About the implementation, I know of at least 2 issues (the state is PoC
for now):

1. Query.is_in_sublink should be set in the parser and kept unchanged
   afterwards.

2. The below comment in incremental_sort.sql should be changed; it
   conflicts with this patch.

   """
   -- Parallel sort but with expression (correlated subquery) that
   -- is prohibited in parallel plans.
   """

Hope I have made myself clear; any feedback is welcome!

-- 
Best Regards
Andy Fan
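To summarize the behavior change in the attached patch, here is a toy Python model (illustrative only, not PostgreSQL code; the names mirror the C identifiers, everything else is simplified) of the Param check in max_parallel_hazard_walker with the patch applied:

```python
PARAM_EXTERN = "PARAM_EXTERN"
PARAM_EXEC = "PARAM_EXEC"

def param_is_parallel_safe(paramkind, paramid, safe_param_ids, is_subplan):
    """Toy model of the patched Param check in max_parallel_hazard_walker."""
    if paramkind == PARAM_EXTERN:
        return True  # external params are always parallel-safe
    if is_subplan:
        # Patched behavior: a subplan is always a complete (non-partial)
        # plan, so its params are computed by the leader for the whole run.
        return True
    # Unpatched rule: only PARAM_EXEC params listed as safe are allowed.
    return paramkind == PARAM_EXEC and paramid in safe_param_ids

# A correlated param not in safe_param_ids is rejected on master ...
assert not param_is_parallel_safe(PARAM_EXEC, 0, [], is_subplan=False)
# ... but accepted inside a subplan with the patch.
assert param_is_parallel_safe(PARAM_EXEC, 0, [], is_subplan=True)
```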
>From 19ef904ab5ed0d2b2dcc5141fc894e1f33ef9ab0 Mon Sep 17 00:00:00 2001
From: Andy Fan <zhihuifan1...@163.com>
Date: Wed, 2 Jul 2025 06:28:02 +0000
Subject: [PATCH v0 1/1] Revisit Subplan's parallel safety.

---
 src/backend/optimizer/plan/planner.c          |  2 +
 src/backend/optimizer/plan/subselect.c        |  2 +
 src/backend/optimizer/util/clauses.c          | 32 ++++++++++-----
 src/include/nodes/parsenodes.h                |  9 ++++
 src/include/nodes/pathnodes.h                 |  3 ++
 .../regress/expected/incremental_sort.out     | 41 ++++++++++---------
 src/test/regress/expected/select_parallel.out | 26 +++++++-----
 7 files changed, 73 insertions(+), 42 deletions(-)

diff --git a/src/backend/optimizer/plan/planner.c b/src/backend/optimizer/plan/planner.c
index 549aedcfa99..3a1fa618e4f 100644
--- a/src/backend/optimizer/plan/planner.c
+++ b/src/backend/optimizer/plan/planner.c
@@ -708,6 +708,8 @@ subquery_planner(PlannerGlobal *glob, Query *parse, PlannerInfo *parent_root,
 	 */
 	root->join_domains = list_make1(makeNode(JoinDomain));
 
+	root->isSubPlan = parse->is_in_sublink;
+
 	/*
 	 * If there is a WITH list, process each WITH query and either convert it
 	 * to RTE_SUBQUERY RTE(s) or build an initplan SubPlan structure for it.
diff --git a/src/backend/optimizer/plan/subselect.c b/src/backend/optimizer/plan/subselect.c
index e7cb3fede66..4de880de9ef 100644
--- a/src/backend/optimizer/plan/subselect.c
+++ b/src/backend/optimizer/plan/subselect.c
@@ -181,6 +181,8 @@ make_subplan(PlannerInfo *root, Query *orig_subquery,
 	 */
 	subquery = copyObject(orig_subquery);
 
+	subquery->is_in_sublink = true;
+
 	/*
 	 * If it's an EXISTS subplan, we might be able to simplify it.
 	 */
diff --git a/src/backend/optimizer/util/clauses.c b/src/backend/optimizer/util/clauses.c
index 26a3e050086..63816d8a733 100644
--- a/src/backend/optimizer/util/clauses.c
+++ b/src/backend/optimizer/util/clauses.c
@@ -92,6 +92,7 @@ typedef struct
 	char		max_hazard;		/* worst proparallel hazard found so far */
 	char		max_interesting;	/* worst proparallel hazard of interest */
 	List	   *safe_param_ids; /* PARAM_EXEC Param IDs to treat as safe */
+	bool		is_subplan;
 } max_parallel_hazard_context;
 
 static bool contain_agg_clause_walker(Node *node, void *context);
@@ -770,23 +771,26 @@ is_parallel_safe(PlannerInfo *root, Node *node)
 	context.max_hazard = PROPARALLEL_SAFE;
 	context.max_interesting = PROPARALLEL_RESTRICTED;
 	context.safe_param_ids = NIL;
+	context.is_subplan = root->isSubPlan;
 
-	/*
-	 * The params that refer to the same or parent query level are considered
-	 * parallel-safe.  The idea is that we compute such params at Gather or
-	 * Gather Merge node and pass their value to workers.
-	 */
-	for (proot = root; proot != NULL; proot = proot->parent_root)
+	if (!context.is_subplan)
 	{
-		foreach(l, proot->init_plans)
+		/*
+		 * The params that refer to the same or parent query level are
+		 * considered parallel-safe.  The idea is that we compute such params
+		 * at Gather or Gather Merge node and pass their value to workers.
+		 */
+		for (proot = root; proot != NULL; proot = proot->parent_root)
 		{
-			SubPlan    *initsubplan = (SubPlan *) lfirst(l);
+			foreach(l, proot->init_plans)
+			{
+				SubPlan    *initsubplan = (SubPlan *) lfirst(l);
 
-			context.safe_param_ids = list_concat(context.safe_param_ids,
-												 initsubplan->setParam);
+				context.safe_param_ids = list_concat(context.safe_param_ids,
+													 initsubplan->setParam);
+			}
 		}
 	}
-
 	return !max_parallel_hazard_walker(node, &context);
 }
@@ -936,6 +940,12 @@ max_parallel_hazard_walker(Node *node, max_parallel_hazard_context *context)
 		if (param->paramkind == PARAM_EXTERN)
 			return false;
 
+		/*
+		 * Subplan is always non partial plan, so their parameter are always
+		 * computed by leader.
+		 */
+		if (context->is_subplan)
+			return false;
 		if (param->paramkind != PARAM_EXEC ||
 			!list_member_int(context->safe_param_ids, param->paramid))
 		{
diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h
index daa285ca62f..cc36dc72eb3 100644
--- a/src/include/nodes/parsenodes.h
+++ b/src/include/nodes/parsenodes.h
@@ -244,6 +244,15 @@ typedef struct Query
 	/* a list of WithCheckOption's (added during rewrite) */
 	List	   *withCheckOptions pg_node_attr(query_jumble_ignore);
 
+	/*
+	 * XXX: looks currently we don't know where a Query comes from, e.g.
+	 * SubQuery or SubLink. Providing such information to planner might be
+	 * helpful sometimes. For now only SubLink case is interesting and it is
+	 * only set during make_subplan. we can do it in parser if it worths the
+	 * trouble (there are many members in enum SubLinkType).
+	 */
+	bool		is_in_sublink;
+
 	/*
 	 * The following two fields identify the portion of the source text string
 	 * containing this query. They are typically only populated in top-level
diff --git a/src/include/nodes/pathnodes.h b/src/include/nodes/pathnodes.h
index 6567759595d..053ace26e74 100644
--- a/src/include/nodes/pathnodes.h
+++ b/src/include/nodes/pathnodes.h
@@ -583,6 +583,9 @@ struct PlannerInfo
 
 	/* PartitionPruneInfos added in this query's plan. */
 	List	   *partPruneInfos;
+
+	/* is building a subplan */
+	bool		isSubPlan;
 };
diff --git a/src/test/regress/expected/incremental_sort.out b/src/test/regress/expected/incremental_sort.out
index b00219643b9..9ba232c4c67 100644
--- a/src/test/regress/expected/incremental_sort.out
+++ b/src/test/regress/expected/incremental_sort.out
@@ -1605,20 +1605,21 @@ explain (costs off)
 select distinct
   unique1,
   (select t.unique1 from tenk1 where tenk1.unique1 = t.unique1)
 from tenk1 t, generate_series(1, 1000);
-                                    QUERY PLAN
----------------------------------------------------------------------------------
+                                       QUERY PLAN
+---------------------------------------------------------------------------------------
 Unique
-   ->  Sort
-         Sort Key: t.unique1, ((SubPlan 1))
-         ->  Gather
-               Workers Planned: 2
-               ->  Nested Loop
-                     ->  Parallel Index Only Scan using tenk1_unique1 on tenk1 t
-                     ->  Function Scan on generate_series
-               SubPlan 1
-                 ->  Index Only Scan using tenk1_unique1 on tenk1
-                       Index Cond: (unique1 = t.unique1)
-(11 rows)
+   ->  Gather Merge
+         Workers Planned: 2
+         ->  Unique
+               ->  Sort
+                     Sort Key: t.unique1, ((SubPlan 1))
+                     ->  Nested Loop
+                           ->  Parallel Index Only Scan using tenk1_unique1 on tenk1 t
+                           ->  Function Scan on generate_series
+                           SubPlan 1
+                             ->  Index Only Scan using tenk1_unique1 on tenk1
+                                   Index Cond: (unique1 = t.unique1)
+(12 rows)
 
 explain (costs off)
 select
   unique1,
@@ -1627,16 +1628,16 @@ from tenk1 t, generate_series(1, 1000)
 order by 1, 2;
                                 QUERY PLAN
----------------------------------------------------------------------------
- Sort
-   Sort Key: t.unique1, ((SubPlan 1))
-   ->  Gather
-         Workers Planned: 2
+ Gather Merge
+   Workers Planned: 2
+   ->  Sort
+         Sort Key: t.unique1, ((SubPlan 1))
          ->  Nested Loop
                ->  Parallel Index Only Scan using tenk1_unique1 on tenk1 t
                ->  Function Scan on generate_series
-         SubPlan 1
-           ->  Index Only Scan using tenk1_unique1 on tenk1
-                 Index Cond: (unique1 = t.unique1)
+               SubPlan 1
+                 ->  Index Only Scan using tenk1_unique1 on tenk1
+                       Index Cond: (unique1 = t.unique1)
 (10 rows)
 
 -- Parallel sort but with expression not available until the upper rel.
diff --git a/src/test/regress/expected/select_parallel.out b/src/test/regress/expected/select_parallel.out
index 0185ef661b1..189a77a4f5f 100644
--- a/src/test/regress/expected/select_parallel.out
+++ b/src/test/regress/expected/select_parallel.out
@@ -148,8 +148,8 @@ create table part_pa_test_p2 partition of part_pa_test for values from (0) to (m
 explain (costs off)
 	select (select max((select pa1.b from part_pa_test pa1 where pa1.a = pa2.a)))
 	from part_pa_test pa2;
-                          QUERY PLAN
---------------------------------------------------------------
+                           QUERY PLAN
+----------------------------------------------------------------
 Aggregate
   ->  Gather
         Workers Planned: 3
@@ -159,12 +159,14 @@ explain (costs off)
         SubPlan 2
           ->  Result
   SubPlan 1
-    ->  Append
-          ->  Seq Scan on part_pa_test_p1 pa1_1
-                Filter: (a = pa2.a)
-          ->  Seq Scan on part_pa_test_p2 pa1_2
-                Filter: (a = pa2.a)
-(14 rows)
+    ->  Gather
+          Workers Planned: 3
+          ->  Parallel Append
+                ->  Parallel Seq Scan on part_pa_test_p1 pa1_1
+                      Filter: (a = pa2.a)
+                ->  Parallel Seq Scan on part_pa_test_p2 pa1_2
+                      Filter: (a = pa2.a)
+(16 rows)
 
 drop table part_pa_test;
 -- test with leader participation disabled
@@ -1339,9 +1341,11 @@ SELECT 1 FROM tenk1_vw_sec
          ->  Parallel Index Only Scan using tenk1_unique1 on tenk1
    SubPlan 1
      ->  Aggregate
-           ->  Seq Scan on int4_tbl
-                 Filter: (f1 < tenk1_vw_sec.unique1)
-(9 rows)
+           ->  Gather
+                 Workers Planned: 1
+                 ->  Parallel Seq Scan on int4_tbl
+                       Filter: (f1 < tenk1_vw_sec.unique1)
+(11 rows)
 
 rollback;
 -- test that a newly-created session role propagates to workers.
-- 
2.45.1