Andy Fan <zhihuifan1...@163.com> writes:

Hi,

After some coding on this subject, I think it is better to restate the
problem and the proposed solution.

Problem:
--------

SubPlan is commonly inefficient, *AND* recently I found it is hard to
make it work with the parallel framework. For example:

create table bigt (a int, b int, c int);
insert into bigt select i, i, i from generate_series(1, 1000000)i;
analyze bigt;

q1:
select * from bigt o where b = 1
and c > (select avg(c) from bigt i where c = o.c);

We get plan:

                QUERY PLAN                 
-------------------------------------------
 Seq Scan on bigt o
   Filter: ((b = 1) AND (c > (SubPlan 1)))
   SubPlan 1
     ->  Aggregate
           ->  Seq Scan on bigt i
                 Filter: (c = o.c)
(6 rows)

Here we can see there is no parallelism at all. However, if we split
query q1 into queries q2 and q3, both of them can be parallelized.

q2:
explain (costs off) select * from bigt o where b = 1 and c > 2;
              QUERY PLAN               
---------------------------------------
 Gather
   Workers Planned: 2
   ->  Parallel Seq Scan on bigt o
         Filter: ((c > 2) AND (b = 1))
(4 rows)

q3:
explain (costs off) select avg(c) from bigt o where c = 2;
               QUERY PLAN                
-----------------------------------------
 Aggregate
   ->  Gather
         Workers Planned: 2
         ->  Parallel Seq Scan on bigt o
               Filter: (c = 2)
(5 rows)


Analysis
--------

The major reason q1 can't be parallelized is that the subplan is parameterized.

The comment from add_partial_path:

 *        We don't generate parameterized partial paths for several reasons.  Most
 *        importantly, they're not safe to execute, because there's nothing to
 *        make sure that a parallel scan within the parameterized portion of the
 *        plan is running with the same value in every worker at the same time.

The comment from max_parallel_hazard_walker:

 * We can't pass Params to workers at the moment either .. unless
 * they are listed in safe_param_ids, meaning they could be
 * either generated within workers or can be computed by the leader and
 * then their value can be passed to workers.
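The restriction is easy to see by hand. As a quick illustration (assuming
the bigt table from above), even with the standard parallel cost GUCs
zeroed out to make the planner as eager as possible, the parameterized
SubPlan keeps q1 fully serial on master:

```sql
-- Make parallel plans as cheap as possible.
set parallel_setup_cost = 0;
set parallel_tuple_cost = 0;
set min_parallel_table_scan_size = 0;

-- Still no Gather anywhere in the plan on master: the parameterized
-- SubPlan marks the whole filter expression parallel-unsafe.
explain (costs off)
select * from bigt o where b = 1
  and c > (select avg(c) from bigt i where c = o.c);
```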

Solutions
----------

There are two foundations for this solution in my mind:

1. It is not safe to execute a *partial* parameterized plan with different
   parameter values, as we have already handled and documented. But this
   doesn't apply to a parameterized *complete* plan: in that case each
   worker runs a complete plan, so it always generates the same result
   no matter whether it runs in a parallel worker or in the leader.

2. A SubPlan is never a partial plan. In make_subplan:

    best_path = get_cheapest_fractional_path(final_rel, tuple_fraction);

    plan = create_plan(subroot, best_path);

    /* And convert to SubPlan or InitPlan format. */
    result = build_subplan(root, plan, best_path,
                           subroot, plan_params,
                           subLinkType, subLinkId,
                           testexpr, NIL, isTopQual);

   get_cheapest_fractional_path never reads rel->partial_pathlist.

So I think it is safe to skip the PARAM_EXEC check against
max_parallel_hazard_context.safe_param_ids for a subplan. See the
attached patch.

Benefit:
--------

After this patch, we can get the below plan -- the correlated subplan
is parallelized.

explain (costs off) select * from bigt o where b = 1
    and c > (select avg(c) from bigt i where c = o.c);
                      QUERY PLAN                      
------------------------------------------------------
 Seq Scan on bigt o
   Filter: ((b = 1) AND ((c)::numeric > (SubPlan 1)))
   SubPlan 1
     ->  Aggregate
           ->  Gather
                 Workers Planned: 2
                 ->  Parallel Seq Scan on bigt i
                       Filter: (c = o.c)
(8 rows)

To further demonstrate the impact of this patch, use a SubPlan whose own
plan contains no "Gather"; we then get the below plan -- a scan with a
parallel-safe SubPlan is parallelized.

create table t (a int, b int);
explain (costs off) select * from bigt o where b = 1
    and c > (select avg(a) from t i where b = o.c);
                         QUERY PLAN                         
------------------------------------------------------------
 Gather
   Workers Planned: 2
   ->  Parallel Seq Scan on bigt o
         Filter: ((b = 1) AND ((c)::numeric > (SubPlan 1)))
         SubPlan 1
           ->  Aggregate
                 ->  Seq Scan on t i
                       Filter: (b = o.c)
(8 rows)


incremental_sort.sql shows another impact of this patch: it is helpful
for parallel sorting.

Query:

select distinct
  unique1,
  (select t.unique1 from tenk1 where tenk1.unique1 = t.unique1)
from tenk1 t, generate_series(1, 1000);

From (master):

                                       QUERY PLAN
----------------------------------------------------------------------------------------
 Unique
   Output: t.unique1, ((SubPlan 1))
   ->  Sort
         Output: t.unique1, ((SubPlan 1))
         Sort Key: t.unique1, ((SubPlan 1))
         ->  Gather
               Output: t.unique1, (SubPlan 1)
               Workers Planned: 2
               ->  Nested Loop
                     Output: t.unique1
                      ->  Parallel Index Only Scan using tenk1_unique1 on public.tenk1 t
                           Output: t.unique1
                     ->  Function Scan on pg_catalog.generate_series
                           Output: generate_series.generate_series
                           Function Call: generate_series(1, 1000)
               SubPlan 1
                 ->  Index Only Scan using tenk1_unique1 on public.tenk1
                       Output: t.unique1
                       Index Cond: (tenk1.unique1 = t.unique1)
(19 rows)

To (patched):

                                          QUERY PLAN
----------------------------------------------------------------------------------------------
 Unique
   Output: t.unique1, ((SubPlan 1))
   ->  Gather Merge                          <-- Gather Merge at the top
         Output: t.unique1, ((SubPlan 1))
         Workers Planned: 2
         ->  Unique
               Output: t.unique1, ((SubPlan 1))
               ->  Sort                      <-- Sort in worker
                     Output: t.unique1, ((SubPlan 1))
                     Sort Key: t.unique1, ((SubPlan 1))
                     ->  Nested Loop
                           Output: t.unique1, (SubPlan 1)   <-- SubPlan in worker
                            ->  Parallel Index Only Scan using tenk1_unique1 on public.tenk1 t
                                 Output: t.unique1
                           ->  Function Scan on pg_catalog.generate_series
                                 Output: generate_series.generate_series
                                 Function Call: generate_series(1, 1000)
                           SubPlan 1
                              ->  Index Only Scan using tenk1_unique1 on public.tenk1
                                   Output: t.unique1
                                   Index Cond: (tenk1.unique1 = t.unique1)
(21 rows)

The execution time for the above query also decreased from 13351.928 ms
to 4814.043 ms, a 64% reduction. The major differences are:

(1) master: the correlated subquery is parallel-unsafe, so it runs in the
leader only, and the sort happens there as well.
(2) patched: the correlated subquery is parallel-safe, so it runs in the
workers (under the Nested Loop), the *sort happens in the parallel
workers*, and a Gather Merge then combines the sorted streams.

About the implementation, I know of at least 2 issues (the current state
is a PoC):

1. Query.is_in_sublink should be set in the parser and kept unchanged
   afterwards.
2. The below comment in incremental_sort.sql should be changed, since it
   conflicts with this patch:

   """
   -- Parallel sort but with expression (correlated subquery) that
   -- is prohibited in parallel plans.
   """

Hope I have made myself clear, any feedback is welcome!

-- 
Best Regards
Andy Fan

From 19ef904ab5ed0d2b2dcc5141fc894e1f33ef9ab0 Mon Sep 17 00:00:00 2001
From: Andy Fan <zhihuifan1...@163.com>
Date: Wed, 2 Jul 2025 06:28:02 +0000
Subject: [PATCH v0 1/1] Revisit Subplan's parallel safety.

---
 src/backend/optimizer/plan/planner.c          |  2 +
 src/backend/optimizer/plan/subselect.c        |  2 +
 src/backend/optimizer/util/clauses.c          | 32 ++++++++++-----
 src/include/nodes/parsenodes.h                |  9 ++++
 src/include/nodes/pathnodes.h                 |  3 ++
 .../regress/expected/incremental_sort.out     | 41 ++++++++++---------
 src/test/regress/expected/select_parallel.out | 26 +++++++-----
 7 files changed, 73 insertions(+), 42 deletions(-)

diff --git a/src/backend/optimizer/plan/planner.c b/src/backend/optimizer/plan/planner.c
index 549aedcfa99..3a1fa618e4f 100644
--- a/src/backend/optimizer/plan/planner.c
+++ b/src/backend/optimizer/plan/planner.c
@@ -708,6 +708,8 @@ subquery_planner(PlannerGlobal *glob, Query *parse, PlannerInfo *parent_root,
 	 */
 	root->join_domains = list_make1(makeNode(JoinDomain));
 
+	root->isSubPlan = parse->is_in_sublink;
+
 	/*
 	 * If there is a WITH list, process each WITH query and either convert it
 	 * to RTE_SUBQUERY RTE(s) or build an initplan SubPlan structure for it.
diff --git a/src/backend/optimizer/plan/subselect.c b/src/backend/optimizer/plan/subselect.c
index e7cb3fede66..4de880de9ef 100644
--- a/src/backend/optimizer/plan/subselect.c
+++ b/src/backend/optimizer/plan/subselect.c
@@ -181,6 +181,8 @@ make_subplan(PlannerInfo *root, Query *orig_subquery,
 	 */
 	subquery = copyObject(orig_subquery);
 
+	subquery->is_in_sublink = true;
+
 	/*
 	 * If it's an EXISTS subplan, we might be able to simplify it.
 	 */
diff --git a/src/backend/optimizer/util/clauses.c b/src/backend/optimizer/util/clauses.c
index 26a3e050086..63816d8a733 100644
--- a/src/backend/optimizer/util/clauses.c
+++ b/src/backend/optimizer/util/clauses.c
@@ -92,6 +92,7 @@ typedef struct
 	char		max_hazard;		/* worst proparallel hazard found so far */
 	char		max_interesting;	/* worst proparallel hazard of interest */
 	List	   *safe_param_ids; /* PARAM_EXEC Param IDs to treat as safe */
+	bool		is_subplan;
 } max_parallel_hazard_context;
 
 static bool contain_agg_clause_walker(Node *node, void *context);
@@ -770,23 +771,26 @@ is_parallel_safe(PlannerInfo *root, Node *node)
 	context.max_hazard = PROPARALLEL_SAFE;
 	context.max_interesting = PROPARALLEL_RESTRICTED;
 	context.safe_param_ids = NIL;
+	context.is_subplan = root->isSubPlan;
 
-	/*
-	 * The params that refer to the same or parent query level are considered
-	 * parallel-safe.  The idea is that we compute such params at Gather or
-	 * Gather Merge node and pass their value to workers.
-	 */
-	for (proot = root; proot != NULL; proot = proot->parent_root)
+	if (!context.is_subplan)
 	{
-		foreach(l, proot->init_plans)
+		/*
+		 * The params that refer to the same or parent query level are
+		 * considered parallel-safe.  The idea is that we compute such params
+		 * at Gather or Gather Merge node and pass their value to workers.
+		 */
+		for (proot = root; proot != NULL; proot = proot->parent_root)
 		{
-			SubPlan    *initsubplan = (SubPlan *) lfirst(l);
+			foreach(l, proot->init_plans)
+			{
+				SubPlan    *initsubplan = (SubPlan *) lfirst(l);
 
-			context.safe_param_ids = list_concat(context.safe_param_ids,
-												 initsubplan->setParam);
+				context.safe_param_ids = list_concat(context.safe_param_ids,
+													 initsubplan->setParam);
+			}
 		}
 	}
-
 	return !max_parallel_hazard_walker(node, &context);
 }
 
@@ -936,6 +940,12 @@ max_parallel_hazard_walker(Node *node, max_parallel_hazard_context *context)
 		if (param->paramkind == PARAM_EXTERN)
 			return false;
 
+		/*
+	 * A SubPlan is never a partial plan, so its parameters are always
+	 * computed by the leader.
+		 */
+		if (context->is_subplan)
+			return false;
 		if (param->paramkind != PARAM_EXEC ||
 			!list_member_int(context->safe_param_ids, param->paramid))
 		{
diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h
index daa285ca62f..cc36dc72eb3 100644
--- a/src/include/nodes/parsenodes.h
+++ b/src/include/nodes/parsenodes.h
@@ -244,6 +244,15 @@ typedef struct Query
 	/* a list of WithCheckOption's (added during rewrite) */
 	List	   *withCheckOptions pg_node_attr(query_jumble_ignore);
 
+	/*
+	 * XXX: it looks like we currently don't know where a Query comes from,
+	 * e.g. a subquery or a SubLink. Providing such information to the planner
+	 * might be helpful sometimes. For now only the SubLink case is
+	 * interesting and it is only set during make_subplan.  We could do it in
+	 * the parser if it is worth the trouble (enum SubLinkType has many members).
+	 */
+	bool		is_in_sublink;
+
 	/*
 	 * The following two fields identify the portion of the source text string
 	 * containing this query.  They are typically only populated in top-level
diff --git a/src/include/nodes/pathnodes.h b/src/include/nodes/pathnodes.h
index 6567759595d..053ace26e74 100644
--- a/src/include/nodes/pathnodes.h
+++ b/src/include/nodes/pathnodes.h
@@ -583,6 +583,9 @@ struct PlannerInfo
 
 	/* PartitionPruneInfos added in this query's plan. */
 	List	   *partPruneInfos;
+
+	/* is building a subplan */
+	bool		isSubPlan;
 };
 
 
diff --git a/src/test/regress/expected/incremental_sort.out b/src/test/regress/expected/incremental_sort.out
index b00219643b9..9ba232c4c67 100644
--- a/src/test/regress/expected/incremental_sort.out
+++ b/src/test/regress/expected/incremental_sort.out
@@ -1605,20 +1605,21 @@ explain (costs off) select distinct
   unique1,
   (select t.unique1 from tenk1 where tenk1.unique1 = t.unique1)
 from tenk1 t, generate_series(1, 1000);
-                                   QUERY PLAN                                    
----------------------------------------------------------------------------------
+                                      QUERY PLAN                                       
+---------------------------------------------------------------------------------------
  Unique
-   ->  Sort
-         Sort Key: t.unique1, ((SubPlan 1))
-         ->  Gather
-               Workers Planned: 2
-               ->  Nested Loop
-                     ->  Parallel Index Only Scan using tenk1_unique1 on tenk1 t
-                     ->  Function Scan on generate_series
-               SubPlan 1
-                 ->  Index Only Scan using tenk1_unique1 on tenk1
-                       Index Cond: (unique1 = t.unique1)
-(11 rows)
+   ->  Gather Merge
+         Workers Planned: 2
+         ->  Unique
+               ->  Sort
+                     Sort Key: t.unique1, ((SubPlan 1))
+                     ->  Nested Loop
+                           ->  Parallel Index Only Scan using tenk1_unique1 on tenk1 t
+                           ->  Function Scan on generate_series
+                           SubPlan 1
+                             ->  Index Only Scan using tenk1_unique1 on tenk1
+                                   Index Cond: (unique1 = t.unique1)
+(12 rows)
 
 explain (costs off) select
   unique1,
@@ -1627,16 +1628,16 @@ from tenk1 t, generate_series(1, 1000)
 order by 1, 2;
                                 QUERY PLAN                                 
 ---------------------------------------------------------------------------
- Sort
-   Sort Key: t.unique1, ((SubPlan 1))
-   ->  Gather
-         Workers Planned: 2
+ Gather Merge
+   Workers Planned: 2
+   ->  Sort
+         Sort Key: t.unique1, ((SubPlan 1))
          ->  Nested Loop
                ->  Parallel Index Only Scan using tenk1_unique1 on tenk1 t
                ->  Function Scan on generate_series
-         SubPlan 1
-           ->  Index Only Scan using tenk1_unique1 on tenk1
-                 Index Cond: (unique1 = t.unique1)
+               SubPlan 1
+                 ->  Index Only Scan using tenk1_unique1 on tenk1
+                       Index Cond: (unique1 = t.unique1)
 (10 rows)
 
 -- Parallel sort but with expression not available until the upper rel.
diff --git a/src/test/regress/expected/select_parallel.out b/src/test/regress/expected/select_parallel.out
index 0185ef661b1..189a77a4f5f 100644
--- a/src/test/regress/expected/select_parallel.out
+++ b/src/test/regress/expected/select_parallel.out
@@ -148,8 +148,8 @@ create table part_pa_test_p2 partition of part_pa_test for values from (0) to (m
 explain (costs off)
 	select (select max((select pa1.b from part_pa_test pa1 where pa1.a = pa2.a)))
 	from part_pa_test pa2;
-                          QUERY PLAN                          
---------------------------------------------------------------
+                           QUERY PLAN                           
+----------------------------------------------------------------
  Aggregate
    ->  Gather
          Workers Planned: 3
@@ -159,12 +159,14 @@ explain (costs off)
    SubPlan 2
      ->  Result
    SubPlan 1
-     ->  Append
-           ->  Seq Scan on part_pa_test_p1 pa1_1
-                 Filter: (a = pa2.a)
-           ->  Seq Scan on part_pa_test_p2 pa1_2
-                 Filter: (a = pa2.a)
-(14 rows)
+     ->  Gather
+           Workers Planned: 3
+           ->  Parallel Append
+                 ->  Parallel Seq Scan on part_pa_test_p1 pa1_1
+                       Filter: (a = pa2.a)
+                 ->  Parallel Seq Scan on part_pa_test_p2 pa1_2
+                       Filter: (a = pa2.a)
+(16 rows)
 
 drop table part_pa_test;
 -- test with leader participation disabled
@@ -1339,9 +1341,11 @@ SELECT 1 FROM tenk1_vw_sec
          ->  Parallel Index Only Scan using tenk1_unique1 on tenk1
    SubPlan 1
      ->  Aggregate
-           ->  Seq Scan on int4_tbl
-                 Filter: (f1 < tenk1_vw_sec.unique1)
-(9 rows)
+           ->  Gather
+                 Workers Planned: 1
+                 ->  Parallel Seq Scan on int4_tbl
+                       Filter: (f1 < tenk1_vw_sec.unique1)
+(11 rows)
 
 rollback;
 -- test that a newly-created session role propagates to workers.
-- 
2.45.1
