From 7c19d7f498375cd3ee6b676226c1a9b627298c5f Mon Sep 17 00:00:00 2001
From: jcoleman <jtc331@gmail.com>
Date: Sun, 2 Jul 2023 15:23:49 -0400
Subject: [PATCH v11 2/2] Parallelize correlated subqueries

When params are provided at the current query level (i.e., are generated
within a single worker and not shared across workers), we can safely
execute the correlated subqueries that consume them in parallel.

We accomplish this by tracking the PARAM_EXEC params that are needed for
a rel to be safely executed in parallel and only inserting a Gather node
if that set is empty.
---
 doc/src/sgml/parallel.sgml                    |   3 +-
 src/backend/optimizer/path/allpaths.c         |  24 +++-
 src/backend/optimizer/util/clauses.c          |  73 +++++++++--
 src/backend/optimizer/util/relnode.c          |   1 +
 src/include/nodes/pathnodes.h                 |   6 +
 src/include/optimizer/clauses.h               |   1 +
 .../regress/expected/incremental_sort.out     |  41 +++----
 src/test/regress/expected/partition_prune.out | 104 ++++++++--------
 src/test/regress/expected/select_parallel.out | 113 ++++++++++--------
 9 files changed, 227 insertions(+), 139 deletions(-)

diff --git a/doc/src/sgml/parallel.sgml b/doc/src/sgml/parallel.sgml
index 5acc9537d6..fd32572ec8 100644
--- a/doc/src/sgml/parallel.sgml
+++ b/doc/src/sgml/parallel.sgml
@@ -518,7 +518,8 @@ EXPLAIN SELECT * FROM pgbench_accounts WHERE filler LIKE '%x%';
 
     <listitem>
       <para>
-        Plan nodes that reference a correlated <literal>SubPlan</literal>.
+        Plan nodes that reference a correlated <literal>SubPlan</literal> where
+        the result is shared between workers.
       </para>
     </listitem>
   </itemizedlist>
diff --git a/src/backend/optimizer/path/allpaths.c b/src/backend/optimizer/path/allpaths.c
index 9bdc70c702..cebad18319 100644
--- a/src/backend/optimizer/path/allpaths.c
+++ b/src/backend/optimizer/path/allpaths.c
@@ -632,7 +632,7 @@ set_rel_consider_parallel(PlannerInfo *root, RelOptInfo *rel,
 
 				if (proparallel != PROPARALLEL_SAFE)
 					return;
-				if (!is_parallel_safe(root, (Node *) rte->tablesample->args))
+				if (!is_parallel_safe_with_params(root, (Node *) rte->tablesample->args, &rel->params_req_for_parallel))
 					return;
 			}
 
