From 9e92be221e83a72e99e0bad8c234f311714200f0 Mon Sep 17 00:00:00 2001
From: Tender Wang <tndrwang@gmail.com>
Date: Tue, 23 Apr 2024 16:36:23 +0800
Subject: [PATCH v4] Support materializing inner path on parallel outer path.

In some cases, the inner path of nestloop join may take very long
time. For example, the inner relation is a big table or a complex
subquery. Parallel outer path and materialize inner path may get
serval times the performance improvement at a small plan time cost.
---
 src/backend/optimizer/path/joinpath.c         |  21 ++++
 src/test/regress/expected/partition_join.out  | 108 ++++++++++--------
 src/test/regress/expected/select_parallel.out |  40 +++++++
 src/test/regress/sql/select_parallel.sql      |  16 +++
 4 files changed, 137 insertions(+), 48 deletions(-)

diff --git a/src/backend/optimizer/path/joinpath.c b/src/backend/optimizer/path/joinpath.c
index 5be8da9e09..734b7ddc20 100644
--- a/src/backend/optimizer/path/joinpath.c
+++ b/src/backend/optimizer/path/joinpath.c
@@ -2015,10 +2015,27 @@ consider_parallel_nestloop(PlannerInfo *root,
 {
 	JoinType	save_jointype = jointype;
 	ListCell   *lc1;
+	Path *matpath = NULL;
+	Path *inner_cheapest_total = innerrel->cheapest_total_path;
 
 	if (jointype == JOIN_UNIQUE_INNER)
 		jointype = JOIN_INNER;
 
+	/*
+	 * Consider materializing the cheapest inner path, unless we're
+	 * doing JOIN_UNIQUE_INNER because in this case we have to
+	 * unique-ify the inner path. The check that subpath is parallel safe
+	 * here is probably redundant because we will not enter this func if
+	 * joinrel is not parallel safe.
+	 */
+	if (save_jointype != JOIN_UNIQUE_INNER &&
+		enable_material &&
+		inner_cheapest_total != NULL &&
+		inner_cheapest_total->parallel_safe &&
+		!ExecMaterializesOutput(inner_cheapest_total->pathtype))
+		matpath = (Path *)
+			create_material_path(innerrel, inner_cheapest_total);
+
 	foreach(lc1, outerrel->partial_pathlist)
 	{
 		Path	   *outerpath = (Path *) lfirst(lc1);
@@ -2075,6 +2092,10 @@ consider_parallel_nestloop(PlannerInfo *root,
 				try_partial_nestloop_path(root, joinrel, outerpath, mpath,
 										  pathkeys, jointype, extra);
 		}
+		/* Also consider materialized form of the cheapest inner path */
+		if (matpath != NULL && matpath->parallel_safe)
+			try_partial_nestloop_path(root, joinrel, outerpath, matpath,
+									  pathkeys, jointype, extra);
 	}
 }
 
diff --git a/src/test/regress/expected/partition_join.out b/src/test/regress/expected/partition_join.out
index 6d07f86b9b..0f379012fc 100644
--- a/src/test/regress/expected/partition_join.out
+++ b/src/test/regress/expected/partition_join.out
@@ -510,25 +510,30 @@ EXPLAIN (COSTS OFF)
 SELECT * FROM prt1 t1 JOIN LATERAL
 			  (SELECT * FROM prt1 t2 TABLESAMPLE SYSTEM (t1.a) REPEATABLE(t1.b)) s
 			  ON t1.a = s.a;
-                         QUERY PLAN                          
--------------------------------------------------------------
- Append
-   ->  Nested Loop
-         ->  Seq Scan on prt1_p1 t1_1
-         ->  Sample Scan on prt1_p1 t2_1
-               Sampling: system (t1_1.a) REPEATABLE (t1_1.b)
-               Filter: (t1_1.a = a)
-   ->  Nested Loop
-         ->  Seq Scan on prt1_p2 t1_2
-         ->  Sample Scan on prt1_p2 t2_2
-               Sampling: system (t1_2.a) REPEATABLE (t1_2.b)
-               Filter: (t1_2.a = a)
-   ->  Nested Loop
-         ->  Seq Scan on prt1_p3 t1_3
-         ->  Sample Scan on prt1_p3 t2_3
-               Sampling: system (t1_3.a) REPEATABLE (t1_3.b)
-               Filter: (t1_3.a = a)
-(16 rows)
+                               QUERY PLAN                                
+-------------------------------------------------------------------------
+ Gather
+   Workers Planned: 2
+   ->  Parallel Append
+         ->  Nested Loop
+               ->  Parallel Seq Scan on prt1_p1 t1_1
+               ->  Materialize
+                     ->  Sample Scan on prt1_p1 t2_1
+                           Sampling: system (t1_1.a) REPEATABLE (t1_1.b)
+                           Filter: (t1_1.a = a)
+         ->  Nested Loop
+               ->  Parallel Seq Scan on prt1_p2 t1_2
+               ->  Materialize
+                     ->  Sample Scan on prt1_p2 t2_2
+                           Sampling: system (t1_2.a) REPEATABLE (t1_2.b)
+                           Filter: (t1_2.a = a)
+         ->  Nested Loop
+               ->  Parallel Seq Scan on prt1_p3 t1_3
+               ->  Materialize
+                     ->  Sample Scan on prt1_p3 t2_3
+                           Sampling: system (t1_3.a) REPEATABLE (t1_3.b)
+                           Filter: (t1_3.a = a)
+(21 rows)
 
 -- lateral reference in scan's restriction clauses
 EXPLAIN (COSTS OFF)
@@ -2041,35 +2046,42 @@ EXPLAIN (COSTS OFF)
 SELECT * FROM prt1_l t1 JOIN LATERAL
 			  (SELECT * FROM prt1_l t2 TABLESAMPLE SYSTEM (t1.a) REPEATABLE(t1.b)) s
 			  ON t1.a = s.a AND t1.b = s.b AND t1.c = s.c;
-                                       QUERY PLAN                                       
-----------------------------------------------------------------------------------------
- Append
-   ->  Nested Loop
-         ->  Seq Scan on prt1_l_p1 t1_1
-         ->  Sample Scan on prt1_l_p1 t2_1
-               Sampling: system (t1_1.a) REPEATABLE (t1_1.b)
-               Filter: ((t1_1.a = a) AND (t1_1.b = b) AND ((t1_1.c)::text = (c)::text))
-   ->  Nested Loop
-         ->  Seq Scan on prt1_l_p2_p1 t1_2
-         ->  Sample Scan on prt1_l_p2_p1 t2_2
-               Sampling: system (t1_2.a) REPEATABLE (t1_2.b)
-               Filter: ((t1_2.a = a) AND (t1_2.b = b) AND ((t1_2.c)::text = (c)::text))
-   ->  Nested Loop
-         ->  Seq Scan on prt1_l_p2_p2 t1_3
-         ->  Sample Scan on prt1_l_p2_p2 t2_3
-               Sampling: system (t1_3.a) REPEATABLE (t1_3.b)
-               Filter: ((t1_3.a = a) AND (t1_3.b = b) AND ((t1_3.c)::text = (c)::text))
-   ->  Nested Loop
-         ->  Seq Scan on prt1_l_p3_p1 t1_4
-         ->  Sample Scan on prt1_l_p3_p1 t2_4
-               Sampling: system (t1_4.a) REPEATABLE (t1_4.b)
-               Filter: ((t1_4.a = a) AND (t1_4.b = b) AND ((t1_4.c)::text = (c)::text))
-   ->  Nested Loop
-         ->  Seq Scan on prt1_l_p3_p2 t1_5
-         ->  Sample Scan on prt1_l_p3_p2 t2_5
-               Sampling: system (t1_5.a) REPEATABLE (t1_5.b)
-               Filter: ((t1_5.a = a) AND (t1_5.b = b) AND ((t1_5.c)::text = (c)::text))
-(26 rows)
+                                             QUERY PLAN                                             
+----------------------------------------------------------------------------------------------------
+ Gather
+   Workers Planned: 2
+   ->  Parallel Append
+         ->  Nested Loop
+               ->  Parallel Seq Scan on prt1_l_p1 t1_1
+               ->  Materialize
+                     ->  Sample Scan on prt1_l_p1 t2_1
+                           Sampling: system (t1_1.a) REPEATABLE (t1_1.b)
+                           Filter: ((t1_1.a = a) AND (t1_1.b = b) AND ((t1_1.c)::text = (c)::text))
+         ->  Nested Loop
+               ->  Parallel Seq Scan on prt1_l_p2_p2 t1_3
+               ->  Materialize
+                     ->  Sample Scan on prt1_l_p2_p2 t2_3
+                           Sampling: system (t1_3.a) REPEATABLE (t1_3.b)
+                           Filter: ((t1_3.a = a) AND (t1_3.b = b) AND ((t1_3.c)::text = (c)::text))
+         ->  Nested Loop
+               ->  Parallel Seq Scan on prt1_l_p2_p1 t1_2
+               ->  Materialize
+                     ->  Sample Scan on prt1_l_p2_p1 t2_2
+                           Sampling: system (t1_2.a) REPEATABLE (t1_2.b)
+                           Filter: ((t1_2.a = a) AND (t1_2.b = b) AND ((t1_2.c)::text = (c)::text))
+         ->  Nested Loop
+               ->  Parallel Seq Scan on prt1_l_p3_p1 t1_4
+               ->  Materialize
+                     ->  Sample Scan on prt1_l_p3_p1 t2_4
+                           Sampling: system (t1_4.a) REPEATABLE (t1_4.b)
+                           Filter: ((t1_4.a = a) AND (t1_4.b = b) AND ((t1_4.c)::text = (c)::text))
+         ->  Nested Loop
+               ->  Parallel Seq Scan on prt1_l_p3_p2 t1_5
+               ->  Materialize
+                     ->  Sample Scan on prt1_l_p3_p2 t2_5
+                           Sampling: system (t1_5.a) REPEATABLE (t1_5.b)
+                           Filter: ((t1_5.a = a) AND (t1_5.b = b) AND ((t1_5.c)::text = (c)::text))
+(33 rows)
 
 -- partitionwise join with lateral reference in scan's restriction clauses
 EXPLAIN (COSTS OFF)
diff --git a/src/test/regress/expected/select_parallel.out b/src/test/regress/expected/select_parallel.out
index 87273fa635..69980bc14d 100644
--- a/src/test/regress/expected/select_parallel.out
+++ b/src/test/regress/expected/select_parallel.out
@@ -868,6 +868,46 @@ select * from
 (12 rows)
 
 reset enable_material;
+-- test materialized form of the cheapest inner path
+set min_parallel_table_scan_size = '512kB';
+explain(costs off)
+select count(*) from tenk1, int4_tbl where tenk1.two < int4_tbl.f1;
+                         QUERY PLAN                         
+------------------------------------------------------------
+ Finalize Aggregate
+   ->  Gather
+         Workers Planned: 4
+         ->  Partial Aggregate
+               ->  Nested Loop
+                     Join Filter: (tenk1.two < int4_tbl.f1)
+                     ->  Parallel Seq Scan on tenk1
+                     ->  Materialize
+                           ->  Seq Scan on int4_tbl
+(9 rows)
+
+select count(*) from tenk1, int4_tbl where tenk1.two < int4_tbl.f1;
+ count 
+-------
+ 20000
+(1 row)
+
+-- don't consider parallel nestloop if inner path is not parallel-safe
+set enable_memoize = off;
+explain(costs off)
+select * from tenk1,
+	lateral (select * from int4_tbl where tenk1.two < int4_tbl.f1 for share);
+                  QUERY PLAN                  
+----------------------------------------------
+ Nested Loop
+   ->  Seq Scan on tenk1
+   ->  Subquery Scan on unnamed_subquery
+         ->  LockRows
+               ->  Seq Scan on int4_tbl
+                     Filter: (tenk1.two < f1)
+(6 rows)
+
+set min_parallel_table_scan_size = 0;
+reset enable_memoize;
 reset enable_hashagg;
 -- check parallelized int8 aggregate (bug #14897)
 explain (costs off)
diff --git a/src/test/regress/sql/select_parallel.sql b/src/test/regress/sql/select_parallel.sql
index 20376c03fa..2d64846398 100644
--- a/src/test/regress/sql/select_parallel.sql
+++ b/src/test/regress/sql/select_parallel.sql
@@ -320,6 +320,22 @@ select * from
 
 reset enable_material;
 
+-- test materialized form of the cheapest inner path
+set min_parallel_table_scan_size = '512kB';
+
+explain(costs off)
+select count(*) from tenk1, int4_tbl where tenk1.two < int4_tbl.f1;
+
+select count(*) from tenk1, int4_tbl where tenk1.two < int4_tbl.f1;
+
+-- don't consider parallel nestloop if inner path is not parallel-safe
+set enable_memoize = off;
+explain(costs off)
+select * from tenk1,
+	lateral (select * from int4_tbl where tenk1.two < int4_tbl.f1 for share);
+
+set min_parallel_table_scan_size = 0;
+reset enable_memoize;
 reset enable_hashagg;
 
 -- check parallelized int8 aggregate (bug #14897)
-- 
2.34.1

