Hi.

Matheus Alcantara wrote on 2025-11-18 00:09:
On Sat Nov 15, 2025 at 7:57 AM -03, Alexander Pyhalov wrote:
Here are some comments on my first look at the patches. I still don't
have too much experience with the executor code but I hope that I can
help with something.

v5-0001-mark_async_capable-subpath-should-match-subplan.patch

I don't have too many comments on this; perhaps we could have a commit
message explaining the reason behind the change.

I've expanded the commit message. The issue is that mark_async_capable()
relies on the fact that the plan node type corresponds to the path type.
This is not true when (Merge)Append decides to insert a Sort node.
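To make the failure mode concrete, here is a minimal standalone C sketch. The enums and helpers below are simplified, hypothetical stand-ins for the planner's node machinery, not the real PostgreSQL definitions: when the subpath is not already sorted, the generated subplan gets wrapped in a Sort, so the plan's node tag no longer matches what the path type would suggest.

```c
#include <assert.h>

/* Hypothetical, much-simplified stand-ins for planner node tags. */
typedef enum { T_SubqueryScanPath, T_ForeignPath } PathTag;
typedef enum { T_SubqueryScan, T_ForeignScan, T_Sort } PlanTag;

/*
 * What create_[merge_]append_plan() effectively does: if the subpath's
 * output is not already sorted, the generated plan is wrapped in a Sort.
 */
static PlanTag
create_subplan(PathTag path, int already_sorted)
{
	PlanTag plan = (path == T_SubqueryScanPath) ? T_SubqueryScan
												: T_ForeignScan;
	return already_sorted ? plan : T_Sort;
}

/*
 * A naive mark_async_capable() that trusts "the path type implies the
 * plan type" would misread a Sort node here; the fix is to check the
 * plan's own tag before treating it as anything else.
 */
static int
safe_to_treat_as_foreign_scan(PathTag path, PlanTag plan)
{
	return path == T_ForeignPath && plan == T_ForeignScan;
}
```

The point is only that any reasoning of the form "the path is a ForeignPath, therefore the plan is a ForeignScan" must re-check the plan's own tag once Sort insertion is possible.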

Your explanation of why this change is needed, which you included in
your first email, sounds clearer IMHO. I would suggest the following
for a commit message:
    mark_async_capable() believes that the path corresponds to the plan.
    This is not true when create_[merge_]append_plan() inserts a sort
    node. In this case mark_async_capable() can treat a Sort plan node
    as some other node and crash. Fix this by handling the Sort node
    separately.

    This is needed to make the MergeAppend node async-capable, which
    will be implemented in a following commit.

What do you think?


Seems to be OK.

I was reading the patch changes again and I have a minor point:

SubqueryScan *scan_plan = (SubqueryScan *) plan;

                                /*
-                                * If the generated plan node includes a gating Result node,
-                                * we can't execute it asynchronously.
+                                * If the generated plan node includes a gating Result node or
+                                * a Sort node, we can't execute it asynchronously.
                                 */
-                               if (IsA(plan, Result))
+                               if (IsA(plan, Result) || IsA(plan, Sort))

Shouldn't we cast the plan to SubqueryScan * after the IsA(...) check? I
know this code was there before your changes, but type casting before an
IsA() check sounds a bit strange to me. Also, perhaps we could add an
Assert(IsA(plan, SubqueryScan)) after the IsA(...) check and before the
type cast, just for sanity?

Yes, checking that a node is not of type A and then using it as B seems
strange. But casting to another type and checking whether the node is of
a particular type before using it seems to be rather common. It does no
harm as long as we don't actually refer to SubqueryScan fields.
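As a minimal illustration of why the cast itself is harmless (a standalone sketch with simplified stand-ins for Plan, NodeTag, and IsA(), not the real PostgreSQL definitions):

```c
#include <assert.h>
#include <stddef.h>

/* Simplified, hypothetical stand-ins for PostgreSQL's node machinery. */
typedef enum { T_Result, T_Sort, T_SubqueryScan } NodeTag;

typedef struct Plan { NodeTag type; } Plan;
typedef struct Sort { Plan plan; } Sort;
typedef struct SubqueryScan { Plan scan; Plan *subplan; } SubqueryScan;

/* Every node starts with a Plan header, so the tag is always readable. */
#define IsA(nodeptr, t) (((const Plan *) (nodeptr))->type == T_##t)

/*
 * Mimics the pattern in mark_async_capable_plan(): the cast happens
 * before the check, but no SubqueryScan field is dereferenced until the
 * tag has been verified, so nothing bad can happen.
 */
static int
is_async_capable_subquery(Plan *plan)
{
	SubqueryScan *scan_plan = (SubqueryScan *) plan;	/* cast first */

	if (!IsA(scan_plan, SubqueryScan))	/* covers gating Result and Sort */
		return 0;
	/* only now is it safe to touch SubqueryScan fields */
	return scan_plan->subplan != NULL;
}
```

The cast only reinterprets the pointer; the tag check guards every actual field access, which is why the pattern is common in the tree.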

Updated the first patch.


--
Best regards,
Alexander Pyhalov,
Postgres Professional
From 10d872ce10bc8c7f6a430fe70367714b526fab4d Mon Sep 17 00:00:00 2001
From: Alexander Pyhalov <[email protected]>
Date: Sat, 15 Nov 2025 10:16:25 +0300
Subject: [PATCH 1/2] mark_async_capable(): subpath should match subplan

mark_async_capable() believes that the path corresponds to the plan. This
is not true when create_[merge_]append_plan() inserts a sort node. In
this case mark_async_capable() can treat a Sort plan node as some
other node and crash. Fix this by handling the Sort node separately.

This is needed to make the MergeAppend node async-capable, which will
be implemented in a following commit.
---
 src/backend/optimizer/plan/createplan.c | 18 ++++++++++--------
 1 file changed, 10 insertions(+), 8 deletions(-)

diff --git a/src/backend/optimizer/plan/createplan.c b/src/backend/optimizer/plan/createplan.c
index 8af091ba647..5cd7fa7b897 100644
--- a/src/backend/optimizer/plan/createplan.c
+++ b/src/backend/optimizer/plan/createplan.c
@@ -1139,10 +1139,12 @@ mark_async_capable_plan(Plan *plan, Path *path)
 				SubqueryScan *scan_plan = (SubqueryScan *) plan;
 
 				/*
-				 * If the generated plan node includes a gating Result node,
-				 * we can't execute it asynchronously.
+				 * Check that the plan node really is a SubqueryScan before
+				 * using it.  It may not be one if the generated plan node
+				 * includes a gating Result node or a Sort node; in that case
+				 * we can't execute it asynchronously.
 				 */
-				if (IsA(plan, Result))
+				if (!IsA(scan_plan, SubqueryScan))
 					return false;
 
 				/*
@@ -1160,10 +1162,10 @@ mark_async_capable_plan(Plan *plan, Path *path)
 				FdwRoutine *fdwroutine = path->parent->fdwroutine;
 
 				/*
-				 * If the generated plan node includes a gating Result node,
-				 * we can't execute it asynchronously.
+				 * If the generated plan node includes a gating Result node or
+				 * a Sort node, we can't execute it asynchronously.
 				 */
-				if (IsA(plan, Result))
+				if (IsA(plan, Result) || IsA(plan, Sort))
 					return false;
 
 				Assert(fdwroutine != NULL);
@@ -1176,9 +1178,9 @@ mark_async_capable_plan(Plan *plan, Path *path)
 
 			/*
 			 * If the generated plan node includes a Result node for the
-			 * projection, we can't execute it asynchronously.
+			 * projection or a Sort node, we can't execute it asynchronously.
 			 */
-			if (IsA(plan, Result))
+			if (IsA(plan, Result) || IsA(plan, Sort))
 				return false;
 
 			/*
-- 
2.43.0

From 76a212d1ba66cb727a95a4f6cba28786795b6d11 Mon Sep 17 00:00:00 2001
From: Alexander Pyhalov <[email protected]>
Date: Sat, 15 Nov 2025 10:23:47 +0300
Subject: [PATCH 2/2] MergeAppend should support Async Foreign Scan subplans

---
 .../postgres_fdw/expected/postgres_fdw.out    | 288 +++++++++++
 contrib/postgres_fdw/postgres_fdw.c           |  10 +-
 contrib/postgres_fdw/sql/postgres_fdw.sql     |  87 ++++
 doc/src/sgml/config.sgml                      |  14 +
 src/backend/executor/execAsync.c              |   4 +
 src/backend/executor/nodeAppend.c             |  21 +-
 src/backend/executor/nodeMergeAppend.c        | 458 +++++++++++++++++-
 src/backend/optimizer/path/costsize.c         |   1 +
 src/backend/optimizer/plan/createplan.c       |   9 +
 src/backend/utils/misc/guc_parameters.dat     |   8 +
 src/backend/utils/misc/postgresql.conf.sample |   1 +
 src/include/executor/nodeMergeAppend.h        |   1 +
 src/include/nodes/execnodes.h                 |  57 +++
 src/include/optimizer/cost.h                  |   1 +
 src/test/regress/expected/sysviews.out        |   3 +-
 15 files changed, 938 insertions(+), 25 deletions(-)

diff --git a/contrib/postgres_fdw/expected/postgres_fdw.out b/contrib/postgres_fdw/expected/postgres_fdw.out
index cd28126049d..5aa563c95ed 100644
--- a/contrib/postgres_fdw/expected/postgres_fdw.out
+++ b/contrib/postgres_fdw/expected/postgres_fdw.out
@@ -11556,6 +11556,46 @@ SELECT * FROM result_tbl ORDER BY a;
 (2 rows)
 
 DELETE FROM result_tbl;
+-- Test Merge Append
+EXPLAIN (VERBOSE, COSTS OFF)
+SELECT * FROM async_pt WHERE b % 100 = 0 ORDER BY b, a;
+                                                          QUERY PLAN                                                          
+------------------------------------------------------------------------------------------------------------------------------
+ Merge Append
+   Sort Key: async_pt.b, async_pt.a
+   ->  Async Foreign Scan on public.async_p1 async_pt_1
+         Output: async_pt_1.a, async_pt_1.b, async_pt_1.c
+         Remote SQL: SELECT a, b, c FROM public.base_tbl1 WHERE (((b % 100) = 0)) ORDER BY b ASC NULLS LAST, a ASC NULLS LAST
+   ->  Async Foreign Scan on public.async_p2 async_pt_2
+         Output: async_pt_2.a, async_pt_2.b, async_pt_2.c
+         Remote SQL: SELECT a, b, c FROM public.base_tbl2 WHERE (((b % 100) = 0)) ORDER BY b ASC NULLS LAST, a ASC NULLS LAST
+(8 rows)
+
+SELECT * FROM async_pt WHERE b % 100 = 0 ORDER BY b, a;
+  a   |  b  |  c   
+------+-----+------
+ 1000 |   0 | 0000
+ 2000 |   0 | 0000
+ 1100 | 100 | 0100
+ 2100 | 100 | 0100
+ 1200 | 200 | 0200
+ 2200 | 200 | 0200
+ 1300 | 300 | 0300
+ 2300 | 300 | 0300
+ 1400 | 400 | 0400
+ 2400 | 400 | 0400
+ 1500 | 500 | 0500
+ 2500 | 500 | 0500
+ 1600 | 600 | 0600
+ 2600 | 600 | 0600
+ 1700 | 700 | 0700
+ 2700 | 700 | 0700
+ 1800 | 800 | 0800
+ 2800 | 800 | 0800
+ 1900 | 900 | 0900
+ 2900 | 900 | 0900
+(20 rows)
+
 -- Test error handling, if accessing one of the foreign partitions errors out
 CREATE FOREIGN TABLE async_p_broken PARTITION OF async_pt FOR VALUES FROM (10000) TO (10001)
   SERVER loopback OPTIONS (table_name 'non_existent_table');
@@ -11604,6 +11644,76 @@ COPY async_pt TO stdout; --error
 ERROR:  cannot copy from foreign table "async_p1"
 DETAIL:  Partition "async_p1" is a foreign table in partitioned table "async_pt"
 HINT:  Try the COPY (SELECT ...) TO variant.
+-- Test Merge Append
+EXPLAIN (VERBOSE, COSTS OFF)
+SELECT * FROM async_pt WHERE b === 505 ORDER BY b, a;
+                                              QUERY PLAN                                              
+------------------------------------------------------------------------------------------------------
+ Merge Append
+   Sort Key: async_pt.b, async_pt.a
+   ->  Async Foreign Scan on public.async_p1 async_pt_1
+         Output: async_pt_1.a, async_pt_1.b, async_pt_1.c
+         Filter: (async_pt_1.b === 505)
+         Remote SQL: SELECT a, b, c FROM public.base_tbl1 ORDER BY b ASC NULLS LAST, a ASC NULLS LAST
+   ->  Async Foreign Scan on public.async_p2 async_pt_2
+         Output: async_pt_2.a, async_pt_2.b, async_pt_2.c
+         Filter: (async_pt_2.b === 505)
+         Remote SQL: SELECT a, b, c FROM public.base_tbl2 ORDER BY b ASC NULLS LAST, a ASC NULLS LAST
+   ->  Async Foreign Scan on public.async_p3 async_pt_3
+         Output: async_pt_3.a, async_pt_3.b, async_pt_3.c
+         Filter: (async_pt_3.b === 505)
+         Remote SQL: SELECT a, b, c FROM public.base_tbl3 ORDER BY b ASC NULLS LAST, a ASC NULLS LAST
+(14 rows)
+
+SELECT * FROM async_pt WHERE b === 505 ORDER BY b, a;
+  a   |  b  |  c   
+------+-----+------
+ 1505 | 505 | 0505
+ 2505 | 505 | 0505
+ 3505 | 505 | 0505
+(3 rows)
+
+-- Test async Merge Append rescan
+EXPLAIN (VERBOSE, COSTS OFF)
+SELECT
+	ARRAY(SELECT f.i FROM (SELECT b + g.i FROM async_pt WHERE a > g.i ORDER BY b) f(i) ORDER BY f.i LIMIT 10)
+FROM generate_series(1, 3) g(i);
+                                                            QUERY PLAN                                                            
+----------------------------------------------------------------------------------------------------------------------------------
+ Function Scan on pg_catalog.generate_series g
+   Output: ARRAY(SubPlan array_1)
+   Function Call: generate_series(1, 3)
+   SubPlan array_1
+     ->  Limit
+           Output: f.i
+           ->  Sort
+                 Output: f.i
+                 Sort Key: f.i
+                 ->  Subquery Scan on f
+                       Output: f.i
+                       ->  Merge Append
+                             Sort Key: async_pt.b
+                             ->  Async Foreign Scan on public.async_p1 async_pt_1
+                                   Output: (async_pt_1.b + g.i), async_pt_1.b
+                                   Remote SQL: SELECT b FROM public.base_tbl1 WHERE ((a > $1::integer)) ORDER BY b ASC NULLS LAST
+                             ->  Async Foreign Scan on public.async_p2 async_pt_2
+                                   Output: (async_pt_2.b + g.i), async_pt_2.b
+                                   Remote SQL: SELECT b FROM public.base_tbl2 WHERE ((a > $1::integer)) ORDER BY b ASC NULLS LAST
+                             ->  Async Foreign Scan on public.async_p3 async_pt_3
+                                   Output: (async_pt_3.b + g.i), async_pt_3.b
+                                   Remote SQL: SELECT b FROM public.base_tbl3 WHERE ((a > $1::integer)) ORDER BY b ASC NULLS LAST
+(22 rows)
+
+SELECT
+	ARRAY(SELECT f.i FROM (SELECT b + g.i FROM async_pt WHERE a > g.i ORDER BY b) f(i) ORDER BY f.i LIMIT 10)
+FROM generate_series(1, 3) g(i);
+           array           
+---------------------------
+ {1,1,1,6,6,6,11,11,11,16}
+ {2,2,2,7,7,7,12,12,12,17}
+ {3,3,3,8,8,8,13,13,13,18}
+(3 rows)
+
 DROP FOREIGN TABLE async_p3;
 DROP TABLE base_tbl3;
 -- Check case where the partitioned table has local/remote partitions
@@ -11639,6 +11749,37 @@ SELECT * FROM result_tbl ORDER BY a;
 (3 rows)
 
 DELETE FROM result_tbl;
+-- Test Merge Append
+EXPLAIN (VERBOSE, COSTS OFF)
+SELECT * FROM async_pt WHERE b === 505 ORDER BY b, a;
+                                              QUERY PLAN                                              
+------------------------------------------------------------------------------------------------------
+ Merge Append
+   Sort Key: async_pt.b, async_pt.a
+   ->  Async Foreign Scan on public.async_p1 async_pt_1
+         Output: async_pt_1.a, async_pt_1.b, async_pt_1.c
+         Filter: (async_pt_1.b === 505)
+         Remote SQL: SELECT a, b, c FROM public.base_tbl1 ORDER BY b ASC NULLS LAST, a ASC NULLS LAST
+   ->  Async Foreign Scan on public.async_p2 async_pt_2
+         Output: async_pt_2.a, async_pt_2.b, async_pt_2.c
+         Filter: (async_pt_2.b === 505)
+         Remote SQL: SELECT a, b, c FROM public.base_tbl2 ORDER BY b ASC NULLS LAST, a ASC NULLS LAST
+   ->  Sort
+         Output: async_pt_3.a, async_pt_3.b, async_pt_3.c
+         Sort Key: async_pt_3.b, async_pt_3.a
+         ->  Seq Scan on public.async_p3 async_pt_3
+               Output: async_pt_3.a, async_pt_3.b, async_pt_3.c
+               Filter: (async_pt_3.b === 505)
+(16 rows)
+
+SELECT * FROM async_pt WHERE b === 505 ORDER BY b, a;
+  a   |  b  |  c   
+------+-----+------
+ 1505 | 505 | 0505
+ 2505 | 505 | 0505
+ 3505 | 505 | 0505
+(3 rows)
+
 -- partitionwise joins
 SET enable_partitionwise_join TO true;
 CREATE TABLE join_tbl (a1 int, b1 int, c1 text, a2 int, b2 int, c2 text);
@@ -12421,6 +12562,153 @@ SELECT a FROM base_tbl WHERE (a, random() > 0) IN (SELECT a, random() > 0 FROM f
 DROP FOREIGN TABLE foreign_tbl CASCADE;
 NOTICE:  drop cascades to foreign table foreign_tbl2
 DROP TABLE base_tbl;
+-- Test async Merge Append
+CREATE TABLE distr1 (i int, j int, k text) PARTITION BY HASH (i);
+CREATE TABLE base1 (i int, j int, k text);
+CREATE TABLE base2 (i int, j int, k text);
+CREATE FOREIGN TABLE distr1_p1 PARTITION OF distr1 FOR VALUES WITH (MODULUS 2, REMAINDER 0)
+SERVER loopback OPTIONS (table_name 'base1');
+CREATE FOREIGN TABLE distr1_p2 PARTITION OF distr1 FOR VALUES WITH (MODULUS 2, REMAINDER 1)
+SERVER loopback OPTIONS (table_name 'base2');
+CREATE TABLE distr2 (i int, j int, k text) PARTITION BY HASH (i);
+CREATE TABLE base3 (i int, j int, k text);
+CREATE TABLE base4 (i int, j int, k text);
+CREATE FOREIGN TABLE distr2_p1 PARTITION OF distr2 FOR VALUES WITH (MODULUS 2, REMAINDER 0)
+SERVER loopback OPTIONS (table_name 'base3');
+CREATE FOREIGN TABLE distr2_p2 PARTITION OF distr2 FOR VALUES WITH (MODULUS 2, REMAINDER 1)
+SERVER loopback OPTIONS (table_name 'base4');
+INSERT INTO distr1
+SELECT i, i*10, 'data_' || i FROM generate_series(1, 1000) i;
+INSERT INTO distr2
+SELECT i, i*10, 'data_' || i FROM generate_series(1, 100) i;
+ANALYZE distr1_p1;
+ANALYZE distr1_p2;
+ANALYZE distr2_p1;
+ANALYZE distr2_p2;
+SET enable_partitionwise_join TO ON;
+-- Test joins with async Merge Append
+EXPLAIN (VERBOSE, COSTS OFF)
+SELECT * FROM distr1, distr2 WHERE distr1.i=distr2.i AND distr2.j > 90 and distr2.k like 'data%'
+ORDER BY distr2.i LIMIT 10;
+                                                                                                    QUERY PLAN                                                                                                     
+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
+ Limit
+   Output: distr1.i, distr1.j, distr1.k, distr2.i, distr2.j, distr2.k
+   ->  Merge Append
+         Sort Key: distr1.i
+         ->  Async Foreign Scan
+               Output: distr1_1.i, distr1_1.j, distr1_1.k, distr2_1.i, distr2_1.j, distr2_1.k
+               Relations: (public.distr1_p1 distr1_1) INNER JOIN (public.distr2_p1 distr2_1)
+               Remote SQL: SELECT r3.i, r3.j, r3.k, r5.i, r5.j, r5.k FROM (public.base1 r3 INNER JOIN public.base3 r5 ON (((r3.i = r5.i)) AND ((r5.j > 90)) AND ((r5.k ~~ 'data%')))) ORDER BY r3.i ASC NULLS LAST
+         ->  Async Foreign Scan
+               Output: distr1_2.i, distr1_2.j, distr1_2.k, distr2_2.i, distr2_2.j, distr2_2.k
+               Relations: (public.distr1_p2 distr1_2) INNER JOIN (public.distr2_p2 distr2_2)
+               Remote SQL: SELECT r4.i, r4.j, r4.k, r6.i, r6.j, r6.k FROM (public.base2 r4 INNER JOIN public.base4 r6 ON (((r4.i = r6.i)) AND ((r6.j > 90)) AND ((r6.k ~~ 'data%')))) ORDER BY r4.i ASC NULLS LAST
+(12 rows)
+
+SELECT * FROM distr1, distr2 WHERE distr1.i=distr2.i AND distr2.j > 90 and distr2.k like 'data%'
+ORDER BY distr2.i LIMIT 10;
+ i  |  j  |    k    | i  |  j  |    k    
+----+-----+---------+----+-----+---------
+ 10 | 100 | data_10 | 10 | 100 | data_10
+ 11 | 110 | data_11 | 11 | 110 | data_11
+ 12 | 120 | data_12 | 12 | 120 | data_12
+ 13 | 130 | data_13 | 13 | 130 | data_13
+ 14 | 140 | data_14 | 14 | 140 | data_14
+ 15 | 150 | data_15 | 15 | 150 | data_15
+ 16 | 160 | data_16 | 16 | 160 | data_16
+ 17 | 170 | data_17 | 17 | 170 | data_17
+ 18 | 180 | data_18 | 18 | 180 | data_18
+ 19 | 190 | data_19 | 19 | 190 | data_19
+(10 rows)
+
+EXPLAIN (VERBOSE, COSTS OFF)
+SELECT * FROM distr1 LEFT JOIN distr2 ON distr1.i=distr2.i AND distr2.k like 'data%' WHERE  distr1.i > 90
+ORDER BY distr1.i LIMIT 20;
+                                                                                                     QUERY PLAN                                                                                                     
+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
+ Limit
+   Output: distr1.i, distr1.j, distr1.k, distr2.i, distr2.j, distr2.k
+   ->  Merge Append
+         Sort Key: distr1.i
+         ->  Async Foreign Scan
+               Output: distr1_1.i, distr1_1.j, distr1_1.k, distr2_1.i, distr2_1.j, distr2_1.k
+               Relations: (public.distr1_p1 distr1_1) LEFT JOIN (public.distr2_p1 distr2_1)
+               Remote SQL: SELECT r4.i, r4.j, r4.k, r6.i, r6.j, r6.k FROM (public.base1 r4 LEFT JOIN public.base3 r6 ON (((r4.i = r6.i)) AND ((r6.k ~~ 'data%')))) WHERE ((r4.i > 90)) ORDER BY r4.i ASC NULLS LAST
+         ->  Async Foreign Scan
+               Output: distr1_2.i, distr1_2.j, distr1_2.k, distr2_2.i, distr2_2.j, distr2_2.k
+               Relations: (public.distr1_p2 distr1_2) LEFT JOIN (public.distr2_p2 distr2_2)
+               Remote SQL: SELECT r5.i, r5.j, r5.k, r7.i, r7.j, r7.k FROM (public.base2 r5 LEFT JOIN public.base4 r7 ON (((r5.i = r7.i)) AND ((r7.k ~~ 'data%')))) WHERE ((r5.i > 90)) ORDER BY r5.i ASC NULLS LAST
+(12 rows)
+
+SELECT * FROM distr1 LEFT JOIN distr2 ON  distr1.i=distr2.i AND distr2.k like 'data%' WHERE distr1.i > 90
+ORDER BY distr1.i LIMIT 20;
+  i  |  j   |    k     |  i  |  j   |    k     
+-----+------+----------+-----+------+----------
+  91 |  910 | data_91  |  91 |  910 | data_91
+  92 |  920 | data_92  |  92 |  920 | data_92
+  93 |  930 | data_93  |  93 |  930 | data_93
+  94 |  940 | data_94  |  94 |  940 | data_94
+  95 |  950 | data_95  |  95 |  950 | data_95
+  96 |  960 | data_96  |  96 |  960 | data_96
+  97 |  970 | data_97  |  97 |  970 | data_97
+  98 |  980 | data_98  |  98 |  980 | data_98
+  99 |  990 | data_99  |  99 |  990 | data_99
+ 100 | 1000 | data_100 | 100 | 1000 | data_100
+ 101 | 1010 | data_101 |     |      | 
+ 102 | 1020 | data_102 |     |      | 
+ 103 | 1030 | data_103 |     |      | 
+ 104 | 1040 | data_104 |     |      | 
+ 105 | 1050 | data_105 |     |      | 
+ 106 | 1060 | data_106 |     |      | 
+ 107 | 1070 | data_107 |     |      | 
+ 108 | 1080 | data_108 |     |      | 
+ 109 | 1090 | data_109 |     |      | 
+ 110 | 1100 | data_110 |     |      | 
+(20 rows)
+
+-- Test pruning with async Merge Append
+DELETE FROM distr2;
+INSERT INTO distr2
+SELECT i%10, i*10, 'data_' || i FROM generate_series(1, 1000) i;
+DEALLOCATE ALL;
+SET plan_cache_mode TO force_generic_plan;
+PREPARE async_pt_query (int, int) AS
+  SELECT * FROM distr2 WHERE i = ANY(ARRAY[$1, $2])
+  ORDER BY i,j
+  LIMIT 10;
+EXPLAIN (VERBOSE, COSTS OFF)
+	EXECUTE async_pt_query(1, 1);
+                                                                         QUERY PLAN                                                                         
+------------------------------------------------------------------------------------------------------------------------------------------------------------
+ Limit
+   Output: distr2.i, distr2.j, distr2.k
+   ->  Merge Append
+         Sort Key: distr2.i, distr2.j
+         Subplans Removed: 1
+         ->  Async Foreign Scan on public.distr2_p1 distr2_1
+               Output: distr2_1.i, distr2_1.j, distr2_1.k
+               Remote SQL: SELECT i, j, k FROM public.base3 WHERE ((i = ANY (ARRAY[$1::integer, $2::integer]))) ORDER BY i ASC NULLS LAST, j ASC NULLS LAST
+(8 rows)
+
+EXECUTE async_pt_query(1, 1);
+ i |  j  |    k    
+---+-----+---------
+ 1 |  10 | data_1
+ 1 | 110 | data_11
+ 1 | 210 | data_21
+ 1 | 310 | data_31
+ 1 | 410 | data_41
+ 1 | 510 | data_51
+ 1 | 610 | data_61
+ 1 | 710 | data_71
+ 1 | 810 | data_81
+ 1 | 910 | data_91
+(10 rows)
+
+RESET plan_cache_mode;
+RESET enable_partitionwise_join;
+DROP TABLE distr1, distr2, base1, base2, base3, base4;
 ALTER SERVER loopback OPTIONS (DROP async_capable);
 ALTER SERVER loopback2 OPTIONS (DROP async_capable);
 -- ===================================================================
diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c
index 06b52c65300..2105c9c90b9 100644
--- a/contrib/postgres_fdw/postgres_fdw.c
+++ b/contrib/postgres_fdw/postgres_fdw.c
@@ -7213,12 +7213,16 @@ postgresForeignAsyncConfigureWait(AsyncRequest *areq)
 	ForeignScanState *node = (ForeignScanState *) areq->requestee;
 	PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
 	AsyncRequest *pendingAreq = fsstate->conn_state->pendingAreq;
-	AppendState *requestor = (AppendState *) areq->requestor;
-	WaitEventSet *set = requestor->as_eventset;
+	PlanState  *requestor = areq->requestor;
+	WaitEventSet *set;
+	Bitmapset  *needrequest;
 
 	/* This should not be called unless callback_pending */
 	Assert(areq->callback_pending);
 
+	set = GetAppendEventSet(requestor);
+	needrequest = GetNeedRequest(requestor);
+
 	/*
 	 * If process_pending_request() has been invoked on the given request
 	 * before we get here, we might have some tuples already; in which case
@@ -7256,7 +7260,7 @@ postgresForeignAsyncConfigureWait(AsyncRequest *areq)
 		 * below, because we might otherwise end up with no configured events
 		 * other than the postmaster death event.
 		 */
-		if (!bms_is_empty(requestor->as_needrequest))
+		if (!bms_is_empty(needrequest))
 			return;
 		if (GetNumRegisteredWaitEvents(set) > 1)
 			return;
diff --git a/contrib/postgres_fdw/sql/postgres_fdw.sql b/contrib/postgres_fdw/sql/postgres_fdw.sql
index 9a8f9e28135..aa388cb027f 100644
--- a/contrib/postgres_fdw/sql/postgres_fdw.sql
+++ b/contrib/postgres_fdw/sql/postgres_fdw.sql
@@ -3921,6 +3921,11 @@ INSERT INTO result_tbl SELECT a, b, 'AAA' || c FROM async_pt WHERE b === 505;
 SELECT * FROM result_tbl ORDER BY a;
 DELETE FROM result_tbl;
 
+-- Test Merge Append
+EXPLAIN (VERBOSE, COSTS OFF)
+SELECT * FROM async_pt WHERE b % 100 = 0 ORDER BY b, a;
+SELECT * FROM async_pt WHERE b % 100 = 0 ORDER BY b, a;
+
 -- Test error handling, if accessing one of the foreign partitions errors out
 CREATE FOREIGN TABLE async_p_broken PARTITION OF async_pt FOR VALUES FROM (10000) TO (10001)
   SERVER loopback OPTIONS (table_name 'non_existent_table');
@@ -3944,6 +3949,20 @@ DELETE FROM result_tbl;
 -- Test COPY TO when foreign table is partition
 COPY async_pt TO stdout; --error
 
+-- Test Merge Append
+EXPLAIN (VERBOSE, COSTS OFF)
+SELECT * FROM async_pt WHERE b === 505 ORDER BY b, a;
+SELECT * FROM async_pt WHERE b === 505 ORDER BY b, a;
+
+-- Test async Merge Append rescan
+EXPLAIN (VERBOSE, COSTS OFF)
+SELECT
+	ARRAY(SELECT f.i FROM (SELECT b + g.i FROM async_pt WHERE a > g.i ORDER BY b) f(i) ORDER BY f.i LIMIT 10)
+FROM generate_series(1, 3) g(i);
+SELECT
+	ARRAY(SELECT f.i FROM (SELECT b + g.i FROM async_pt WHERE a > g.i ORDER BY b) f(i) ORDER BY f.i LIMIT 10)
+FROM generate_series(1, 3) g(i);
+
 DROP FOREIGN TABLE async_p3;
 DROP TABLE base_tbl3;
 
@@ -3959,6 +3978,11 @@ INSERT INTO result_tbl SELECT * FROM async_pt WHERE b === 505;
 SELECT * FROM result_tbl ORDER BY a;
 DELETE FROM result_tbl;
 
+-- Test Merge Append
+EXPLAIN (VERBOSE, COSTS OFF)
+SELECT * FROM async_pt WHERE b === 505 ORDER BY b, a;
+SELECT * FROM async_pt WHERE b === 505 ORDER BY b, a;
+
 -- partitionwise joins
 SET enable_partitionwise_join TO true;
 
@@ -4197,6 +4221,69 @@ SELECT a FROM base_tbl WHERE (a, random() > 0) IN (SELECT a, random() > 0 FROM f
 DROP FOREIGN TABLE foreign_tbl CASCADE;
 DROP TABLE base_tbl;
 
+-- Test async Merge Append
+CREATE TABLE distr1 (i int, j int, k text) PARTITION BY HASH (i);
+CREATE TABLE base1 (i int, j int, k text);
+CREATE TABLE base2 (i int, j int, k text);
+CREATE FOREIGN TABLE distr1_p1 PARTITION OF distr1 FOR VALUES WITH (MODULUS 2, REMAINDER 0)
+SERVER loopback OPTIONS (table_name 'base1');
+CREATE FOREIGN TABLE distr1_p2 PARTITION OF distr1 FOR VALUES WITH (MODULUS 2, REMAINDER 1)
+SERVER loopback OPTIONS (table_name 'base2');
+
+CREATE TABLE distr2 (i int, j int, k text) PARTITION BY HASH (i);
+CREATE TABLE base3 (i int, j int, k text);
+CREATE TABLE base4 (i int, j int, k text);
+CREATE FOREIGN TABLE distr2_p1 PARTITION OF distr2 FOR VALUES WITH (MODULUS 2, REMAINDER 0)
+SERVER loopback OPTIONS (table_name 'base3');
+CREATE FOREIGN TABLE distr2_p2 PARTITION OF distr2 FOR VALUES WITH (MODULUS 2, REMAINDER 1)
+SERVER loopback OPTIONS (table_name 'base4');
+
+INSERT INTO distr1
+SELECT i, i*10, 'data_' || i FROM generate_series(1, 1000) i;
+
+INSERT INTO distr2
+SELECT i, i*10, 'data_' || i FROM generate_series(1, 100) i;
+
+ANALYZE distr1_p1;
+ANALYZE distr1_p2;
+ANALYZE distr2_p1;
+ANALYZE distr2_p2;
+
+SET enable_partitionwise_join TO ON;
+
+-- Test joins with async Merge Append
+EXPLAIN (VERBOSE, COSTS OFF)
+SELECT * FROM distr1, distr2 WHERE distr1.i=distr2.i AND distr2.j > 90 and distr2.k like 'data%'
+ORDER BY distr2.i LIMIT 10;
+SELECT * FROM distr1, distr2 WHERE distr1.i=distr2.i AND distr2.j > 90 and distr2.k like 'data%'
+ORDER BY distr2.i LIMIT 10;
+
+EXPLAIN (VERBOSE, COSTS OFF)
+SELECT * FROM distr1 LEFT JOIN distr2 ON distr1.i=distr2.i AND distr2.k like 'data%' WHERE  distr1.i > 90
+ORDER BY distr1.i LIMIT 20;
+SELECT * FROM distr1 LEFT JOIN distr2 ON  distr1.i=distr2.i AND distr2.k like 'data%' WHERE distr1.i > 90
+ORDER BY distr1.i LIMIT 20;
+
+-- Test pruning with async Merge Append
+DELETE FROM distr2;
+INSERT INTO distr2
+SELECT i%10, i*10, 'data_' || i FROM generate_series(1, 1000) i;
+
+DEALLOCATE ALL;
+SET plan_cache_mode TO force_generic_plan;
+PREPARE async_pt_query (int, int) AS
+  SELECT * FROM distr2 WHERE i = ANY(ARRAY[$1, $2])
+  ORDER BY i,j
+  LIMIT 10;
+EXPLAIN (VERBOSE, COSTS OFF)
+	EXECUTE async_pt_query(1, 1);
+EXECUTE async_pt_query(1, 1);
+RESET plan_cache_mode;
+
+RESET enable_partitionwise_join;
+
+DROP TABLE distr1, distr2, base1, base2, base3, base4;
+
 ALTER SERVER loopback OPTIONS (DROP async_capable);
 ALTER SERVER loopback2 OPTIONS (DROP async_capable);
 
diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml
index df1c3eaaa58..1d46de13918 100644
--- a/doc/src/sgml/config.sgml
+++ b/doc/src/sgml/config.sgml
@@ -5455,6 +5455,20 @@ ANY <replaceable class="parameter">num_sync</replaceable> ( <replaceable class="
       </listitem>
      </varlistentry>
 
+     <varlistentry id="guc-enable-async-merge-append" xreflabel="enable_async_merge_append">
+      <term><varname>enable_async_merge_append</varname> (<type>boolean</type>)
+      <indexterm>
+       <primary><varname>enable_async_merge_append</varname> configuration parameter</primary>
+      </indexterm>
+      </term>
+      <listitem>
+       <para>
+        Enables or disables the query planner's use of async-aware
+        merge append plan types. The default is <literal>on</literal>.
+       </para>
+      </listitem>
+     </varlistentry>
+
      <varlistentry id="guc-enable-bitmapscan" xreflabel="enable_bitmapscan">
       <term><varname>enable_bitmapscan</varname> (<type>boolean</type>)
       <indexterm>
diff --git a/src/backend/executor/execAsync.c b/src/backend/executor/execAsync.c
index 5d3cabe73e3..6dc19ebc374 100644
--- a/src/backend/executor/execAsync.c
+++ b/src/backend/executor/execAsync.c
@@ -17,6 +17,7 @@
 #include "executor/execAsync.h"
 #include "executor/executor.h"
 #include "executor/nodeAppend.h"
+#include "executor/nodeMergeAppend.h"
 #include "executor/nodeForeignscan.h"
 
 /*
@@ -121,6 +122,9 @@ ExecAsyncResponse(AsyncRequest *areq)
 		case T_AppendState:
 			ExecAsyncAppendResponse(areq);
 			break;
+		case T_MergeAppendState:
+			ExecAsyncMergeAppendResponse(areq);
+			break;
 		default:
 			/* If the node doesn't support async, caller messed up. */
 			elog(ERROR, "unrecognized node type: %d",
diff --git a/src/backend/executor/nodeAppend.c b/src/backend/executor/nodeAppend.c
index a11b36c7176..c89e2d2787f 100644
--- a/src/backend/executor/nodeAppend.c
+++ b/src/backend/executor/nodeAppend.c
@@ -1187,10 +1187,7 @@ ExecAsyncAppendResponse(AsyncRequest *areq)
 static void
 classify_matching_subplans(AppendState *node)
 {
-	Bitmapset  *valid_asyncplans;
-
 	Assert(node->as_valid_subplans_identified);
-	Assert(node->as_valid_asyncplans == NULL);
 
 	/* Nothing to do if there are no valid subplans. */
 	if (bms_is_empty(node->as_valid_subplans))
@@ -1200,21 +1197,7 @@ classify_matching_subplans(AppendState *node)
 		return;
 	}
 
-	/* Nothing to do if there are no valid async subplans. */
-	if (!bms_overlap(node->as_valid_subplans, node->as_asyncplans))
-	{
+	/* No valid async subplans identified. */
+	if (!classify_matching_subplans_common(&node->as_valid_subplans, node->as_asyncplans, &node->as_valid_asyncplans))
 		node->as_nasyncremain = 0;
-		return;
-	}
-
-	/* Get valid async subplans. */
-	valid_asyncplans = bms_intersect(node->as_asyncplans,
-									 node->as_valid_subplans);
-
-	/* Adjust the valid subplans to contain sync subplans only. */
-	node->as_valid_subplans = bms_del_members(node->as_valid_subplans,
-											  valid_asyncplans);
-
-	/* Save valid async subplans. */
-	node->as_valid_asyncplans = valid_asyncplans;
 }
diff --git a/src/backend/executor/nodeMergeAppend.c b/src/backend/executor/nodeMergeAppend.c
index 405e8f94285..7db41fbf40f 100644
--- a/src/backend/executor/nodeMergeAppend.c
+++ b/src/backend/executor/nodeMergeAppend.c
@@ -39,10 +39,15 @@
 #include "postgres.h"
 
 #include "executor/executor.h"
+#include "executor/execAsync.h"
 #include "executor/execPartition.h"
 #include "executor/nodeMergeAppend.h"
 #include "lib/binaryheap.h"
 #include "miscadmin.h"
+#include "storage/latch.h"
+#include "utils/wait_event.h"
+
+#define EVENT_BUFFER_SIZE                     16
 
 /*
  * We have one slot for each item in the heap array.  We use SlotNumber
@@ -54,6 +59,12 @@ typedef int32 SlotNumber;
 static TupleTableSlot *ExecMergeAppend(PlanState *pstate);
 static int	heap_compare_slots(Datum a, Datum b, void *arg);
 
+static void classify_matching_subplans(MergeAppendState *node);
+static void ExecMergeAppendAsyncBegin(MergeAppendState *node);
+static void ExecMergeAppendAsyncGetNext(MergeAppendState *node, int mplan);
+static bool ExecMergeAppendAsyncRequest(MergeAppendState *node, int mplan);
+static void ExecMergeAppendAsyncEventWait(MergeAppendState *node);
+
 
 /* ----------------------------------------------------------------
  *		ExecInitMergeAppend
@@ -71,6 +82,8 @@ ExecInitMergeAppend(MergeAppend *node, EState *estate, int eflags)
 	int			nplans;
 	int			i,
 				j;
+	Bitmapset  *asyncplans;
+	int			nasyncplans;
 
 	/* check for unsupported flags */
 	Assert(!(eflags & (EXEC_FLAG_BACKWARD | EXEC_FLAG_MARK)));
@@ -106,7 +119,10 @@ ExecInitMergeAppend(MergeAppend *node, EState *estate, int eflags)
 		 * later calls to ExecFindMatchingSubPlans.
 		 */
 		if (!prunestate->do_exec_prune && nplans > 0)
+		{
 			mergestate->ms_valid_subplans = bms_add_range(NULL, 0, nplans - 1);
+			mergestate->ms_valid_subplans_identified = true;
+		}
 	}
 	else
 	{
@@ -119,6 +135,7 @@ ExecInitMergeAppend(MergeAppend *node, EState *estate, int eflags)
 		Assert(nplans > 0);
 		mergestate->ms_valid_subplans = validsubplans =
 			bms_add_range(NULL, 0, nplans - 1);
+		mergestate->ms_valid_subplans_identified = true;
 		mergestate->ms_prune_state = NULL;
 	}
 
@@ -135,11 +152,25 @@ ExecInitMergeAppend(MergeAppend *node, EState *estate, int eflags)
 	 * the results into the mergeplanstates array.
 	 */
 	j = 0;
+	asyncplans = NULL;
+	nasyncplans = 0;
+
 	i = -1;
 	while ((i = bms_next_member(validsubplans, i)) >= 0)
 	{
 		Plan	   *initNode = (Plan *) list_nth(node->mergeplans, i);
 
+		/*
+		 * Record async subplans.  When executing EvalPlanQual, we treat them
+		 * as sync ones; don't do this when initializing an EvalPlanQual plan
+		 * tree.
+		 */
+		if (initNode->async_capable && estate->es_epq_active == NULL)
+		{
+			asyncplans = bms_add_member(asyncplans, j);
+			nasyncplans++;
+		}
+
 		mergeplanstates[j++] = ExecInitNode(initNode, estate, eflags);
 	}
 
@@ -170,6 +201,45 @@ ExecInitMergeAppend(MergeAppend *node, EState *estate, int eflags)
 	 */
 	mergestate->ps.ps_ProjInfo = NULL;
 
+	/* Initialize async state */
+	mergestate->ms_asyncplans = asyncplans;
+	mergestate->ms_nasyncplans = nasyncplans;
+	mergestate->ms_asyncrequests = NULL;
+	mergestate->ms_asyncresults = NULL;
+	mergestate->ms_has_asyncresults = NULL;
+	mergestate->ms_asyncremain = NULL;
+	mergestate->ms_needrequest = NULL;
+	mergestate->ms_eventset = NULL;
+	mergestate->ms_valid_asyncplans = NULL;
+
+	if (nasyncplans > 0)
+	{
+		mergestate->ms_asyncrequests = (AsyncRequest **)
+			palloc0(nplans * sizeof(AsyncRequest *));
+
+		i = -1;
+		while ((i = bms_next_member(asyncplans, i)) >= 0)
+		{
+			AsyncRequest *areq;
+
+			areq = palloc(sizeof(AsyncRequest));
+			areq->requestor = (PlanState *) mergestate;
+			areq->requestee = mergeplanstates[i];
+			areq->request_index = i;
+			areq->callback_pending = false;
+			areq->request_complete = false;
+			areq->result = NULL;
+
+			mergestate->ms_asyncrequests[i] = areq;
+		}
+
+		mergestate->ms_asyncresults = (TupleTableSlot **)
+			palloc0(nplans * sizeof(TupleTableSlot *));
+
+		if (mergestate->ms_valid_subplans_identified)
+			classify_matching_subplans(mergestate);
+	}
+
 	/*
 	 * initialize sort-key information
 	 */
@@ -231,9 +301,17 @@ ExecMergeAppend(PlanState *pstate)
 		 * run-time pruning is disabled then the valid subplans will always be
 		 * set to all subplans.
 		 */
-		if (node->ms_valid_subplans == NULL)
+		if (!node->ms_valid_subplans_identified)
+		{
 			node->ms_valid_subplans =
 				ExecFindMatchingSubPlans(node->ms_prune_state, false, NULL);
+			node->ms_valid_subplans_identified = true;
+			classify_matching_subplans(node);
+		}
+
+		/* If there are any async subplans, begin executing them. */
+		if (node->ms_nasyncplans > 0)
+			ExecMergeAppendAsyncBegin(node);
 
 		/*
 		 * First time through: pull the first tuple from each valid subplan,
@@ -246,6 +324,16 @@ ExecMergeAppend(PlanState *pstate)
 			if (!TupIsNull(node->ms_slots[i]))
 				binaryheap_add_unordered(node->ms_heap, Int32GetDatum(i));
 		}
+
+		/* Look at valid async subplans */
+		i = -1;
+		while ((i = bms_next_member(node->ms_valid_asyncplans, i)) >= 0)
+		{
+			ExecMergeAppendAsyncGetNext(node, i);
+			if (!TupIsNull(node->ms_slots[i]))
+				binaryheap_add_unordered(node->ms_heap, Int32GetDatum(i));
+		}
+
 		binaryheap_build(node->ms_heap);
 		node->ms_initialized = true;
 	}
@@ -260,7 +348,13 @@ ExecMergeAppend(PlanState *pstate)
 		 * to not pull tuples until necessary.)
 		 */
 		i = DatumGetInt32(binaryheap_first(node->ms_heap));
-		node->ms_slots[i] = ExecProcNode(node->mergeplans[i]);
+		if (bms_is_member(i, node->ms_asyncplans))
+			ExecMergeAppendAsyncGetNext(node, i);
+		else
+		{
+			Assert(bms_is_member(i, node->ms_valid_subplans));
+			node->ms_slots[i] = ExecProcNode(node->mergeplans[i]);
+		}
 		if (!TupIsNull(node->ms_slots[i]))
 			binaryheap_replace_first(node->ms_heap, Int32GetDatum(i));
 		else
@@ -276,6 +370,8 @@ ExecMergeAppend(PlanState *pstate)
 	{
 		i = DatumGetInt32(binaryheap_first(node->ms_heap));
 		result = node->ms_slots[i];
+		/* For an async plan, record that its result has been consumed */
+		node->ms_has_asyncresults = bms_del_member(node->ms_has_asyncresults, i);
 	}
 
 	return result;
@@ -355,6 +451,7 @@ void
 ExecReScanMergeAppend(MergeAppendState *node)
 {
 	int			i;
+	int			nasyncplans = node->ms_nasyncplans;
 
 	/*
 	 * If any PARAM_EXEC Params used in pruning expressions have changed, then
@@ -365,8 +462,11 @@ ExecReScanMergeAppend(MergeAppendState *node)
 		bms_overlap(node->ps.chgParam,
 					node->ms_prune_state->execparamids))
 	{
+		node->ms_valid_subplans_identified = false;
 		bms_free(node->ms_valid_subplans);
 		node->ms_valid_subplans = NULL;
+		bms_free(node->ms_valid_asyncplans);
+		node->ms_valid_asyncplans = NULL;
 	}
 
 	for (i = 0; i < node->ms_nplans; i++)
@@ -387,6 +487,360 @@ ExecReScanMergeAppend(MergeAppendState *node)
 		if (subnode->chgParam == NULL)
 			ExecReScan(subnode);
 	}
+
+	/* Reset async state */
+	if (nasyncplans > 0)
+	{
+		i = -1;
+		while ((i = bms_next_member(node->ms_asyncplans, i)) >= 0)
+		{
+			AsyncRequest *areq = node->ms_asyncrequests[i];
+
+			areq->callback_pending = false;
+			areq->request_complete = false;
+			areq->result = NULL;
+		}
+
+		bms_free(node->ms_asyncremain);
+		node->ms_asyncremain = NULL;
+		bms_free(node->ms_needrequest);
+		node->ms_needrequest = NULL;
+		bms_free(node->ms_has_asyncresults);
+		node->ms_has_asyncresults = NULL;
+	}
 	binaryheap_reset(node->ms_heap);
 	node->ms_initialized = false;
 }
+
+/* ----------------------------------------------------------------
+ *              classify_matching_subplans
+ *
+ *              Classify the node's ms_valid_subplans into sync ones and
+ *              async ones, adjust it to contain sync ones only, and save
+ *              async ones in the node's ms_valid_asyncplans.
+ * ----------------------------------------------------------------
+ */
+static void
+classify_matching_subplans(MergeAppendState *node)
+{
+	Assert(node->ms_valid_subplans_identified);
+
+	/* Nothing to do if there are no valid subplans. */
+	if (bms_is_empty(node->ms_valid_subplans))
+	{
+		node->ms_asyncremain = NULL;
+		return;
+	}
+
+	/* No valid async subplans identified. */
+	if (!classify_matching_subplans_common(&node->ms_valid_subplans, node->ms_asyncplans, &node->ms_valid_asyncplans))
+		node->ms_asyncremain = NULL;
+}
+
+/* ----------------------------------------------------------------
+ *              ExecMergeAppendAsyncBegin
+ *
+ *              Begin executing designated async-capable subplans.
+ * ----------------------------------------------------------------
+ */
+static void
+ExecMergeAppendAsyncBegin(MergeAppendState *node)
+{
+	int			i;
+
+	/* Backward scan is not supported by async-aware MergeAppends. */
+	Assert(ScanDirectionIsForward(node->ps.state->es_direction));
+
+	/* We should never be called when there are no subplans */
+	Assert(node->ms_nplans > 0);
+
+	/* We should never be called when there are no async subplans. */
+	Assert(node->ms_nasyncplans > 0);
+
+	/* ExecMergeAppend() identifies valid subplans */
+	Assert(node->ms_valid_subplans_identified);
+
+	/* Initialize state variables. */
+	node->ms_asyncremain = bms_copy(node->ms_valid_asyncplans);
+
+	/* Nothing to do if there are no valid async subplans. */
+	if (bms_is_empty(node->ms_asyncremain))
+		return;
+
+	/* Make a request for each of the valid async subplans. */
+	i = -1;
+	while ((i = bms_next_member(node->ms_valid_asyncplans, i)) >= 0)
+	{
+		AsyncRequest *areq = node->ms_asyncrequests[i];
+
+		Assert(areq->request_index == i);
+		Assert(!areq->callback_pending);
+
+		/* Do the actual work. */
+		ExecAsyncRequest(areq);
+	}
+}
+
+/* ----------------------------------------------------------------
+ *              ExecMergeAppendAsyncGetNext
+ *
+ *              Get the next tuple from the specified asynchronous subplan.
+ * ----------------------------------------------------------------
+ */
+static void
+ExecMergeAppendAsyncGetNext(MergeAppendState *node, int mplan)
+{
+	node->ms_slots[mplan] = NULL;
+
+	/* Request a tuple asynchronously. */
+	if (ExecMergeAppendAsyncRequest(node, mplan))
+		return;
+
+	/*
+	 * node->ms_asyncremain can be empty if we have already fetched tuples
+	 * but haven't returned them yet.  In that case ExecMergeAppendAsyncRequest()
+	 * above just returns such a tuple without making a new request.
+	 */
+	while (bms_is_member(mplan, node->ms_asyncremain))
+	{
+		CHECK_FOR_INTERRUPTS();
+
+		/* Wait or poll for async events. */
+		ExecMergeAppendAsyncEventWait(node);
+
+		/* Request a tuple asynchronously. */
+		if (ExecMergeAppendAsyncRequest(node, mplan))
+			return;
+
+		/*
+		 * Loop until no async requests are pending, or we've got a tuple
+		 * from our own request.
+		 */
+	}
+
+	/* No tuples */
+	return;
+}
+
+/* ----------------------------------------------------------------
+ *              ExecMergeAppendAsyncRequest
+ *
+ *              Request a tuple asynchronously.
+ * ----------------------------------------------------------------
+ */
+static bool
+ExecMergeAppendAsyncRequest(MergeAppendState *node, int mplan)
+{
+	Bitmapset  *needrequest;
+	int			i;
+
+	/*
+	 * If we've already fetched the necessary data, just return it
+	 */
+	if (bms_is_member(mplan, node->ms_has_asyncresults))
+	{
+		node->ms_slots[mplan] = node->ms_asyncresults[mplan];
+		return true;
+	}
+
+	/*
+	 * Collect the subplans that are able to process a new request and don't
+	 * already have a result ready.
+	 */
+	needrequest = NULL;
+	i = -1;
+	while ((i = bms_next_member(node->ms_needrequest, i)) >= 0)
+	{
+		if (!bms_is_member(i, node->ms_has_asyncresults))
+			needrequest = bms_add_member(needrequest, i);
+	}
+
+	/*
+	 * If no subplans still need a request, there's nothing to send.
+	 */
+	if (bms_is_empty(needrequest))
+		return false;
+
+	/* Remove these members from ms_needrequest, as we send their requests now */
+	node->ms_needrequest = bms_del_members(node->ms_needrequest, needrequest);
+
+	/* Make a new request for each of the async subplans that need it. */
+	i = -1;
+	while ((i = bms_next_member(needrequest, i)) >= 0)
+	{
+		AsyncRequest *areq = node->ms_asyncrequests[i];
+
+		/*
+		 * We've just checked that this subplan doesn't already have a
+		 * fetched result
+		 */
+		Assert(!bms_is_member(i, node->ms_has_asyncresults));
+
+		/* Do the actual work. */
+		ExecAsyncRequest(areq);
+	}
+	bms_free(needrequest);
+
+	/* Return needed asynchronously-generated results if any. */
+	if (bms_is_member(mplan, node->ms_has_asyncresults))
+	{
+		node->ms_slots[mplan] = node->ms_asyncresults[mplan];
+		return true;
+	}
+
+	return false;
+}
+
+/* ----------------------------------------------------------------
+ *              ExecAsyncMergeAppendResponse
+ *
+ *              Receive a response from an asynchronous request we made.
+ * ----------------------------------------------------------------
+ */
+void
+ExecAsyncMergeAppendResponse(AsyncRequest *areq)
+{
+	MergeAppendState *node = (MergeAppendState *) areq->requestor;
+	TupleTableSlot *slot = areq->result;
+
+	/* The result should be a TupleTableSlot or NULL. */
+	Assert(slot == NULL || IsA(slot, TupleTableSlot));
+	Assert(!bms_is_member(areq->request_index, node->ms_has_asyncresults));
+
+	node->ms_asyncresults[areq->request_index] = NULL;
+	/* Nothing to do if the request is pending. */
+	if (!areq->request_complete)
+	{
+		/* The request would have been pending for a callback. */
+		Assert(areq->callback_pending);
+		return;
+	}
+
+	/* If the result is NULL or an empty slot, there's nothing more to do. */
+	if (TupIsNull(slot))
+	{
+		/* The ending subplan wouldn't have been pending for a callback. */
+		Assert(!areq->callback_pending);
+		node->ms_asyncremain = bms_del_member(node->ms_asyncremain, areq->request_index);
+		return;
+	}
+
+	node->ms_has_asyncresults = bms_add_member(node->ms_has_asyncresults, areq->request_index);
+	/* Save result so we can return it. */
+	node->ms_asyncresults[areq->request_index] = slot;
+
+	/*
+	 * Mark the subplan that returned a result as ready for a new request;
+	 * we don't launch a new request until the saved result is consumed.
+	 */
+	node->ms_needrequest = bms_add_member(node->ms_needrequest,
+										  areq->request_index);
+}
+
+/* ----------------------------------------------------------------
+ *		ExecMergeAppendAsyncEventWait
+ *
+ *		Wait or poll for file descriptor events and fire callbacks.
+ * ----------------------------------------------------------------
+ */
+static void
+ExecMergeAppendAsyncEventWait(MergeAppendState *node)
+{
+	int			nevents = node->ms_nasyncplans + 2; /* one for PM death and
+													 * one for latch */
+	WaitEvent	occurred_event[EVENT_BUFFER_SIZE];
+	int			noccurred;
+	int			i;
+
+	/* We should never be called when there are no valid async subplans. */
+	Assert(bms_num_members(node->ms_asyncremain) > 0);
+
+	node->ms_eventset = CreateWaitEventSet(CurrentResourceOwner, nevents);
+	AddWaitEventToSet(node->ms_eventset, WL_EXIT_ON_PM_DEATH, PGINVALID_SOCKET,
+					  NULL, NULL);
+
+	/* Give each waiting subplan a chance to add an event. */
+	i = -1;
+	while ((i = bms_next_member(node->ms_asyncplans, i)) >= 0)
+	{
+		AsyncRequest *areq = node->ms_asyncrequests[i];
+
+		if (areq->callback_pending)
+			ExecAsyncConfigureWait(areq);
+	}
+
+	/*
+	 * No need for further processing if none of the subplans configured any
+	 * events.
+	 */
+	if (GetNumRegisteredWaitEvents(node->ms_eventset) == 1)
+	{
+		FreeWaitEventSet(node->ms_eventset);
+		node->ms_eventset = NULL;
+		return;
+	}
+
+	/*
+	 * Add the process latch to the set, so that we wake up to process the
+	 * standard interrupts with CHECK_FOR_INTERRUPTS().
+	 *
+	 * NOTE: For historical reasons, it's important that this is added to the
+	 * WaitEventSet after the ExecAsyncConfigureWait() calls.  Namely,
+	 * postgres_fdw calls "GetNumRegisteredWaitEvents(set) == 1" to check if
+	 * any other events are in the set.  That's a poor design, it's
+	 * questionable for postgres_fdw to be doing that in the first place, but
+	 * we cannot change it now.  The pattern has possibly been copied to other
+	 * extensions too.
+	 */
+	AddWaitEventToSet(node->ms_eventset, WL_LATCH_SET, PGINVALID_SOCKET,
+					  MyLatch, NULL);
+
+	/* Return at most EVENT_BUFFER_SIZE events in one call. */
+	if (nevents > EVENT_BUFFER_SIZE)
+		nevents = EVENT_BUFFER_SIZE;
+
+	/*
+	 * Wait until at least one event occurs.
+	 */
+	noccurred = WaitEventSetWait(node->ms_eventset, -1 /* no timeout */ , occurred_event,
+								 nevents, WAIT_EVENT_APPEND_READY);
+	FreeWaitEventSet(node->ms_eventset);
+	node->ms_eventset = NULL;
+	if (noccurred == 0)
+		return;
+
+	/* Deliver notifications. */
+	for (i = 0; i < noccurred; i++)
+	{
+		WaitEvent  *w = &occurred_event[i];
+
+		/*
+		 * Each waiting subplan should have registered its wait event with
+		 * user_data pointing back to its AsyncRequest.
+		 */
+		if ((w->events & WL_SOCKET_READABLE) != 0)
+		{
+			AsyncRequest *areq = (AsyncRequest *) w->user_data;
+
+			if (areq->callback_pending)
+			{
+				/*
+				 * Mark it as no longer needing a callback.  We must do this
+				 * before dispatching the callback in case the callback resets
+				 * the flag.
+				 */
+				areq->callback_pending = false;
+
+				/* Do the actual work. */
+				ExecAsyncNotify(areq);
+			}
+		}
+
+		/* Handle standard interrupts */
+		if ((w->events & WL_LATCH_SET) != 0)
+		{
+			ResetLatch(MyLatch);
+			CHECK_FOR_INTERRUPTS();
+		}
+	}
+}
diff --git a/src/backend/optimizer/path/costsize.c b/src/backend/optimizer/path/costsize.c
index 8335cf5b5c5..cfbe512a26e 100644
--- a/src/backend/optimizer/path/costsize.c
+++ b/src/backend/optimizer/path/costsize.c
@@ -163,6 +163,7 @@ bool		enable_parallel_hash = true;
 bool		enable_partition_pruning = true;
 bool		enable_presorted_aggregate = true;
 bool		enable_async_append = true;
+bool		enable_async_merge_append = true;
 
 typedef struct
 {
diff --git a/src/backend/optimizer/plan/createplan.c b/src/backend/optimizer/plan/createplan.c
index 5cd7fa7b897..8a023aac33b 100644
--- a/src/backend/optimizer/plan/createplan.c
+++ b/src/backend/optimizer/plan/createplan.c
@@ -1466,6 +1466,7 @@ create_merge_append_plan(PlannerInfo *root, MergeAppendPath *best_path,
 	List	   *subplans = NIL;
 	ListCell   *subpaths;
 	RelOptInfo *rel = best_path->path.parent;
+	bool		consider_async = false;
 
 	/*
 	 * We don't have the actual creation of the MergeAppend node split out
@@ -1480,6 +1481,10 @@ create_merge_append_plan(PlannerInfo *root, MergeAppendPath *best_path,
 	plan->righttree = NULL;
 	node->apprelids = rel->relids;
 
+	consider_async = (enable_async_merge_append &&
+					  !best_path->path.parallel_safe &&
+					  list_length(best_path->subpaths) > 1);
+
 	/*
 	 * Compute sort column info, and adjust MergeAppend's tlist as needed.
 	 * Because we pass adjust_tlist_in_place = true, we may ignore the
@@ -1580,6 +1585,10 @@ create_merge_append_plan(PlannerInfo *root, MergeAppendPath *best_path,
 			subplan = sort_plan;
 		}
 
+		/* If needed, check to see if subplan can be executed asynchronously */
+		if (consider_async)
+			mark_async_capable_plan(subplan, subpath);
+
 		subplans = lappend(subplans, subplan);
 	}
 
diff --git a/src/backend/utils/misc/guc_parameters.dat b/src/backend/utils/misc/guc_parameters.dat
index 1128167c025..2a670bd8db1 100644
--- a/src/backend/utils/misc/guc_parameters.dat
+++ b/src/backend/utils/misc/guc_parameters.dat
@@ -805,6 +805,13 @@
   boot_val => 'true',
 },
 
+{ name => 'enable_async_merge_append', type => 'bool', context => 'PGC_USERSET', group => 'QUERY_TUNING_METHOD',
+  short_desc => 'Enables the planner\'s use of async merge append plans.',
+  flags => 'GUC_EXPLAIN',
+  variable => 'enable_async_merge_append',
+  boot_val => 'true',
+},
+
 { name => 'enable_bitmapscan', type => 'bool', context => 'PGC_USERSET', group => 'QUERY_TUNING_METHOD',
   short_desc => 'Enables the planner\'s use of bitmap-scan plans.',
   flags => 'GUC_EXPLAIN',
diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample
index f503d36b92e..ea014e006f2 100644
--- a/src/backend/utils/misc/postgresql.conf.sample
+++ b/src/backend/utils/misc/postgresql.conf.sample
@@ -405,6 +405,7 @@
 # - Planner Method Configuration -
 
 #enable_async_append = on
+#enable_async_merge_append = on
 #enable_bitmapscan = on
 #enable_gathermerge = on
 #enable_hashagg = on
diff --git a/src/include/executor/nodeMergeAppend.h b/src/include/executor/nodeMergeAppend.h
index 4eb05dc30d6..e3fdb26ece6 100644
--- a/src/include/executor/nodeMergeAppend.h
+++ b/src/include/executor/nodeMergeAppend.h
@@ -19,5 +19,6 @@
 extern MergeAppendState *ExecInitMergeAppend(MergeAppend *node, EState *estate, int eflags);
 extern void ExecEndMergeAppend(MergeAppendState *node);
 extern void ExecReScanMergeAppend(MergeAppendState *node);
+extern void ExecAsyncMergeAppendResponse(AsyncRequest *areq);
 
 #endif							/* NODEMERGEAPPEND_H */
diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h
index 18ae8f0d4bb..2bef54550a3 100644
--- a/src/include/nodes/execnodes.h
+++ b/src/include/nodes/execnodes.h
@@ -1545,10 +1545,67 @@ typedef struct MergeAppendState
 	TupleTableSlot **ms_slots;	/* array of length ms_nplans */
 	struct binaryheap *ms_heap; /* binary heap of slot indices */
 	bool		ms_initialized; /* are subplans started? */
+	Bitmapset  *ms_asyncplans;	/* asynchronous plans indexes */
+	int			ms_nasyncplans; /* # of asynchronous plans */
+	AsyncRequest **ms_asyncrequests;	/* array of AsyncRequests */
+	TupleTableSlot **ms_asyncresults;	/* unreturned results of async plans */
+	Bitmapset  *ms_has_asyncresults;	/* plans which have async results */
+	Bitmapset  *ms_asyncremain; /* remaining asynchronous plans */
+	Bitmapset  *ms_needrequest; /* asynchronous plans needing a new request */
+	struct WaitEventSet *ms_eventset;	/* WaitEventSet used to configure file
+										 * descriptor wait events */
 	struct PartitionPruneState *ms_prune_state;
+	bool		ms_valid_subplans_identified;	/* is ms_valid_subplans valid? */
 	Bitmapset  *ms_valid_subplans;
+	Bitmapset  *ms_valid_asyncplans;	/* valid asynchronous plans indexes */
 } MergeAppendState;
 
+/* Getters for AppendState and MergeAppendState */
+static inline struct WaitEventSet *
+GetAppendEventSet(PlanState *ps)
+{
+	Assert(IsA(ps, AppendState) || IsA(ps, MergeAppendState));
+
+	if (IsA(ps, AppendState))
+		return ((AppendState *) ps)->as_eventset;
+	else
+		return ((MergeAppendState *) ps)->ms_eventset;
+}
+
+static inline Bitmapset *
+GetNeedRequest(PlanState *ps)
+{
+	Assert(IsA(ps, AppendState) || IsA(ps, MergeAppendState));
+
+	if (IsA(ps, AppendState))
+		return ((AppendState *) ps)->as_needrequest;
+	else
+		return ((MergeAppendState *) ps)->ms_needrequest;
+}
+
+/* Common part of classify_matching_subplans() for Append and MergeAppend */
+static inline bool
+classify_matching_subplans_common(Bitmapset **valid_subplans, Bitmapset *asyncplans, Bitmapset **valid_asyncplans)
+{
+	Assert(*valid_asyncplans == NULL);
+
+	/* Checked by classify_matching_subplans() */
+	Assert(!bms_is_empty(*valid_subplans));
+
+	/* Nothing to do if there are no valid async subplans. */
+	if (!bms_overlap(*valid_subplans, asyncplans))
+		return false;
+
+	/* Get valid async subplans. */
+	*valid_asyncplans = bms_intersect(asyncplans,
+									  *valid_subplans);
+
+	/* Adjust the valid subplans to contain sync subplans only. */
+	*valid_subplans = bms_del_members(*valid_subplans,
+									  *valid_asyncplans);
+	return true;
+}
+
 /* ----------------
  *	 RecursiveUnionState information
  *
diff --git a/src/include/optimizer/cost.h b/src/include/optimizer/cost.h
index b523bcda8f3..fee491b77ad 100644
--- a/src/include/optimizer/cost.h
+++ b/src/include/optimizer/cost.h
@@ -70,6 +70,7 @@ extern PGDLLIMPORT bool enable_parallel_hash;
 extern PGDLLIMPORT bool enable_partition_pruning;
 extern PGDLLIMPORT bool enable_presorted_aggregate;
 extern PGDLLIMPORT bool enable_async_append;
+extern PGDLLIMPORT bool enable_async_merge_append;
 extern PGDLLIMPORT int constraint_exclusion;
 
 extern double index_pages_fetched(double tuples_fetched, BlockNumber pages,
diff --git a/src/test/regress/expected/sysviews.out b/src/test/regress/expected/sysviews.out
index 3b37fafa65b..ae4fa42a438 100644
--- a/src/test/regress/expected/sysviews.out
+++ b/src/test/regress/expected/sysviews.out
@@ -149,6 +149,7 @@ select name, setting from pg_settings where name like 'enable%';
               name              | setting 
 --------------------------------+---------
  enable_async_append            | on
+ enable_async_merge_append      | on
  enable_bitmapscan              | on
  enable_distinct_reordering     | on
  enable_eager_aggregate         | on
@@ -173,7 +174,7 @@ select name, setting from pg_settings where name like 'enable%';
  enable_seqscan                 | on
  enable_sort                    | on
  enable_tidscan                 | on
-(25 rows)
+(26 rows)
 
 -- There are always wait event descriptions for various types.  InjectionPoint
 -- may be present or absent, depending on history since last postmaster start.
-- 
2.43.0
