From 8b36aa8fa43bfe6a32a1dbce0623ad9243f0b303 Mon Sep 17 00:00:00 2001
From: jcoleman <jtc331@gmail.com>
Date: Wed, 18 Jan 2023 20:43:26 -0500
Subject: [PATCH v9 2/2] Parallelize correlated subqueries

When params are provided at the current query level (i.e., are generated
within a single worker and not shared across workers) we can safely
execute these in parallel.

Alternative approach using just relids subset check
---
 doc/src/sgml/parallel.sgml                    |   3 +-
 src/backend/optimizer/path/allpaths.c         |  46 +++++--
 src/backend/optimizer/path/joinpath.c         |  10 +-
 src/backend/optimizer/plan/planner.c          |   8 ++
 src/backend/optimizer/util/clauses.c          |  74 ++++------
 .../regress/expected/incremental_sort.out     |  41 +++---
 src/test/regress/expected/partition_prune.out | 104 +++++++-------
 src/test/regress/expected/select_parallel.out | 128 ++++++++++--------
 8 files changed, 222 insertions(+), 192 deletions(-)

diff --git a/doc/src/sgml/parallel.sgml b/doc/src/sgml/parallel.sgml
index 5acc9537d6..fd32572ec8 100644
--- a/doc/src/sgml/parallel.sgml
+++ b/doc/src/sgml/parallel.sgml
@@ -518,7 +518,8 @@ EXPLAIN SELECT * FROM pgbench_accounts WHERE filler LIKE '%x%';
 
     <listitem>
       <para>
-        Plan nodes that reference a correlated <literal>SubPlan</literal>.
+        Plan nodes that reference a correlated <literal>SubPlan</literal> where
+        the result is shared between workers.
       </para>
     </listitem>
   </itemizedlist>
diff --git a/src/backend/optimizer/path/allpaths.c b/src/backend/optimizer/path/allpaths.c
index c2fc568dc8..a579d4c092 100644
--- a/src/backend/optimizer/path/allpaths.c
+++ b/src/backend/optimizer/path/allpaths.c
@@ -3018,11 +3018,19 @@ generate_gather_paths(PlannerInfo *root, RelOptInfo *rel, bool override_rows)
 	ListCell   *lc;
 	double		rows;
 	double	   *rowsp = NULL;
+	Relids		required_outer = rel->lateral_relids;
 
 	/* If there are no partial paths, there's nothing to do here. */
 	if (rel->partial_pathlist == NIL)
 		return;
 
+	/*
+	 * Delay gather path creation until the level in the join tree where all
+	 * params used in a worker are generated within that worker.
+	 */
+	if (!bms_is_subset(required_outer, rel->relids))
+		return;
+
 	/* Should we override the rel's rowcount estimate? */
 	if (override_rows)
 		rowsp = &rows;
@@ -3033,12 +3041,17 @@ generate_gather_paths(PlannerInfo *root, RelOptInfo *rel, bool override_rows)
 	 * of partial_pathlist because of the way add_partial_path works.
 	 */
 	cheapest_partial_path = linitial(rel->partial_pathlist);
-	rows =
-		cheapest_partial_path->rows * cheapest_partial_path->parallel_workers;
-	simple_gather_path = (Path *)
-		create_gather_path(root, rel, cheapest_partial_path, rel->reltarget,
-						   NULL, rowsp);
-	add_path(rel, simple_gather_path);
+
+	/* We can't pass params to workers. */
+	if (bms_is_subset(PATH_REQ_OUTER(cheapest_partial_path), rel->relids))
+	{
+		rows =
+			cheapest_partial_path->rows * cheapest_partial_path->parallel_workers;
+		simple_gather_path = (Path *)
+			create_gather_path(root, rel, cheapest_partial_path, rel->reltarget,
+							   PATH_REQ_OUTER(cheapest_partial_path), rowsp);
+		add_path(rel, simple_gather_path);
+	}
 
 	/*
 	 * For each useful ordering, we can consider an order-preserving Gather
@@ -3052,9 +3065,14 @@ generate_gather_paths(PlannerInfo *root, RelOptInfo *rel, bool override_rows)
 		if (subpath->pathkeys == NIL)
 			continue;
 
+		/* We can't pass params to workers. */
+		if (!bms_is_subset(PATH_REQ_OUTER(subpath), rel->relids))
+			continue;
+
 		rows = subpath->rows * subpath->parallel_workers;
 		path = create_gather_merge_path(root, rel, subpath, rel->reltarget,
-										subpath->pathkeys, NULL, rowsp);
+										subpath->pathkeys,
+										PATH_REQ_OUTER(subpath), rowsp);
 		add_path(rel, &path->path);
 	}
 }