@@ -698,7 +698,7 @@ set_rel_consider_parallel(PlannerInfo *root, RelOptInfo *rel,
 
 		case RTE_FUNCTION:
 			/* Check for parallel-restricted functions. */
-			if (!is_parallel_safe(root, (Node *) rte->functions))
+			if (!is_parallel_safe_with_params(root, (Node *) rte->functions, &rel->params_req_for_parallel))
 				return;
 			break;
 
@@ -708,7 +708,7 @@ set_rel_consider_parallel(PlannerInfo *root, RelOptInfo *rel,
 
 		case RTE_VALUES:
 			/* Check for parallel-restricted functions. */
-			if (!is_parallel_safe(root, (Node *) rte->values_lists))
+			if (!is_parallel_safe_with_params(root, (Node *) rte->values_lists, &rel->params_req_for_parallel))
 				return;
 			break;
 
@@ -745,14 +745,14 @@ set_rel_consider_parallel(PlannerInfo *root, RelOptInfo *rel,
 	 * outer join clauses work correctly.  It would likely break equivalence
 	 * classes, too.
 	 */
-	if (!is_parallel_safe(root, (Node *) rel->baserestrictinfo))
+	if (!is_parallel_safe_with_params(root, (Node *) rel->baserestrictinfo, &rel->params_req_for_parallel))
 		return;
 
 	/*
 	 * Likewise, if the relation's outputs are not parallel-safe, give up.
 	 * (Usually, they're just Vars, but sometimes they're not.)
 	 */
-	if (!is_parallel_safe(root, (Node *) rel->reltarget->exprs))
+	if (!is_parallel_safe_with_params(root, (Node *) rel->reltarget->exprs, &rel->params_req_for_parallel))
 		return;
 
 	/* We have a winner. */
@@ -3069,6 +3069,13 @@ generate_gather_paths(PlannerInfo *root, RelOptInfo *rel, bool override_rows)
 	if (rel->partial_pathlist == NIL)
 		return;
 
+	/*
+	 * Wait to insert Gather nodes until all PARAM_EXEC params are provided
+	 * within the current rel since we can't pass them to workers.
+	 */
+	if (!bms_is_empty(rel->params_req_for_parallel))
+		return;
+
 	/* Should we override the rel's rowcount estimate? */
 	if (override_rows)
 		rowsp = &rows;
@@ -3207,6 +3214,13 @@ generate_useful_gather_paths(PlannerInfo *root, RelOptInfo *rel, bool override_r
 	if (rel->partial_pathlist == NIL)
 		return;
 
+	/*
+	 * Wait to insert Gather nodes until all PARAM_EXEC params are provided
+	 * within the current rel since we can't pass them to workers.
+	 */
+	if (!bms_is_empty(rel->params_req_for_parallel))
+		return;
+
 	/* Should we override the rel's rowcount estimate? */
 	if (override_rows)
 		rowsp = &rows;
diff --git a/src/backend/optimizer/util/clauses.c b/src/backend/optimizer/util/clauses.c
index 7f453b04f8..a82a7835c4 100644
--- a/src/backend/optimizer/util/clauses.c
+++ b/src/backend/optimizer/util/clauses.c
@@ -91,6 +91,8 @@ typedef struct
 {
 	char		max_hazard;		/* worst proparallel hazard found so far */
 	char		max_interesting;	/* worst proparallel hazard of interest */
+	bool		check_params;
+	Bitmapset **required_params;
 	List	   *safe_param_ids; /* PARAM_EXEC Param IDs to treat as safe */
 } max_parallel_hazard_context;
 
@@ -654,6 +656,7 @@ max_parallel_hazard(Query *parse)
 
 	context.max_hazard = PROPARALLEL_SAFE;
 	context.max_interesting = PROPARALLEL_UNSAFE;
+	context.check_params = true;
 	context.safe_param_ids = NIL;
 	(void) max_parallel_hazard_walker((Node *) parse, &context);
 	return context.max_hazard;
@@ -670,8 +673,6 @@ bool
 is_parallel_safe(PlannerInfo *root, Node *node)
 {
 	max_parallel_hazard_context context;
-	PlannerInfo *proot;
-	ListCell   *l;
 
 	/*
 	 * Even if the original querytree contained nothing unsafe, we need to
@@ -685,6 +686,43 @@ is_parallel_safe(PlannerInfo *root, Node *node)
 	/* Else use max_parallel_hazard's search logic, but stop on RESTRICTED */
 	context.max_hazard = PROPARALLEL_SAFE;
 	context.max_interesting = PROPARALLEL_RESTRICTED;
+	context.check_params = false;
+	context.required_params = NULL;
+
+	return !max_parallel_hazard_walker(node, &context);
+}
+
+/*
+ * is_parallel_safe_with_params
+ *		As above, but additionally tracking which PARAM_EXEC params are required
+ *		to be provided within a worker for a Gather to be inserted at this level
+ *		of the query. Those required params are passed to the caller through
+ *		the required_params argument.
+ *
+ *		Note: required_params is only valid if node is otherwise parallel safe.
+ */
+bool
+is_parallel_safe_with_params(PlannerInfo *root, Node *node, Bitmapset **required_params)
+{
+	max_parallel_hazard_context context;
+	PlannerInfo *proot;
+	ListCell   *l;
+
+	/*
+	 * Even if the original querytree contained nothing unsafe, we need to
+	 * search the expression if we have generated any PARAM_EXEC Params while
+	 * planning, because those will have to be provided for the expression to
+	 * remain parallel safe and there might be one in this expression.  But
+	 * otherwise we don't need to look.
+	 */
+	if (root->glob->maxParallelHazard == PROPARALLEL_SAFE &&
+		root->glob->paramExecTypes == NIL)
+		return true;
+	/* Else use max_parallel_hazard's search logic, but stop on RESTRICTED */
+	context.max_hazard = PROPARALLEL_SAFE;
+	context.max_interesting = PROPARALLEL_RESTRICTED;
+	context.check_params = false;
+	context.required_params = required_params;
 	context.safe_param_ids = NIL;
 
 	/*
@@ -824,13 +862,22 @@ max_parallel_hazard_walker(Node *node, max_parallel_hazard_context *context)
 		if (!subplan->parallel_safe &&
 			max_parallel_hazard_test(PROPARALLEL_RESTRICTED, context))
 			return true;
-		save_safe_param_ids = context->safe_param_ids;
-		context->safe_param_ids = list_concat_copy(context->safe_param_ids,
-												   subplan->paramIds);
+
+		if (context->check_params || context->required_params != NULL)
+		{
+			save_safe_param_ids = context->safe_param_ids;
+			context->safe_param_ids = list_concat_copy(context->safe_param_ids,
+													   subplan->paramIds);
+		}
 		if (max_parallel_hazard_walker(subplan->testexpr, context))
 			return true;		/* no need to restore safe_param_ids */
-		list_free(context->safe_param_ids);
-		context->safe_param_ids = save_safe_param_ids;
+
+		if (context->check_params || context->required_params != NULL)
+		{
+			list_free(context->safe_param_ids);
+			context->safe_param_ids = save_safe_param_ids;
+		}
+
 		/* we must also check args, but no special Param treatment there */
 		if (max_parallel_hazard_walker((Node *) subplan->args, context))
 			return true;
@@ -849,14 +896,18 @@ max_parallel_hazard_walker(Node *node, max_parallel_hazard_context *context)
 	{
 		Param	   *param = (Param *) node;
 
-		if (param->paramkind == PARAM_EXTERN)
+		if (param->paramkind != PARAM_EXEC || !(context->check_params || context->required_params != NULL))
 			return false;
 
-		if (param->paramkind != PARAM_EXEC ||
-			!list_member_int(context->safe_param_ids, param->paramid))
+		if (!list_member_int(context->safe_param_ids, param->paramid))
 		{
 			if (max_parallel_hazard_test(PROPARALLEL_RESTRICTED, context))
-				return true;
+			{
+				if (context->required_params != NULL)
+					*context->required_params = bms_add_member(*context->required_params, param->paramid);
+				if (context->check_params)
+					return true;
+			}
 		}
 		return false;			/* nothing to recurse to */
 	}
diff --git a/src/backend/optimizer/util/relnode.c b/src/backend/optimizer/util/relnode.c
index 15e3910b79..2e12a61ceb 100644
--- a/src/backend/optimizer/util/relnode.c
+++ b/src/backend/optimizer/util/relnode.c
@@ -687,6 +687,7 @@ build_join_rel(PlannerInfo *root,
 	joinrel->consider_startup = (root->tuple_fraction > 0);
 	joinrel->consider_param_startup = false;
 	joinrel->consider_parallel = false;
+	joinrel->params_req_for_parallel = bms_union(outer_rel->params_req_for_parallel, inner_rel->params_req_for_parallel);
 	joinrel->reltarget = create_empty_pathtarget();
 	joinrel->pathlist = NIL;
 	joinrel->ppilist = NIL;
diff --git a/src/include/nodes/pathnodes.h b/src/include/nodes/pathnodes.h
index c17b53f7ad..4966010fd0 100644
--- a/src/include/nodes/pathnodes.h
+++ b/src/include/nodes/pathnodes.h
@@ -871,6 +871,12 @@ typedef struct RelOptInfo
 	/* consider parallel paths? */
 	bool		consider_parallel;
 
+	/*
+	 * Params, if any, required to be provided when consider_parallel is true.
+	 * Note: if consider_parallel is false then this is not meaningful.
+	 */
+	Bitmapset	*params_req_for_parallel;
+
 	/*
 	 * default result targetlist for Paths scanning this relation; list of
 	 * Vars/Exprs, cost, width
diff --git a/src/include/optimizer/clauses.h b/src/include/optimizer/clauses.h
index cbe0607e85..31cf484d0f 100644
--- a/src/include/optimizer/clauses.h
+++ b/src/include/optimizer/clauses.h
@@ -34,6 +34,7 @@ extern bool contain_subplans(Node *clause);
 
 extern char max_parallel_hazard(Query *parse);
 extern bool is_parallel_safe(PlannerInfo *root, Node *node);
+extern bool is_parallel_safe_with_params(PlannerInfo *root, Node *node, Bitmapset **required_params);
 extern bool contain_nonstrict_functions(Node *clause);
 extern bool contain_exec_param(Node *clause, List *param_ids);
 extern bool contain_leaked_vars(Node *clause);
diff --git a/src/test/regress/expected/incremental_sort.out b/src/test/regress/expected/incremental_sort.out
index 0c3433f8e5..2b52b0ca3c 100644
--- a/src/test/regress/expected/incremental_sort.out
+++ b/src/test/regress/expected/incremental_sort.out
@@ -1597,20 +1597,21 @@ explain (costs off) select distinct
   unique1,
   (select t.unique1 from tenk1 where tenk1.unique1 = t.unique1)
 from tenk1 t, generate_series(1, 1000);
-                                   QUERY PLAN                                    
----------------------------------------------------------------------------------
+                                      QUERY PLAN                                       
+---------------------------------------------------------------------------------------
  Unique
-   ->  Sort
-         Sort Key: t.unique1, ((SubPlan 1))
-         ->  Gather
-               Workers Planned: 2
-               ->  Nested Loop
-                     ->  Parallel Index Only Scan using tenk1_unique1 on tenk1 t
-                     ->  Function Scan on generate_series
-               SubPlan 1
-                 ->  Index Only Scan using tenk1_unique1 on tenk1
-                       Index Cond: (unique1 = t.unique1)
-(11 rows)
+   ->  Gather Merge
+         Workers Planned: 2
+         ->  Unique
+               ->  Sort
+                     Sort Key: t.unique1, ((SubPlan 1))
+                     ->  Nested Loop
+                           ->  Parallel Index Only Scan using tenk1_unique1 on tenk1 t
+                           ->  Function Scan on generate_series
+                           SubPlan 1
+                             ->  Index Only Scan using tenk1_unique1 on tenk1
+                                   Index Cond: (unique1 = t.unique1)
+(12 rows)
 
 explain (costs off) select
   unique1,
@@ -1619,16 +1620,16 @@ from tenk1 t, generate_series(1, 1000)
 order by 1, 2;
                                 QUERY PLAN                                 
 ---------------------------------------------------------------------------
- Sort
-   Sort Key: t.unique1, ((SubPlan 1))
-   ->  Gather
-         Workers Planned: 2
+ Gather Merge
+   Workers Planned: 2
+   ->  Sort
+         Sort Key: t.unique1, ((SubPlan 1))
          ->  Nested Loop
                ->  Parallel Index Only Scan using tenk1_unique1 on tenk1 t
                ->  Function Scan on generate_series
-         SubPlan 1
-           ->  Index Only Scan using tenk1_unique1 on tenk1
-                 Index Cond: (unique1 = t.unique1)
+               SubPlan 1
+                 ->  Index Only Scan using tenk1_unique1 on tenk1
+                       Index Cond: (unique1 = t.unique1)
 (10 rows)
 
 -- Parallel sort but with expression not available until the upper rel.
diff --git a/src/test/regress/expected/partition_prune.out b/src/test/regress/expected/partition_prune.out
index 2abf759385..4566260fc4 100644
--- a/src/test/regress/expected/partition_prune.out
+++ b/src/test/regress/expected/partition_prune.out
@@ -1489,60 +1489,64 @@ EXPLAIN (VERBOSE, COSTS OFF) SELECT * FROM part p(x) ORDER BY x;
 --
 -- pruning won't work for mc3p, because some keys are Params
 explain (costs off) select * from mc2p t1, lateral (select count(*) from mc3p t2 where t2.a = t1.b and abs(t2.b) = 1 and t2.c = 1) s where t1.a = 1;
-                              QUERY PLAN                               
------------------------------------------------------------------------
- Nested Loop
-   ->  Append
-         ->  Seq Scan on mc2p1 t1_1
-               Filter: (a = 1)
-         ->  Seq Scan on mc2p2 t1_2
-               Filter: (a = 1)
-         ->  Seq Scan on mc2p_default t1_3
-               Filter: (a = 1)
-   ->  Aggregate
-         ->  Append
-               ->  Seq Scan on mc3p0 t2_1
-                     Filter: ((a = t1.b) AND (c = 1) AND (abs(b) = 1))
-               ->  Seq Scan on mc3p1 t2_2
-                     Filter: ((a = t1.b) AND (c = 1) AND (abs(b) = 1))
-               ->  Seq Scan on mc3p2 t2_3
-                     Filter: ((a = t1.b) AND (c = 1) AND (abs(b) = 1))
-               ->  Seq Scan on mc3p3 t2_4
-                     Filter: ((a = t1.b) AND (c = 1) AND (abs(b) = 1))
-               ->  Seq Scan on mc3p4 t2_5
-                     Filter: ((a = t1.b) AND (c = 1) AND (abs(b) = 1))
-               ->  Seq Scan on mc3p5 t2_6
-                     Filter: ((a = t1.b) AND (c = 1) AND (abs(b) = 1))
-               ->  Seq Scan on mc3p6 t2_7
-                     Filter: ((a = t1.b) AND (c = 1) AND (abs(b) = 1))
-               ->  Seq Scan on mc3p7 t2_8
-                     Filter: ((a = t1.b) AND (c = 1) AND (abs(b) = 1))
-               ->  Seq Scan on mc3p_default t2_9
-                     Filter: ((a = t1.b) AND (c = 1) AND (abs(b) = 1))
-(28 rows)
+                                 QUERY PLAN                                  
+-----------------------------------------------------------------------------
+ Gather
+   Workers Planned: 2
+   ->  Nested Loop
+         ->  Parallel Append
+               ->  Parallel Seq Scan on mc2p1 t1_1
+                     Filter: (a = 1)
+               ->  Parallel Seq Scan on mc2p2 t1_2
+                     Filter: (a = 1)
+               ->  Parallel Seq Scan on mc2p_default t1_3
+                     Filter: (a = 1)
+         ->  Aggregate
+               ->  Append
+                     ->  Seq Scan on mc3p0 t2_1
+                           Filter: ((a = t1.b) AND (c = 1) AND (abs(b) = 1))
+                     ->  Seq Scan on mc3p1 t2_2
+                           Filter: ((a = t1.b) AND (c = 1) AND (abs(b) = 1))
+                     ->  Seq Scan on mc3p2 t2_3
+                           Filter: ((a = t1.b) AND (c = 1) AND (abs(b) = 1))
+                     ->  Seq Scan on mc3p3 t2_4
+                           Filter: ((a = t1.b) AND (c = 1) AND (abs(b) = 1))
+                     ->  Seq Scan on mc3p4 t2_5
+                           Filter: ((a = t1.b) AND (c = 1) AND (abs(b) = 1))
+                     ->  Seq Scan on mc3p5 t2_6
+                           Filter: ((a = t1.b) AND (c = 1) AND (abs(b) = 1))
+                     ->  Seq Scan on mc3p6 t2_7
+                           Filter: ((a = t1.b) AND (c = 1) AND (abs(b) = 1))
+                     ->  Seq Scan on mc3p7 t2_8
+                           Filter: ((a = t1.b) AND (c = 1) AND (abs(b) = 1))
+                     ->  Seq Scan on mc3p_default t2_9
+                           Filter: ((a = t1.b) AND (c = 1) AND (abs(b) = 1))
+(30 rows)
 
 -- pruning should work fine, because values for a prefix of keys (a, b) are
 -- available
 explain (costs off) select * from mc2p t1, lateral (select count(*) from mc3p t2 where t2.c = t1.b and abs(t2.b) = 1 and t2.a = 1) s where t1.a = 1;
-                              QUERY PLAN                               
------------------------------------------------------------------------
- Nested Loop
-   ->  Append
-         ->  Seq Scan on mc2p1 t1_1
-               Filter: (a = 1)
-         ->  Seq Scan on mc2p2 t1_2
-               Filter: (a = 1)
-         ->  Seq Scan on mc2p_default t1_3
-               Filter: (a = 1)
-   ->  Aggregate
-         ->  Append
-               ->  Seq Scan on mc3p0 t2_1
-                     Filter: ((c = t1.b) AND (a = 1) AND (abs(b) = 1))
-               ->  Seq Scan on mc3p1 t2_2
-                     Filter: ((c = t1.b) AND (a = 1) AND (abs(b) = 1))
-               ->  Seq Scan on mc3p_default t2_3
-                     Filter: ((c = t1.b) AND (a = 1) AND (abs(b) = 1))
-(16 rows)
+                                 QUERY PLAN                                  
+-----------------------------------------------------------------------------
+ Gather
+   Workers Planned: 2
+   ->  Nested Loop
+         ->  Parallel Append
+               ->  Parallel Seq Scan on mc2p1 t1_1
+                     Filter: (a = 1)
+               ->  Parallel Seq Scan on mc2p2 t1_2
+                     Filter: (a = 1)
+               ->  Parallel Seq Scan on mc2p_default t1_3
+                     Filter: (a = 1)
+         ->  Aggregate
+               ->  Append
+                     ->  Seq Scan on mc3p0 t2_1
+                           Filter: ((c = t1.b) AND (a = 1) AND (abs(b) = 1))
+                     ->  Seq Scan on mc3p1 t2_2
+                           Filter: ((c = t1.b) AND (a = 1) AND (abs(b) = 1))
+                     ->  Seq Scan on mc3p_default t2_3
+                           Filter: ((c = t1.b) AND (a = 1) AND (abs(b) = 1))
+(18 rows)
 
 -- also here, because values for all keys are provided
 explain (costs off) select * from mc2p t1, lateral (select count(*) from mc3p t2 where t2.a = 1 and abs(t2.b) = 1 and t2.c = 1) s where t1.a = 1;
diff --git a/src/test/regress/expected/select_parallel.out b/src/test/regress/expected/select_parallel.out
index 0ddcf8e0b1..e422854574 100644
--- a/src/test/regress/expected/select_parallel.out
+++ b/src/test/regress/expected/select_parallel.out
@@ -320,19 +320,19 @@ explain (costs off, verbose) select
                                  QUERY PLAN                                 
 ----------------------------------------------------------------------------
  Gather
-   Output: (SubPlan 1)
+   Output: ((SubPlan 1))
    Workers Planned: 4
    ->  Nested Loop
-         Output: t.unique1
+         Output: (SubPlan 1)
          ->  Parallel Index Only Scan using tenk1_unique1 on public.tenk1 t
                Output: t.unique1
          ->  Function Scan on pg_catalog.generate_series
                Output: generate_series.generate_series
                Function Call: generate_series(1, 10)
-   SubPlan 1
-     ->  Index Only Scan using tenk1_unique1 on public.tenk1
-           Output: t.unique1
-           Index Cond: (tenk1.unique1 = t.unique1)
+         SubPlan 1
+           ->  Index Only Scan using tenk1_unique1 on public.tenk1
+                 Output: t.unique1
+                 Index Cond: (tenk1.unique1 = t.unique1)
 (14 rows)
 
 explain (costs off, verbose) select
@@ -341,63 +341,69 @@ explain (costs off, verbose) select
                               QUERY PLAN                              
 ----------------------------------------------------------------------
  Gather
-   Output: (SubPlan 1)
+   Output: ((SubPlan 1))
    Workers Planned: 4
    ->  Parallel Index Only Scan using tenk1_unique1 on public.tenk1 t
-         Output: t.unique1
-   SubPlan 1
-     ->  Index Only Scan using tenk1_unique1 on public.tenk1
-           Output: t.unique1
-           Index Cond: (tenk1.unique1 = t.unique1)
+         Output: (SubPlan 1)
+         SubPlan 1
+           ->  Index Only Scan using tenk1_unique1 on public.tenk1
+                 Output: t.unique1
+                 Index Cond: (tenk1.unique1 = t.unique1)
 (9 rows)
 
 explain (costs off, verbose) select
   (select t.unique1 from tenk1 where tenk1.unique1 = t.unique1)
   from tenk1 t
   limit 1;
-                            QUERY PLAN                             
--------------------------------------------------------------------
+                                 QUERY PLAN                                 
+----------------------------------------------------------------------------
  Limit
    Output: ((SubPlan 1))
-   ->  Seq Scan on public.tenk1 t
-         Output: (SubPlan 1)
-         SubPlan 1
-           ->  Index Only Scan using tenk1_unique1 on public.tenk1
-                 Output: t.unique1
-                 Index Cond: (tenk1.unique1 = t.unique1)
-(8 rows)
+   ->  Gather
+         Output: ((SubPlan 1))
+         Workers Planned: 4
+         ->  Parallel Index Only Scan using tenk1_unique1 on public.tenk1 t
+               Output: (SubPlan 1)
+               SubPlan 1
+                 ->  Index Only Scan using tenk1_unique1 on public.tenk1
+                       Output: t.unique1
+                       Index Cond: (tenk1.unique1 = t.unique1)
+(11 rows)
 
 explain (costs off, verbose) select t.unique1
   from tenk1 t
   where t.unique1 = (select t.unique1 from tenk1 where tenk1.unique1 = t.unique1);
-                         QUERY PLAN                          
--------------------------------------------------------------
- Seq Scan on public.tenk1 t
+                              QUERY PLAN                              
+----------------------------------------------------------------------
+ Gather
    Output: t.unique1
-   Filter: (t.unique1 = (SubPlan 1))
-   SubPlan 1
-     ->  Index Only Scan using tenk1_unique1 on public.tenk1
-           Output: t.unique1
-           Index Cond: (tenk1.unique1 = t.unique1)
-(7 rows)
+   Workers Planned: 4
+   ->  Parallel Index Only Scan using tenk1_unique1 on public.tenk1 t
+         Output: t.unique1
+         Filter: (t.unique1 = (SubPlan 1))
+         SubPlan 1
+           ->  Index Only Scan using tenk1_unique1 on public.tenk1
+                 Output: t.unique1
+                 Index Cond: (tenk1.unique1 = t.unique1)
+(10 rows)
 
 explain (costs off, verbose) select *
   from tenk1 t
   order by (select t.unique1 from tenk1 where tenk1.unique1 = t.unique1);
-                                                                                             QUERY PLAN                                                                                             
-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
- Sort
+                                                                                                QUERY PLAN                                                                                                
+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
+ Gather Merge
    Output: t.unique1, t.unique2, t.two, t.four, t.ten, t.twenty, t.hundred, t.thousand, t.twothousand, t.fivethous, t.tenthous, t.odd, t.even, t.stringu1, t.stringu2, t.string4, ((SubPlan 1))
-   Sort Key: ((SubPlan 1))
-   ->  Gather
-         Output: t.unique1, t.unique2, t.two, t.four, t.ten, t.twenty, t.hundred, t.thousand, t.twothousand, t.fivethous, t.tenthous, t.odd, t.even, t.stringu1, t.stringu2, t.string4, (SubPlan 1)
-         Workers Planned: 4
+   Workers Planned: 4
+   ->  Sort
+         Output: t.unique1, t.unique2, t.two, t.four, t.ten, t.twenty, t.hundred, t.thousand, t.twothousand, t.fivethous, t.tenthous, t.odd, t.even, t.stringu1, t.stringu2, t.string4, ((SubPlan 1))
+         Sort Key: ((SubPlan 1))
          ->  Parallel Seq Scan on public.tenk1 t
-               Output: t.unique1, t.unique2, t.two, t.four, t.ten, t.twenty, t.hundred, t.thousand, t.twothousand, t.fivethous, t.tenthous, t.odd, t.even, t.stringu1, t.stringu2, t.string4
-         SubPlan 1
-           ->  Index Only Scan using tenk1_unique1 on public.tenk1
-                 Output: t.unique1
-                 Index Cond: (tenk1.unique1 = t.unique1)
+               Output: t.unique1, t.unique2, t.two, t.four, t.ten, t.twenty, t.hundred, t.thousand, t.twothousand, t.fivethous, t.tenthous, t.odd, t.even, t.stringu1, t.stringu2, t.string4, (SubPlan 1)
+               SubPlan 1
+                 ->  Index Only Scan using tenk1_unique1 on public.tenk1
+                       Output: t.unique1
+                       Index Cond: (tenk1.unique1 = t.unique1)
 (12 rows)
 
 -- test subplan in join/lateral join
@@ -409,14 +415,14 @@ explain (costs off, verbose, timing off) select t.unique1, l.*
                               QUERY PLAN                              
 ----------------------------------------------------------------------
  Gather
-   Output: t.unique1, (SubPlan 1)
+   Output: t.unique1, ((SubPlan 1))
    Workers Planned: 4
    ->  Parallel Index Only Scan using tenk1_unique1 on public.tenk1 t
-         Output: t.unique1
-   SubPlan 1
-     ->  Index Only Scan using tenk1_unique1 on public.tenk1
-           Output: t.unique1
-           Index Cond: (tenk1.unique1 = t.unique1)
+         Output: t.unique1, (SubPlan 1)
+         SubPlan 1
+           ->  Index Only Scan using tenk1_unique1 on public.tenk1
+                 Output: t.unique1
+                 Index Cond: (tenk1.unique1 = t.unique1)
 (9 rows)
 
 -- can't put a gather at the top of a subplan that takes a param
@@ -1349,9 +1355,12 @@ SELECT 1 FROM tenk1_vw_sec
          Workers Planned: 4
          ->  Parallel Index Only Scan using tenk1_unique1 on tenk1
    SubPlan 1
-     ->  Aggregate
-           ->  Seq Scan on int4_tbl
-                 Filter: (f1 < tenk1_vw_sec.unique1)
-(9 rows)
+     ->  Finalize Aggregate
+           ->  Gather
+                 Workers Planned: 1
+                 ->  Partial Aggregate
+                       ->  Parallel Seq Scan on int4_tbl
+                             Filter: (f1 < tenk1_vw_sec.unique1)
+(12 rows)
 
 rollback;
-- 
2.39.2 (Apple Git-143)