@@ -3156,11 +3174,19 @@ generate_useful_gather_paths(PlannerInfo *root, RelOptInfo *rel, bool override_r
 	double	   *rowsp = NULL;
 	List	   *useful_pathkeys_list = NIL;
 	Path	   *cheapest_partial_path = NULL;
+	Relids		required_outer = rel->lateral_relids;
 
 	/* If there are no partial paths, there's nothing to do here. */
 	if (rel->partial_pathlist == NIL)
 		return;
 
+	/*
+	 * Delay gather path creation until the level in the join tree where all
+	 * params used in a worker are generated within that worker.
+	 */
+	if (!bms_is_subset(required_outer, rel->relids))
+		return;
+
 	/* Should we override the rel's rowcount estimate? */
 	if (override_rows)
 		rowsp = &rows;
@@ -3190,6 +3216,10 @@ generate_useful_gather_paths(PlannerInfo *root, RelOptInfo *rel, bool override_r
 			Path	   *subpath = (Path *) lfirst(lc2);
 			GatherMergePath *path;
 
+			/* We can't pass params to workers. */
+			if (!bms_is_subset(PATH_REQ_OUTER(subpath), rel->relids))
+				continue;
+
 			is_sorted = pathkeys_count_contained_in(useful_pathkeys,
 													subpath->pathkeys,
 													&presorted_keys);
@@ -3249,7 +3279,7 @@ generate_useful_gather_paths(PlannerInfo *root, RelOptInfo *rel, bool override_r
 											subpath,
 											rel->reltarget,
 											subpath->pathkeys,
-											NULL,
+											PATH_REQ_OUTER(subpath),
 											rowsp);
 
 			add_path(rel, &path->path);
diff --git a/src/backend/optimizer/path/joinpath.c b/src/backend/optimizer/path/joinpath.c
index d345c0437a..644b90ad1e 100644
--- a/src/backend/optimizer/path/joinpath.c
+++ b/src/backend/optimizer/path/joinpath.c
@@ -1791,10 +1791,12 @@ match_unsorted_outer(PlannerInfo *root,
 	 * Consider partial nestloop and mergejoin plan if outerrel has any
 	 * partial path and the joinrel is parallel-safe.  However, we can't
 	 * handle JOIN_UNIQUE_OUTER, because the outer path will be partial, and
-	 * therefore we won't be able to properly guarantee uniqueness.  Nor can
-	 * we handle joins needing lateral rels, since partial paths must not be
-	 * parameterized. Similarly, we can't handle JOIN_FULL and JOIN_RIGHT,
-	 * because they can produce false null extended rows.
+	 * therefore we won't be able to properly guarantee uniqueness.  Similarly,
+	 * we can't handle JOIN_FULL and JOIN_RIGHT, because they can produce false
+	 * null extended rows.
+	 *
+	 * While partial paths may now be parameterized so long as all of the params
+	 * can be generated wholly within a worker we punt on supporting that here.
 	 */
 	if (joinrel->consider_parallel &&
 		save_jointype != JOIN_UNIQUE_OUTER &&
diff --git a/src/backend/optimizer/plan/planner.c b/src/backend/optimizer/plan/planner.c
index 05f44faf6e..fcde41d9f3 100644
--- a/src/backend/optimizer/plan/planner.c
+++ b/src/backend/optimizer/plan/planner.c
@@ -7344,11 +7344,16 @@ gather_grouping_paths(PlannerInfo *root, RelOptInfo *rel)
 	ListCell   *lc;
 	Path	   *cheapest_partial_path;
 
+	/* By grouping time we shouldn't have any lateral dependencies. */
+	Assert(rel->lateral_relids == NULL);
+
 	/* Try Gather for unordered paths and Gather Merge for ordered ones. */
 	generate_useful_gather_paths(root, rel, true);
 
 	/* Try cheapest partial path + explicit Sort + Gather Merge. */
 	cheapest_partial_path = linitial(rel->partial_pathlist);
+	/* By grouping time we shouldn't have any lateral dependencies. */
+	Assert(PATH_REQ_OUTER(cheapest_partial_path) == NULL);
 	if (!pathkeys_contained_in(root->group_pathkeys,
 							   cheapest_partial_path->pathkeys))
 	{
@@ -7400,6 +7405,9 @@ gather_grouping_paths(PlannerInfo *root, RelOptInfo *rel)
 		if (presorted_keys == 0)
 			continue;
 
+		/* By grouping time we shouldn't have any lateral dependencies. */
+		Assert(PATH_REQ_OUTER(path) == NULL);
+
 		path = (Path *) create_incremental_sort_path(root,
 													 rel,
 													 path,
diff --git a/src/backend/optimizer/util/clauses.c b/src/backend/optimizer/util/clauses.c
index aa584848cf..4a3a0489a8 100644
--- a/src/backend/optimizer/util/clauses.c
+++ b/src/backend/optimizer/util/clauses.c
@@ -89,7 +89,6 @@ typedef struct
 {
 	char		max_hazard;		/* worst proparallel hazard found so far */
 	char		max_interesting;	/* worst proparallel hazard of interest */
-	List	   *safe_param_ids; /* PARAM_EXEC Param IDs to treat as safe */
 } max_parallel_hazard_context;
 
 static bool contain_agg_clause_walker(Node *node, void *context);
@@ -618,7 +617,6 @@ max_parallel_hazard(Query *parse)
 
 	context.max_hazard = PROPARALLEL_SAFE;
 	context.max_interesting = PROPARALLEL_UNSAFE;
-	context.safe_param_ids = NIL;
 	(void) max_parallel_hazard_walker((Node *) parse, &context);
 	return context.max_hazard;
 }
@@ -629,43 +627,24 @@ max_parallel_hazard(Query *parse)
  *
  * root->glob->maxParallelHazard must previously have been set to the
  * result of max_parallel_hazard() on the whole query.
+ *
+ * The caller is responsible for verifying that PARAM_EXEC Params are generated
+ * at the current plan level.
  */
 bool
 is_parallel_safe(PlannerInfo *root, Node *node)
 {
 	max_parallel_hazard_context context;
-	PlannerInfo *proot;
-	ListCell   *l;
 
 	/*
-	 * Even if the original querytree contained nothing unsafe, we need to
-	 * search the expression if we have generated any PARAM_EXEC Params while
-	 * planning, because those are parallel-restricted and there might be one
-	 * in this expression.  But otherwise we don't need to look.
+	 * If we've already checked the querytree don't burn cycles doing it again.
 	 */
-	if (root->glob->maxParallelHazard == PROPARALLEL_SAFE &&
-		root->glob->paramExecTypes == NIL)
+	if (root->glob->maxParallelHazard == PROPARALLEL_SAFE)
 		return true;
+
 	/* Else use max_parallel_hazard's search logic, but stop on RESTRICTED */
 	context.max_hazard = PROPARALLEL_SAFE;
 	context.max_interesting = PROPARALLEL_RESTRICTED;
-	context.safe_param_ids = NIL;
-
-	/*
-	 * The params that refer to the same or parent query level are considered
-	 * parallel-safe.  The idea is that we compute such params at Gather or
-	 * Gather Merge node and pass their value to workers.
-	 */
-	for (proot = root; proot != NULL; proot = proot->parent_root)
-	{
-		foreach(l, proot->init_plans)
-		{
-			SubPlan    *initsubplan = (SubPlan *) lfirst(l);
-
-			context.safe_param_ids = list_concat(context.safe_param_ids,
-												 initsubplan->setParam);
-		}
-	}
 
 	return !max_parallel_hazard_walker(node, &context);
 }
@@ -775,39 +754,34 @@ max_parallel_hazard_walker(Node *node, max_parallel_hazard_context *context)
 	}
 
 	/*
-	 * Only parallel-safe SubPlans can be sent to workers.  Within the
-	 * testexpr of the SubPlan, Params representing the output columns of the
-	 * subplan can be treated as parallel-safe, so temporarily add their IDs
-	 * to the safe_param_ids list while examining the testexpr.
+	 * Only parallel-safe SubPlans can be sent to workers.
 	 */
 	else if (IsA(node, SubPlan))
 	{
 		SubPlan    *subplan = (SubPlan *) node;
-		List	   *save_safe_param_ids;
 
 		if (!subplan->parallel_safe &&
 			max_parallel_hazard_test(PROPARALLEL_RESTRICTED, context))
 			return true;
-		save_safe_param_ids = context->safe_param_ids;
-		context->safe_param_ids = list_concat_copy(context->safe_param_ids,
-												   subplan->paramIds);
+
 		if (max_parallel_hazard_walker(subplan->testexpr, context))
-			return true;		/* no need to restore safe_param_ids */
-		list_free(context->safe_param_ids);
-		context->safe_param_ids = save_safe_param_ids;
-		/* we must also check args, but no special Param treatment there */
+			return true;
+
 		if (max_parallel_hazard_walker((Node *) subplan->args, context))
 			return true;
+
 		/* don't want to recurse normally, so we're done */
 		return false;
 	}
 
 	/*
-	 * We can't pass Params to workers at the moment either, so they are also
-	 * parallel-restricted, unless they are PARAM_EXTERN Params or are
-	 * PARAM_EXEC Params listed in safe_param_ids, meaning they could be
-	 * either generated within workers or can be computed by the leader and
-	 * then their value can be passed to workers.
+	 * We can't pass all types of Params to workers at the moment either.
+	 * PARAM_EXTERN Params are always allowed. PARAM_EXEC Params are parallel-
+	 * safe when they can be computed by the leader and their value passed to
+	 * workers or are generated within a worker. However we don't always know
+	 * whether a param will be generated within a worker when we are parsing a
+	 * querytree. In that case  we leave it to the consumer to verify that the
+	 * current plan level provides these params.
 	 */
 	else if (IsA(node, Param))
 	{
@@ -816,12 +790,12 @@ max_parallel_hazard_walker(Node *node, max_parallel_hazard_context *context)
 		if (param->paramkind == PARAM_EXTERN)
 			return false;
 
-		if (param->paramkind != PARAM_EXEC ||
-			!list_member_int(context->safe_param_ids, param->paramid))
-		{
-			if (max_parallel_hazard_test(PROPARALLEL_RESTRICTED, context))
-				return true;
-		}
+		if (param->paramkind == PARAM_EXEC)
+			return false;
+
+		if (max_parallel_hazard_test(PROPARALLEL_RESTRICTED, context))
+			return true;
+
 		return false;			/* nothing to recurse to */
 	}
 
diff --git a/src/test/regress/expected/incremental_sort.out b/src/test/regress/expected/incremental_sort.out
index 0c3433f8e5..2b52b0ca3c 100644
--- a/src/test/regress/expected/incremental_sort.out
+++ b/src/test/regress/expected/incremental_sort.out
@@ -1597,20 +1597,21 @@ explain (costs off) select distinct
   unique1,
   (select t.unique1 from tenk1 where tenk1.unique1 = t.unique1)
 from tenk1 t, generate_series(1, 1000);
-                                   QUERY PLAN                                    
----------------------------------------------------------------------------------
+                                      QUERY PLAN                                       
+---------------------------------------------------------------------------------------
  Unique
-   ->  Sort
-         Sort Key: t.unique1, ((SubPlan 1))
-         ->  Gather
-               Workers Planned: 2
-               ->  Nested Loop
-                     ->  Parallel Index Only Scan using tenk1_unique1 on tenk1 t
-                     ->  Function Scan on generate_series
-               SubPlan 1
-                 ->  Index Only Scan using tenk1_unique1 on tenk1
-                       Index Cond: (unique1 = t.unique1)
-(11 rows)
+   ->  Gather Merge
+         Workers Planned: 2
+         ->  Unique
+               ->  Sort
+                     Sort Key: t.unique1, ((SubPlan 1))
+                     ->  Nested Loop
+                           ->  Parallel Index Only Scan using tenk1_unique1 on tenk1 t
+                           ->  Function Scan on generate_series
+                           SubPlan 1
+                             ->  Index Only Scan using tenk1_unique1 on tenk1
+                                   Index Cond: (unique1 = t.unique1)
+(12 rows)
 
 explain (costs off) select
   unique1,
@@ -1619,16 +1620,16 @@ from tenk1 t, generate_series(1, 1000)
 order by 1, 2;
                                 QUERY PLAN                                 
 ---------------------------------------------------------------------------
- Sort
-   Sort Key: t.unique1, ((SubPlan 1))
-   ->  Gather
-         Workers Planned: 2
+ Gather Merge
+   Workers Planned: 2
+   ->  Sort
+         Sort Key: t.unique1, ((SubPlan 1))
          ->  Nested Loop
                ->  Parallel Index Only Scan using tenk1_unique1 on tenk1 t
                ->  Function Scan on generate_series
-         SubPlan 1
-           ->  Index Only Scan using tenk1_unique1 on tenk1
-                 Index Cond: (unique1 = t.unique1)
+               SubPlan 1
+                 ->  Index Only Scan using tenk1_unique1 on tenk1
+                       Index Cond: (unique1 = t.unique1)
 (10 rows)
 
 -- Parallel sort but with expression not available until the upper rel.
diff --git a/src/test/regress/expected/partition_prune.out b/src/test/regress/expected/partition_prune.out
index 7555764c77..5c45f9c0a5 100644
--- a/src/test/regress/expected/partition_prune.out
+++ b/src/test/regress/expected/partition_prune.out
@@ -1284,60 +1284,64 @@ EXPLAIN (VERBOSE, COSTS OFF) SELECT * FROM part p(x) ORDER BY x;
 --
 -- pruning won't work for mc3p, because some keys are Params
 explain (costs off) select * from mc2p t1, lateral (select count(*) from mc3p t2 where t2.a = t1.b and abs(t2.b) = 1 and t2.c = 1) s where t1.a = 1;
-                              QUERY PLAN                               
------------------------------------------------------------------------
- Nested Loop
-   ->  Append
-         ->  Seq Scan on mc2p1 t1_1
-               Filter: (a = 1)
-         ->  Seq Scan on mc2p2 t1_2
-               Filter: (a = 1)
-         ->  Seq Scan on mc2p_default t1_3
-               Filter: (a = 1)
-   ->  Aggregate
-         ->  Append
-               ->  Seq Scan on mc3p0 t2_1
-                     Filter: ((a = t1.b) AND (c = 1) AND (abs(b) = 1))
-               ->  Seq Scan on mc3p1 t2_2
-                     Filter: ((a = t1.b) AND (c = 1) AND (abs(b) = 1))
-               ->  Seq Scan on mc3p2 t2_3
-                     Filter: ((a = t1.b) AND (c = 1) AND (abs(b) = 1))
-               ->  Seq Scan on mc3p3 t2_4
-                     Filter: ((a = t1.b) AND (c = 1) AND (abs(b) = 1))
-               ->  Seq Scan on mc3p4 t2_5
-                     Filter: ((a = t1.b) AND (c = 1) AND (abs(b) = 1))
-               ->  Seq Scan on mc3p5 t2_6
-                     Filter: ((a = t1.b) AND (c = 1) AND (abs(b) = 1))
-               ->  Seq Scan on mc3p6 t2_7
-                     Filter: ((a = t1.b) AND (c = 1) AND (abs(b) = 1))
-               ->  Seq Scan on mc3p7 t2_8
-                     Filter: ((a = t1.b) AND (c = 1) AND (abs(b) = 1))
-               ->  Seq Scan on mc3p_default t2_9
-                     Filter: ((a = t1.b) AND (c = 1) AND (abs(b) = 1))
-(28 rows)
+                                 QUERY PLAN                                  
+-----------------------------------------------------------------------------
+ Gather
+   Workers Planned: 2
+   ->  Nested Loop
+         ->  Parallel Append
+               ->  Parallel Seq Scan on mc2p1 t1_1
+                     Filter: (a = 1)
+               ->  Parallel Seq Scan on mc2p2 t1_2
+                     Filter: (a = 1)
+               ->  Parallel Seq Scan on mc2p_default t1_3
+                     Filter: (a = 1)
+         ->  Aggregate
+               ->  Append
+                     ->  Seq Scan on mc3p0 t2_1
+                           Filter: ((a = t1.b) AND (c = 1) AND (abs(b) = 1))
+                     ->  Seq Scan on mc3p1 t2_2
+                           Filter: ((a = t1.b) AND (c = 1) AND (abs(b) = 1))
+                     ->  Seq Scan on mc3p2 t2_3
+                           Filter: ((a = t1.b) AND (c = 1) AND (abs(b) = 1))
+                     ->  Seq Scan on mc3p3 t2_4
+                           Filter: ((a = t1.b) AND (c = 1) AND (abs(b) = 1))
+                     ->  Seq Scan on mc3p4 t2_5
+                           Filter: ((a = t1.b) AND (c = 1) AND (abs(b) = 1))
+                     ->  Seq Scan on mc3p5 t2_6
+                           Filter: ((a = t1.b) AND (c = 1) AND (abs(b) = 1))
+                     ->  Seq Scan on mc3p6 t2_7
+                           Filter: ((a = t1.b) AND (c = 1) AND (abs(b) = 1))
+                     ->  Seq Scan on mc3p7 t2_8
+                           Filter: ((a = t1.b) AND (c = 1) AND (abs(b) = 1))
+                     ->  Seq Scan on mc3p_default t2_9
+                           Filter: ((a = t1.b) AND (c = 1) AND (abs(b) = 1))
+(30 rows)
 
 -- pruning should work fine, because values for a prefix of keys (a, b) are
 -- available
 explain (costs off) select * from mc2p t1, lateral (select count(*) from mc3p t2 where t2.c = t1.b and abs(t2.b) = 1 and t2.a = 1) s where t1.a = 1;
-                              QUERY PLAN                               
------------------------------------------------------------------------
- Nested Loop
-   ->  Append
-         ->  Seq Scan on mc2p1 t1_1
-               Filter: (a = 1)
-         ->  Seq Scan on mc2p2 t1_2
-               Filter: (a = 1)
-         ->  Seq Scan on mc2p_default t1_3
-               Filter: (a = 1)
-   ->  Aggregate
-         ->  Append
-               ->  Seq Scan on mc3p0 t2_1
-                     Filter: ((c = t1.b) AND (a = 1) AND (abs(b) = 1))
-               ->  Seq Scan on mc3p1 t2_2
-                     Filter: ((c = t1.b) AND (a = 1) AND (abs(b) = 1))
-               ->  Seq Scan on mc3p_default t2_3
-                     Filter: ((c = t1.b) AND (a = 1) AND (abs(b) = 1))
-(16 rows)
+                                 QUERY PLAN                                  
+-----------------------------------------------------------------------------
+ Gather
+   Workers Planned: 2
+   ->  Nested Loop
+         ->  Parallel Append
+               ->  Parallel Seq Scan on mc2p1 t1_1
+                     Filter: (a = 1)
+               ->  Parallel Seq Scan on mc2p2 t1_2
+                     Filter: (a = 1)
+               ->  Parallel Seq Scan on mc2p_default t1_3
+                     Filter: (a = 1)
+         ->  Aggregate
+               ->  Append
+                     ->  Seq Scan on mc3p0 t2_1
+                           Filter: ((c = t1.b) AND (a = 1) AND (abs(b) = 1))
+                     ->  Seq Scan on mc3p1 t2_2
+                           Filter: ((c = t1.b) AND (a = 1) AND (abs(b) = 1))
+                     ->  Seq Scan on mc3p_default t2_3
+                           Filter: ((c = t1.b) AND (a = 1) AND (abs(b) = 1))
+(18 rows)
 
 -- also here, because values for all keys are provided
 explain (costs off) select * from mc2p t1, lateral (select count(*) from mc3p t2 where t2.a = 1 and abs(t2.b) = 1 and t2.c = 1) s where t1.a = 1;
diff --git a/src/test/regress/expected/select_parallel.out b/src/test/regress/expected/select_parallel.out
index 9b4d7dd44a..01443e2ffb 100644
--- a/src/test/regress/expected/select_parallel.out
+++ b/src/test/regress/expected/select_parallel.out
@@ -137,8 +137,8 @@ create table part_pa_test_p2 partition of part_pa_test for values from (0) to (m
 explain (costs off)
 	select (select max((select pa1.b from part_pa_test pa1 where pa1.a = pa2.a)))
 	from part_pa_test pa2;
-                          QUERY PLAN                          
---------------------------------------------------------------
+                           QUERY PLAN                           
+----------------------------------------------------------------
  Aggregate
    ->  Gather
          Workers Planned: 3
@@ -148,12 +148,14 @@ explain (costs off)
    SubPlan 2
      ->  Result
    SubPlan 1
-     ->  Append
-           ->  Seq Scan on part_pa_test_p1 pa1_1
-                 Filter: (a = pa2.a)
-           ->  Seq Scan on part_pa_test_p2 pa1_2
-                 Filter: (a = pa2.a)
-(14 rows)
+     ->  Gather
+           Workers Planned: 3
+           ->  Parallel Append
+                 ->  Parallel Seq Scan on part_pa_test_p1 pa1_1
+                       Filter: (a = pa2.a)
+                 ->  Parallel Seq Scan on part_pa_test_p2 pa1_2
+                       Filter: (a = pa2.a)
+(16 rows)
 
 drop table part_pa_test;
 -- test with leader participation disabled
@@ -320,19 +322,19 @@ explain (costs off, verbose) select
                                  QUERY PLAN                                 
 ----------------------------------------------------------------------------
  Gather
-   Output: (SubPlan 1)
+   Output: ((SubPlan 1))
    Workers Planned: 4
    ->  Nested Loop
-         Output: t.unique1
+         Output: (SubPlan 1)
          ->  Parallel Index Only Scan using tenk1_unique1 on public.tenk1 t
                Output: t.unique1
          ->  Function Scan on pg_catalog.generate_series
                Output: generate_series.generate_series
                Function Call: generate_series(1, 10)
-   SubPlan 1
-     ->  Index Only Scan using tenk1_unique1 on public.tenk1
-           Output: t.unique1
-           Index Cond: (tenk1.unique1 = t.unique1)
+         SubPlan 1
+           ->  Index Only Scan using tenk1_unique1 on public.tenk1
+                 Output: t.unique1
+                 Index Cond: (tenk1.unique1 = t.unique1)
 (14 rows)
 
 explain (costs off, verbose) select
@@ -341,63 +343,69 @@ explain (costs off, verbose) select
                               QUERY PLAN                              
 ----------------------------------------------------------------------
  Gather
-   Output: (SubPlan 1)
+   Output: ((SubPlan 1))
    Workers Planned: 4
    ->  Parallel Index Only Scan using tenk1_unique1 on public.tenk1 t
-         Output: t.unique1
-   SubPlan 1
-     ->  Index Only Scan using tenk1_unique1 on public.tenk1
-           Output: t.unique1
-           Index Cond: (tenk1.unique1 = t.unique1)
+         Output: (SubPlan 1)
+         SubPlan 1
+           ->  Index Only Scan using tenk1_unique1 on public.tenk1
+                 Output: t.unique1
+                 Index Cond: (tenk1.unique1 = t.unique1)
 (9 rows)
 
 explain (costs off, verbose) select
   (select t.unique1 from tenk1 where tenk1.unique1 = t.unique1)
   from tenk1 t
   limit 1;
-                            QUERY PLAN                             
--------------------------------------------------------------------
+                                 QUERY PLAN                                 
+----------------------------------------------------------------------------
  Limit
    Output: ((SubPlan 1))
-   ->  Seq Scan on public.tenk1 t
-         Output: (SubPlan 1)
-         SubPlan 1
-           ->  Index Only Scan using tenk1_unique1 on public.tenk1
-                 Output: t.unique1
-                 Index Cond: (tenk1.unique1 = t.unique1)
-(8 rows)
+   ->  Gather
+         Output: ((SubPlan 1))
+         Workers Planned: 4
+         ->  Parallel Index Only Scan using tenk1_unique1 on public.tenk1 t
+               Output: (SubPlan 1)
+               SubPlan 1
+                 ->  Index Only Scan using tenk1_unique1 on public.tenk1
+                       Output: t.unique1
+                       Index Cond: (tenk1.unique1 = t.unique1)
+(11 rows)
 
 explain (costs off, verbose) select t.unique1
   from tenk1 t
   where t.unique1 = (select t.unique1 from tenk1 where tenk1.unique1 = t.unique1);
-                         QUERY PLAN                          
--------------------------------------------------------------
- Seq Scan on public.tenk1 t
+                              QUERY PLAN                              
+----------------------------------------------------------------------
+ Gather
    Output: t.unique1
-   Filter: (t.unique1 = (SubPlan 1))
-   SubPlan 1
-     ->  Index Only Scan using tenk1_unique1 on public.tenk1
-           Output: t.unique1
-           Index Cond: (tenk1.unique1 = t.unique1)
-(7 rows)
+   Workers Planned: 4
+   ->  Parallel Index Only Scan using tenk1_unique1 on public.tenk1 t
+         Output: t.unique1
+         Filter: (t.unique1 = (SubPlan 1))
+         SubPlan 1
+           ->  Index Only Scan using tenk1_unique1 on public.tenk1
+                 Output: t.unique1
+                 Index Cond: (tenk1.unique1 = t.unique1)
+(10 rows)
 
 explain (costs off, verbose) select *
   from tenk1 t
   order by (select t.unique1 from tenk1 where tenk1.unique1 = t.unique1);
-                                                                                             QUERY PLAN                                                                                             
-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
- Sort
+                                                                                                QUERY PLAN                                                                                                
+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
+ Gather Merge
    Output: t.unique1, t.unique2, t.two, t.four, t.ten, t.twenty, t.hundred, t.thousand, t.twothousand, t.fivethous, t.tenthous, t.odd, t.even, t.stringu1, t.stringu2, t.string4, ((SubPlan 1))
-   Sort Key: ((SubPlan 1))
-   ->  Gather
-         Output: t.unique1, t.unique2, t.two, t.four, t.ten, t.twenty, t.hundred, t.thousand, t.twothousand, t.fivethous, t.tenthous, t.odd, t.even, t.stringu1, t.stringu2, t.string4, (SubPlan 1)
-         Workers Planned: 4
+   Workers Planned: 4
+   ->  Sort
+         Output: t.unique1, t.unique2, t.two, t.four, t.ten, t.twenty, t.hundred, t.thousand, t.twothousand, t.fivethous, t.tenthous, t.odd, t.even, t.stringu1, t.stringu2, t.string4, ((SubPlan 1))
+         Sort Key: ((SubPlan 1))
          ->  Parallel Seq Scan on public.tenk1 t
-               Output: t.unique1, t.unique2, t.two, t.four, t.ten, t.twenty, t.hundred, t.thousand, t.twothousand, t.fivethous, t.tenthous, t.odd, t.even, t.stringu1, t.stringu2, t.string4
-         SubPlan 1
-           ->  Index Only Scan using tenk1_unique1 on public.tenk1
-                 Output: t.unique1
-                 Index Cond: (tenk1.unique1 = t.unique1)
+               Output: t.unique1, t.unique2, t.two, t.four, t.ten, t.twenty, t.hundred, t.thousand, t.twothousand, t.fivethous, t.tenthous, t.odd, t.even, t.stringu1, t.stringu2, t.string4, (SubPlan 1)
+               SubPlan 1
+                 ->  Index Only Scan using tenk1_unique1 on public.tenk1
+                       Output: t.unique1
+                       Index Cond: (tenk1.unique1 = t.unique1)
 (12 rows)
 
 -- test subplan in join/lateral join
@@ -409,14 +417,14 @@ explain (costs off, verbose, timing off) select t.unique1, l.*
                               QUERY PLAN                              
 ----------------------------------------------------------------------
  Gather
-   Output: t.unique1, (SubPlan 1)
+   Output: t.unique1, ((SubPlan 1))
    Workers Planned: 4
    ->  Parallel Index Only Scan using tenk1_unique1 on public.tenk1 t
-         Output: t.unique1
-   SubPlan 1
-     ->  Index Only Scan using tenk1_unique1 on public.tenk1
-           Output: t.unique1
-           Index Cond: (tenk1.unique1 = t.unique1)
+         Output: t.unique1, (SubPlan 1)
+         SubPlan 1
+           ->  Index Only Scan using tenk1_unique1 on public.tenk1
+                 Output: t.unique1
+                 Index Cond: (tenk1.unique1 = t.unique1)
 (9 rows)
 
 -- this is not parallel-safe due to use of random() within SubLink's testexpr:
@@ -1322,8 +1330,10 @@ SELECT 1 FROM tenk1_vw_sec
          ->  Parallel Index Only Scan using tenk1_unique1 on tenk1
    SubPlan 1
      ->  Aggregate
-           ->  Seq Scan on int4_tbl
-                 Filter: (f1 < tenk1_vw_sec.unique1)
-(9 rows)
+           ->  Gather
+                 Workers Planned: 1
+                 ->  Parallel Seq Scan on int4_tbl
+                       Filter: (f1 < tenk1_vw_sec.unique1)
+(11 rows)
 
 rollback;
-- 
2.32.1 (Apple Git-133)

