diff --git a/contrib/postgres_fdw/connection.c b/contrib/postgres_fdw/connection.c
index ee0b4acf0b..fe76b7cfd1 100644
--- a/contrib/postgres_fdw/connection.c
+++ b/contrib/postgres_fdw/connection.c
@@ -62,6 +62,7 @@ typedef struct ConnCacheEntry
 	Oid			serverid;		/* foreign server OID used to get server name */
 	uint32		server_hashvalue;	/* hash value of foreign server OID */
 	uint32		mapping_hashvalue;	/* hash value of user mapping OID */
+	PgFdwConnState state;		/* extra per-connection state */
 } ConnCacheEntry;
 
 /*
@@ -117,7 +118,7 @@ static bool disconnect_cached_connections(Oid serverid);
  * (not even on error), we need this flag to cue manual cleanup.
  */
 PGconn *
-GetConnection(UserMapping *user, bool will_prep_stmt)
+GetConnection(UserMapping *user, bool will_prep_stmt, PgFdwConnState **state)
 {
 	bool		found;
 	bool		retry = false;
@@ -196,6 +197,9 @@ GetConnection(UserMapping *user, bool will_prep_stmt)
 	 */
 	PG_TRY();
 	{
+		/* Process a pending asynchronous request if any. */
+		if (entry->state.pendingAreq)
+			process_pending_request(entry->state.pendingAreq);
 		/* Start a new transaction or subtransaction if needed. */
 		begin_remote_xact(entry);
 	}
@@ -264,6 +268,10 @@ GetConnection(UserMapping *user, bool will_prep_stmt)
 	/* Remember if caller will prepare statements */
 	entry->have_prep_stmt |= will_prep_stmt;
 
+	/* If caller needs access to the per-connection state, return it. */
+	if (state)
+		*state = &entry->state;
+
 	return entry->conn;
 }
 
@@ -291,6 +299,7 @@ make_new_connection(ConnCacheEntry *entry, UserMapping *user)
 	entry->mapping_hashvalue =
 		GetSysCacheHashValue1(USERMAPPINGOID,
 							  ObjectIdGetDatum(user->umid));
+	memset(&entry->state, 0, sizeof(entry->state));
 
 	/* Now try to make the connection */
 	entry->conn = connect_pg_server(server, user);
@@ -648,8 +657,12 @@ GetPrepStmtNumber(PGconn *conn)
  * Caller is responsible for the error handling on the result.
  */
 PGresult *
-pgfdw_exec_query(PGconn *conn, const char *query)
+pgfdw_exec_query(PGconn *conn, const char *query, PgFdwConnState *state)
 {
+	/* First, process a pending asynchronous request, if any. */
+	if (state && state->pendingAreq)
+		process_pending_request(state->pendingAreq);
+
 	/*
 	 * Submit a query.  Since we don't use non-blocking mode, this also can
 	 * block.  But its risk is relatively small, so we ignore that for now.
@@ -940,6 +953,8 @@ pgfdw_xact_callback(XactEvent event, void *arg)
 					{
 						entry->have_prep_stmt = false;
 						entry->have_error = false;
+						/* Also reset per-connection state */
+						memset(&entry->state, 0, sizeof(entry->state));
 					}
 
 					/* Disarm changing_xact_state if it all worked. */
@@ -1172,6 +1187,10 @@ pgfdw_reject_incomplete_xact_state_change(ConnCacheEntry *entry)
  * Cancel the currently-in-progress query (whose query text we do not have)
  * and ignore the result.  Returns true if we successfully cancel the query
  * and discard any pending result, and false if not.
+ *
+ * XXX: if the query was one sent by fetch_more_data_begin(), we could get the
+ * query text from the pendingAreq saved in the per-connection state, then
+ * report the query using it.
  */
 static bool
 pgfdw_cancel_query(PGconn *conn)
diff --git a/contrib/postgres_fdw/expected/postgres_fdw.out b/contrib/postgres_fdw/expected/postgres_fdw.out
index 0649b6b81c..126065ebf9 100644
--- a/contrib/postgres_fdw/expected/postgres_fdw.out
+++ b/contrib/postgres_fdw/expected/postgres_fdw.out
@@ -19,6 +19,7 @@ DO $d$
             )$$;
     END;
 $d$;
+ALTER SERVER loopback OPTIONS (ADD async_capable 'true');
 CREATE USER MAPPING FOR public SERVER testserver1
 	OPTIONS (user 'value', password 'value');
 CREATE USER MAPPING FOR CURRENT_USER SERVER loopback;
@@ -7021,7 +7022,7 @@ INSERT INTO a(aa) VALUES('aaaaa');
 INSERT INTO b(aa) VALUES('bbb');
 INSERT INTO b(aa) VALUES('bbbb');
 INSERT INTO b(aa) VALUES('bbbbb');
-SELECT tableoid::regclass, * FROM a;
+SELECT tableoid::regclass, * FROM a ORDER BY 1, 2;
  tableoid |  aa   
 ----------+-------
  a        | aaa
@@ -7049,7 +7050,7 @@ SELECT tableoid::regclass, * FROM ONLY a;
 (3 rows)
 
 UPDATE a SET aa = 'zzzzzz' WHERE aa LIKE 'aaaa%';
-SELECT tableoid::regclass, * FROM a;
+SELECT tableoid::regclass, * FROM a ORDER BY 1, 2;
  tableoid |   aa   
 ----------+--------
  a        | aaa
@@ -7077,7 +7078,7 @@ SELECT tableoid::regclass, * FROM ONLY a;
 (3 rows)
 
 UPDATE b SET aa = 'new';
-SELECT tableoid::regclass, * FROM a;
+SELECT tableoid::regclass, * FROM a ORDER BY 1, 2;
  tableoid |   aa   
 ----------+--------
  a        | aaa
@@ -7105,7 +7106,7 @@ SELECT tableoid::regclass, * FROM ONLY a;
 (3 rows)
 
 UPDATE a SET aa = 'newtoo';
-SELECT tableoid::regclass, * FROM a;
+SELECT tableoid::regclass, * FROM a ORDER BY 1, 2;
  tableoid |   aa   
 ----------+--------
  a        | newtoo
@@ -7133,7 +7134,7 @@ SELECT tableoid::regclass, * FROM ONLY a;
 (3 rows)
 
 DELETE FROM a;
-SELECT tableoid::regclass, * FROM a;
+SELECT tableoid::regclass, * FROM a ORDER BY 1, 2;
  tableoid | aa 
 ----------+----
 (0 rows)
@@ -7175,35 +7176,40 @@ insert into bar2 values(3,33,33);
 insert into bar2 values(4,44,44);
 insert into bar2 values(7,77,77);
 explain (verbose, costs off)
-select * from bar where f1 in (select f1 from foo) for update;
-                                          QUERY PLAN                                          
-----------------------------------------------------------------------------------------------
+select * from bar where f1 in (select f1 from foo) order by 1 for update;
+                                                   QUERY PLAN                                                    
+-----------------------------------------------------------------------------------------------------------------
  LockRows
    Output: bar.f1, bar.f2, bar.ctid, foo.ctid, bar.*, bar.tableoid, foo.*, foo.tableoid
-   ->  Hash Join
+   ->  Merge Join
          Output: bar.f1, bar.f2, bar.ctid, foo.ctid, bar.*, bar.tableoid, foo.*, foo.tableoid
          Inner Unique: true
-         Hash Cond: (bar.f1 = foo.f1)
-         ->  Append
-               ->  Seq Scan on public.bar bar_1
+         Merge Cond: (bar.f1 = foo.f1)
+         ->  Merge Append
+               Sort Key: bar.f1
+               ->  Sort
                      Output: bar_1.f1, bar_1.f2, bar_1.ctid, bar_1.*, bar_1.tableoid
+                     Sort Key: bar_1.f1
+                     ->  Seq Scan on public.bar bar_1
+                           Output: bar_1.f1, bar_1.f2, bar_1.ctid, bar_1.*, bar_1.tableoid
                ->  Foreign Scan on public.bar2 bar_2
                      Output: bar_2.f1, bar_2.f2, bar_2.ctid, bar_2.*, bar_2.tableoid
-                     Remote SQL: SELECT f1, f2, f3, ctid FROM public.loct2 FOR UPDATE
-         ->  Hash
+                     Remote SQL: SELECT f1, f2, f3, ctid FROM public.loct2 ORDER BY f1 ASC NULLS LAST FOR UPDATE
+         ->  Sort
                Output: foo.ctid, foo.f1, foo.*, foo.tableoid
+               Sort Key: foo.f1
                ->  HashAggregate
                      Output: foo.ctid, foo.f1, foo.*, foo.tableoid
                      Group Key: foo.f1
                      ->  Append
                            ->  Seq Scan on public.foo foo_1
                                  Output: foo_1.ctid, foo_1.f1, foo_1.*, foo_1.tableoid
-                           ->  Foreign Scan on public.foo2 foo_2
+                           ->  Async Foreign Scan on public.foo2 foo_2
                                  Output: foo_2.ctid, foo_2.f1, foo_2.*, foo_2.tableoid
                                  Remote SQL: SELECT f1, f2, f3, ctid FROM public.loct1
-(23 rows)
+(28 rows)
 
-select * from bar where f1 in (select f1 from foo) for update;
+select * from bar where f1 in (select f1 from foo) order by 1 for update;
  f1 | f2 
 ----+----
   1 | 11
@@ -7213,35 +7219,40 @@ select * from bar where f1 in (select f1 from foo) for update;
 (4 rows)
 
 explain (verbose, costs off)
-select * from bar where f1 in (select f1 from foo) for share;
-                                          QUERY PLAN                                          
-----------------------------------------------------------------------------------------------
+select * from bar where f1 in (select f1 from foo) order by 1 for share;
+                                                   QUERY PLAN                                                   
+----------------------------------------------------------------------------------------------------------------
  LockRows
    Output: bar.f1, bar.f2, bar.ctid, foo.ctid, bar.*, bar.tableoid, foo.*, foo.tableoid
-   ->  Hash Join
+   ->  Merge Join
          Output: bar.f1, bar.f2, bar.ctid, foo.ctid, bar.*, bar.tableoid, foo.*, foo.tableoid
          Inner Unique: true
-         Hash Cond: (bar.f1 = foo.f1)
-         ->  Append
-               ->  Seq Scan on public.bar bar_1
+         Merge Cond: (bar.f1 = foo.f1)
+         ->  Merge Append
+               Sort Key: bar.f1
+               ->  Sort
                      Output: bar_1.f1, bar_1.f2, bar_1.ctid, bar_1.*, bar_1.tableoid
+                     Sort Key: bar_1.f1
+                     ->  Seq Scan on public.bar bar_1
+                           Output: bar_1.f1, bar_1.f2, bar_1.ctid, bar_1.*, bar_1.tableoid
                ->  Foreign Scan on public.bar2 bar_2
                      Output: bar_2.f1, bar_2.f2, bar_2.ctid, bar_2.*, bar_2.tableoid
-                     Remote SQL: SELECT f1, f2, f3, ctid FROM public.loct2 FOR SHARE
-         ->  Hash
+                     Remote SQL: SELECT f1, f2, f3, ctid FROM public.loct2 ORDER BY f1 ASC NULLS LAST FOR SHARE
+         ->  Sort
                Output: foo.ctid, foo.f1, foo.*, foo.tableoid
+               Sort Key: foo.f1
                ->  HashAggregate
                      Output: foo.ctid, foo.f1, foo.*, foo.tableoid
                      Group Key: foo.f1
                      ->  Append
                            ->  Seq Scan on public.foo foo_1
                                  Output: foo_1.ctid, foo_1.f1, foo_1.*, foo_1.tableoid
-                           ->  Foreign Scan on public.foo2 foo_2
+                           ->  Async Foreign Scan on public.foo2 foo_2
                                  Output: foo_2.ctid, foo_2.f1, foo_2.*, foo_2.tableoid
                                  Remote SQL: SELECT f1, f2, f3, ctid FROM public.loct1
-(23 rows)
+(28 rows)
 
-select * from bar where f1 in (select f1 from foo) for share;
+select * from bar where f1 in (select f1 from foo) order by 1 for share;
  f1 | f2 
 ----+----
   1 | 11
@@ -7273,7 +7284,7 @@ update bar set f2 = f2 + 100 where f1 in (select f1 from foo);
                      ->  Append
                            ->  Seq Scan on public.foo foo_1
                                  Output: foo_1.ctid, foo_1.f1, foo_1.*, foo_1.tableoid
-                           ->  Foreign Scan on public.foo2 foo_2
+                           ->  Async Foreign Scan on public.foo2 foo_2
                                  Output: foo_2.ctid, foo_2.f1, foo_2.*, foo_2.tableoid
                                  Remote SQL: SELECT f1, f2, f3, ctid FROM public.loct1
    ->  Hash Join
@@ -7291,7 +7302,7 @@ update bar set f2 = f2 + 100 where f1 in (select f1 from foo);
                      ->  Append
                            ->  Seq Scan on public.foo foo_1
                                  Output: foo_1.ctid, foo_1.f1, foo_1.*, foo_1.tableoid
-                           ->  Foreign Scan on public.foo2 foo_2
+                           ->  Async Foreign Scan on public.foo2 foo_2
                                  Output: foo_2.ctid, foo_2.f1, foo_2.*, foo_2.tableoid
                                  Remote SQL: SELECT f1, f2, f3, ctid FROM public.loct1
 (39 rows)
@@ -7326,12 +7337,12 @@ where bar.f1 = ss.f1;
          ->  Append
                ->  Seq Scan on public.foo
                      Output: ROW(foo.f1), foo.f1
-               ->  Foreign Scan on public.foo2 foo_1
+               ->  Async Foreign Scan on public.foo2 foo_1
                      Output: ROW(foo_1.f1), foo_1.f1
                      Remote SQL: SELECT f1 FROM public.loct1
                ->  Seq Scan on public.foo foo_2
                      Output: ROW((foo_2.f1 + 3)), (foo_2.f1 + 3)
-               ->  Foreign Scan on public.foo2 foo_3
+               ->  Async Foreign Scan on public.foo2 foo_3
                      Output: ROW((foo_3.f1 + 3)), (foo_3.f1 + 3)
                      Remote SQL: SELECT f1 FROM public.loct1
          ->  Hash
@@ -7353,12 +7364,12 @@ where bar.f1 = ss.f1;
                ->  Append
                      ->  Seq Scan on public.foo
                            Output: ROW(foo.f1), foo.f1
-                     ->  Foreign Scan on public.foo2 foo_1
+                     ->  Async Foreign Scan on public.foo2 foo_1
                            Output: ROW(foo_1.f1), foo_1.f1
                            Remote SQL: SELECT f1 FROM public.loct1
                      ->  Seq Scan on public.foo foo_2
                            Output: ROW((foo_2.f1 + 3)), (foo_2.f1 + 3)
-                     ->  Foreign Scan on public.foo2 foo_3
+                     ->  Async Foreign Scan on public.foo2 foo_3
                            Output: ROW((foo_3.f1 + 3)), (foo_3.f1 + 3)
                            Remote SQL: SELECT f1 FROM public.loct1
 (45 rows)
@@ -7511,27 +7522,33 @@ delete from foo where f1 < 5 returning *;
 (5 rows)
 
 explain (verbose, costs off)
-update bar set f2 = f2 + 100 returning *;
-                                  QUERY PLAN                                  
-------------------------------------------------------------------------------
- Update on public.bar
-   Output: bar.f1, bar.f2
-   Update on public.bar
-   Foreign Update on public.bar2 bar_1
-   ->  Seq Scan on public.bar
-         Output: bar.f1, (bar.f2 + 100), bar.ctid
-   ->  Foreign Update on public.bar2 bar_1
-         Remote SQL: UPDATE public.loct2 SET f2 = (f2 + 100) RETURNING f1, f2
-(8 rows)
+with t as (update bar set f2 = f2 + 100 returning *) select * from t order by 1;
+                                      QUERY PLAN                                      
+--------------------------------------------------------------------------------------
+ Sort
+   Output: t.f1, t.f2
+   Sort Key: t.f1
+   CTE t
+     ->  Update on public.bar
+           Output: bar.f1, bar.f2
+           Update on public.bar
+           Foreign Update on public.bar2 bar_1
+           ->  Seq Scan on public.bar
+                 Output: bar.f1, (bar.f2 + 100), bar.ctid
+           ->  Foreign Update on public.bar2 bar_1
+                 Remote SQL: UPDATE public.loct2 SET f2 = (f2 + 100) RETURNING f1, f2
+   ->  CTE Scan on t
+         Output: t.f1, t.f2
+(14 rows)
 
-update bar set f2 = f2 + 100 returning *;
+with t as (update bar set f2 = f2 + 100 returning *) select * from t order by 1;
  f1 | f2  
 ----+-----
   1 | 311
   2 | 322
-  6 | 266
   3 | 333
   4 | 344
+  6 | 266
   7 | 277
 (6 rows)
 
@@ -8606,9 +8623,9 @@ SELECT t1.a,t2.b,t3.c FROM fprt1 t1 INNER JOIN fprt2 t2 ON (t1.a = t2.b) INNER J
  Sort
    Sort Key: t1.a, t3.c
    ->  Append
-         ->  Foreign Scan
+         ->  Async Foreign Scan
                Relations: ((ftprt1_p1 t1_1) INNER JOIN (ftprt2_p1 t2_1)) INNER JOIN (ftprt1_p1 t3_1)
-         ->  Foreign Scan
+         ->  Async Foreign Scan
                Relations: ((ftprt1_p2 t1_2) INNER JOIN (ftprt2_p2 t2_2)) INNER JOIN (ftprt1_p2 t3_2)
 (7 rows)
 
@@ -8645,19 +8662,19 @@ SELECT t1.a,t2.b,t2.c FROM fprt1 t1 LEFT JOIN (SELECT * FROM fprt2 WHERE a < 10)
 -- with whole-row reference; partitionwise join does not apply
 EXPLAIN (COSTS OFF)
 SELECT t1.wr, t2.wr FROM (SELECT t1 wr, a FROM fprt1 t1 WHERE t1.a % 25 = 0) t1 FULL JOIN (SELECT t2 wr, b FROM fprt2 t2 WHERE t2.b % 25 = 0) t2 ON (t1.a = t2.b) ORDER BY 1,2;
-                       QUERY PLAN                       
---------------------------------------------------------
+                          QUERY PLAN                          
+--------------------------------------------------------------
  Sort
    Sort Key: ((t1.*)::fprt1), ((t2.*)::fprt2)
    ->  Hash Full Join
          Hash Cond: (t1.a = t2.b)
          ->  Append
-               ->  Foreign Scan on ftprt1_p1 t1_1
-               ->  Foreign Scan on ftprt1_p2 t1_2
+               ->  Async Foreign Scan on ftprt1_p1 t1_1
+               ->  Async Foreign Scan on ftprt1_p2 t1_2
          ->  Hash
                ->  Append
-                     ->  Foreign Scan on ftprt2_p1 t2_1
-                     ->  Foreign Scan on ftprt2_p2 t2_2
+                     ->  Async Foreign Scan on ftprt2_p1 t2_1
+                     ->  Async Foreign Scan on ftprt2_p2 t2_2
 (11 rows)
 
 SELECT t1.wr, t2.wr FROM (SELECT t1 wr, a FROM fprt1 t1 WHERE t1.a % 25 = 0) t1 FULL JOIN (SELECT t2 wr, b FROM fprt2 t2 WHERE t2.b % 25 = 0) t2 ON (t1.a = t2.b) ORDER BY 1,2;
@@ -8687,9 +8704,9 @@ SELECT t1.a,t1.b FROM fprt1 t1, LATERAL (SELECT t2.a, t2.b FROM fprt2 t2 WHERE t
  Sort
    Sort Key: t1.a, t1.b
    ->  Append
-         ->  Foreign Scan
+         ->  Async Foreign Scan
                Relations: (ftprt1_p1 t1_1) INNER JOIN (ftprt2_p1 t2_1)
-         ->  Foreign Scan
+         ->  Async Foreign Scan
                Relations: (ftprt1_p2 t1_2) INNER JOIN (ftprt2_p2 t2_2)
 (7 rows)
 
@@ -8744,20 +8761,20 @@ SELECT t1.a, t1.phv, t2.b, t2.phv FROM (SELECT 't1_phv' phv, * FROM fprt1 WHERE
 -- test FOR UPDATE; partitionwise join does not apply
 EXPLAIN (COSTS OFF)
 SELECT t1.a, t2.b FROM fprt1 t1 INNER JOIN fprt2 t2 ON (t1.a = t2.b) WHERE t1.a % 25 = 0 ORDER BY 1,2 FOR UPDATE OF t1;
-                          QUERY PLAN                          
---------------------------------------------------------------
+                             QUERY PLAN                             
+--------------------------------------------------------------------
  LockRows
    ->  Sort
          Sort Key: t1.a
          ->  Hash Join
                Hash Cond: (t2.b = t1.a)
                ->  Append
-                     ->  Foreign Scan on ftprt2_p1 t2_1
-                     ->  Foreign Scan on ftprt2_p2 t2_2
+                     ->  Async Foreign Scan on ftprt2_p1 t2_1
+                     ->  Async Foreign Scan on ftprt2_p2 t2_2
                ->  Hash
                      ->  Append
-                           ->  Foreign Scan on ftprt1_p1 t1_1
-                           ->  Foreign Scan on ftprt1_p2 t1_2
+                           ->  Async Foreign Scan on ftprt1_p1 t1_1
+                           ->  Async Foreign Scan on ftprt1_p2 t1_2
 (12 rows)
 
 SELECT t1.a, t2.b FROM fprt1 t1 INNER JOIN fprt2 t2 ON (t1.a = t2.b) WHERE t1.a % 25 = 0 ORDER BY 1,2 FOR UPDATE OF t1;
@@ -8793,17 +8810,17 @@ ANALYZE fpagg_tab_p3;
 SET enable_partitionwise_aggregate TO false;
 EXPLAIN (COSTS OFF)
 SELECT a, sum(b), min(b), count(*) FROM pagg_tab GROUP BY a HAVING avg(b) < 22 ORDER BY 1;
-                        QUERY PLAN                         
------------------------------------------------------------
+                           QUERY PLAN                            
+-----------------------------------------------------------------
  Sort
    Sort Key: pagg_tab.a
    ->  HashAggregate
          Group Key: pagg_tab.a
          Filter: (avg(pagg_tab.b) < '22'::numeric)
          ->  Append
-               ->  Foreign Scan on fpagg_tab_p1 pagg_tab_1
-               ->  Foreign Scan on fpagg_tab_p2 pagg_tab_2
-               ->  Foreign Scan on fpagg_tab_p3 pagg_tab_3
+               ->  Async Foreign Scan on fpagg_tab_p1 pagg_tab_1
+               ->  Async Foreign Scan on fpagg_tab_p2 pagg_tab_2
+               ->  Async Foreign Scan on fpagg_tab_p3 pagg_tab_3
 (9 rows)
 
 -- Plan with partitionwise aggregates is enabled
@@ -8815,11 +8832,11 @@ SELECT a, sum(b), min(b), count(*) FROM pagg_tab GROUP BY a HAVING avg(b) < 22 O
  Sort
    Sort Key: pagg_tab.a
    ->  Append
-         ->  Foreign Scan
+         ->  Async Foreign Scan
                Relations: Aggregate on (fpagg_tab_p1 pagg_tab)
-         ->  Foreign Scan
+         ->  Async Foreign Scan
                Relations: Aggregate on (fpagg_tab_p2 pagg_tab_1)
-         ->  Foreign Scan
+         ->  Async Foreign Scan
                Relations: Aggregate on (fpagg_tab_p3 pagg_tab_2)
 (9 rows)
 
@@ -8946,7 +8963,7 @@ DO $d$
     END;
 $d$;
 ERROR:  invalid option "password"
-HINT:  Valid options in this context are: service, passfile, channel_binding, connect_timeout, dbname, host, hostaddr, port, options, application_name, keepalives, keepalives_idle, keepalives_interval, keepalives_count, tcp_user_timeout, sslmode, sslcompression, sslcert, sslkey, sslrootcert, sslcrl, sslcrldir, requirepeer, ssl_min_protocol_version, ssl_max_protocol_version, gssencmode, krbsrvname, gsslib, target_session_attrs, use_remote_estimate, fdw_startup_cost, fdw_tuple_cost, extensions, updatable, fetch_size, batch_size
+HINT:  Valid options in this context are: service, passfile, channel_binding, connect_timeout, dbname, host, hostaddr, port, options, application_name, keepalives, keepalives_idle, keepalives_interval, keepalives_count, tcp_user_timeout, sslmode, sslcompression, sslcert, sslkey, sslrootcert, sslcrl, sslcrldir, requirepeer, ssl_min_protocol_version, ssl_max_protocol_version, gssencmode, krbsrvname, gsslib, target_session_attrs, use_remote_estimate, fdw_startup_cost, fdw_tuple_cost, extensions, updatable, fetch_size, batch_size, async_capable
 CONTEXT:  SQL statement "ALTER SERVER loopback_nopw OPTIONS (ADD password 'dummypw')"
 PL/pgSQL function inline_code_block line 3 at EXECUTE
 -- If we add a password for our user mapping instead, we should get a different
diff --git a/contrib/postgres_fdw/option.c b/contrib/postgres_fdw/option.c
index 64698c4da3..530d7a66d4 100644
--- a/contrib/postgres_fdw/option.c
+++ b/contrib/postgres_fdw/option.c
@@ -107,7 +107,8 @@ postgres_fdw_validator(PG_FUNCTION_ARGS)
 		 * Validate option value, when we can do so without any context.
 		 */
 		if (strcmp(def->defname, "use_remote_estimate") == 0 ||
-			strcmp(def->defname, "updatable") == 0)
+			strcmp(def->defname, "updatable") == 0 ||
+			strcmp(def->defname, "async_capable") == 0)
 		{
 			/* these accept only boolean values */
 			(void) defGetBoolean(def);
@@ -217,6 +218,9 @@ InitPgFdwOptions(void)
 		/* batch_size is available on both server and table */
 		{"batch_size", ForeignServerRelationId, false},
 		{"batch_size", ForeignTableRelationId, false},
+		/* async_capable is available on both server and table */
+		{"async_capable", ForeignServerRelationId, false},
+		{"async_capable", ForeignTableRelationId, false},
 		{"password_required", UserMappingRelationId, false},
 
 		/*
diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c
index 35b48575c5..1354190e42 100644
--- a/contrib/postgres_fdw/postgres_fdw.c
+++ b/contrib/postgres_fdw/postgres_fdw.c
@@ -21,6 +21,7 @@
 #include "commands/defrem.h"
 #include "commands/explain.h"
 #include "commands/vacuum.h"
+#include "executor/execAsync.h"
 #include "foreign/fdwapi.h"
 #include "funcapi.h"
 #include "miscadmin.h"
@@ -37,6 +38,7 @@
 #include "optimizer/tlist.h"
 #include "parser/parsetree.h"
 #include "postgres_fdw.h"
+#include "storage/latch.h"
 #include "utils/builtins.h"
 #include "utils/float.h"
 #include "utils/guc.h"
@@ -143,6 +145,7 @@ typedef struct PgFdwScanState
 
 	/* for remote query execution */
 	PGconn	   *conn;			/* connection for the scan */
+	PgFdwConnState *conn_state;	/* extra per-connection state */
 	unsigned int cursor_number; /* quasi-unique ID for my cursor */
 	bool		cursor_exists;	/* have we created the cursor? */
 	int			numParams;		/* number of parameters passed to query */
@@ -159,6 +162,9 @@ typedef struct PgFdwScanState
 	int			fetch_ct_2;		/* Min(# of fetches done, 2) */
 	bool		eof_reached;	/* true if last fetch reached EOF */
 
+	/* for asynchronous execution */
+	bool		async_capable;	/* engage asynchronous-capable logic? */
+
 	/* working memory contexts */
 	MemoryContext batch_cxt;	/* context holding current batch of tuples */
 	MemoryContext temp_cxt;		/* context for per-tuple temporary data */
@@ -176,6 +182,7 @@ typedef struct PgFdwModifyState
 
 	/* for remote query execution */
 	PGconn	   *conn;			/* connection for the scan */
+	PgFdwConnState *conn_state;	/* extra per-connection state */
 	char	   *p_name;			/* name of prepared statement, if created */
 
 	/* extracted fdw_private data */
@@ -219,6 +226,7 @@ typedef struct PgFdwDirectModifyState
 
 	/* for remote query execution */
 	PGconn	   *conn;			/* connection for the update */
+	PgFdwConnState *conn_state;	/* extra per-connection state */
 	int			numParams;		/* number of parameters passed to query */
 	FmgrInfo   *param_flinfo;	/* output conversion functions for them */
 	List	   *param_exprs;	/* executable expressions for param values */
@@ -408,6 +416,10 @@ static void postgresGetForeignUpperPaths(PlannerInfo *root,
 										 RelOptInfo *input_rel,
 										 RelOptInfo *output_rel,
 										 void *extra);
+static bool postgresIsForeignPathAsyncCapable(ForeignPath *path);
+static void postgresForeignAsyncRequest(AsyncRequest *areq);
+static void postgresForeignAsyncConfigureWait(AsyncRequest *areq);
+static void postgresForeignAsyncNotify(AsyncRequest *areq);
 
 /*
  * Helper functions
@@ -437,7 +449,8 @@ static bool ec_member_matches_foreign(PlannerInfo *root, RelOptInfo *rel,
 									  void *arg);
 static void create_cursor(ForeignScanState *node);
 static void fetch_more_data(ForeignScanState *node);
-static void close_cursor(PGconn *conn, unsigned int cursor_number);
+static void close_cursor(PGconn *conn, unsigned int cursor_number,
+						 PgFdwConnState *conn_state);
 static PgFdwModifyState *create_foreign_modify(EState *estate,
 											   RangeTblEntry *rte,
 											   ResultRelInfo *resultRelInfo,
@@ -491,6 +504,8 @@ static int	postgresAcquireSampleRowsFunc(Relation relation, int elevel,
 										  double *totaldeadrows);
 static void analyze_row_processor(PGresult *res, int row,
 								  PgFdwAnalyzeState *astate);
+static void request_tuple_asynchronously(AsyncRequest *areq, bool fetch);
+static void fetch_more_data_begin(AsyncRequest *areq);
 static HeapTuple make_tuple_from_result_row(PGresult *res,
 											int row,
 											Relation rel,
@@ -583,6 +598,12 @@ postgres_fdw_handler(PG_FUNCTION_ARGS)
 	/* Support functions for upper relation push-down */
 	routine->GetForeignUpperPaths = postgresGetForeignUpperPaths;
 
+	/* Support functions for asynchronous execution */
+	routine->IsForeignPathAsyncCapable = postgresIsForeignPathAsyncCapable;
+	routine->ForeignAsyncRequest = postgresForeignAsyncRequest;
+	routine->ForeignAsyncConfigureWait = postgresForeignAsyncConfigureWait;
+	routine->ForeignAsyncNotify = postgresForeignAsyncNotify;
+
 	PG_RETURN_POINTER(routine);
 }
 
@@ -625,6 +646,7 @@ postgresGetForeignRelSize(PlannerInfo *root,
 	fpinfo->fdw_tuple_cost = DEFAULT_FDW_TUPLE_COST;
 	fpinfo->shippable_extensions = NIL;
 	fpinfo->fetch_size = 100;
+	fpinfo->async_capable = false;
 
 	apply_server_options(fpinfo);
 	apply_table_options(fpinfo);
@@ -1458,7 +1480,7 @@ postgresBeginForeignScan(ForeignScanState *node, int eflags)
 	 * Get connection to the foreign server.  Connection manager will
 	 * establish new connection if necessary.
 	 */
-	fsstate->conn = GetConnection(user, false);
+	fsstate->conn = GetConnection(user, false, &fsstate->conn_state);
 
 	/* Assign a unique ID for my cursor */
 	fsstate->cursor_number = GetCursorNumber(fsstate->conn);
@@ -1509,6 +1531,9 @@ postgresBeginForeignScan(ForeignScanState *node, int eflags)
 							 &fsstate->param_flinfo,
 							 &fsstate->param_exprs,
 							 &fsstate->param_values);
+
+	/* Set the async-capable flag */
+	fsstate->async_capable = node->ss.ps.plan->async_capable;
 }
 
 /*
@@ -1523,8 +1548,10 @@ postgresIterateForeignScan(ForeignScanState *node)
 	TupleTableSlot *slot = node->ss.ss_ScanTupleSlot;
 
 	/*
-	 * If this is the first call after Begin or ReScan, we need to create the
-	 * cursor on the remote side.
+	 * In sync mode, if this is the first call after Begin or ReScan, we need
+	 * to create the cursor on the remote side.  In async mode, we would have
+	 * already created the cursor before we get here, even if this is the first
+	 * call after Begin or ReScan.
 	 */
 	if (!fsstate->cursor_exists)
 		create_cursor(node);
@@ -1534,6 +1561,9 @@ postgresIterateForeignScan(ForeignScanState *node)
 	 */
 	if (fsstate->next_tuple >= fsstate->num_tuples)
 	{
+		/* In async mode, just clear tuple slot. */
+		if (fsstate->async_capable)
+			return ExecClearTuple(slot);
 		/* No point in another fetch if we already detected EOF, though. */
 		if (!fsstate->eof_reached)
 			fetch_more_data(node);
@@ -1595,7 +1625,7 @@ postgresReScanForeignScan(ForeignScanState *node)
 	 * We don't use a PG_TRY block here, so be careful not to throw error
 	 * without releasing the PGresult.
 	 */
-	res = pgfdw_exec_query(fsstate->conn, sql);
+	res = pgfdw_exec_query(fsstate->conn, sql, fsstate->conn_state);
 	if (PQresultStatus(res) != PGRES_COMMAND_OK)
 		pgfdw_report_error(ERROR, res, fsstate->conn, true, sql);
 	PQclear(res);
@@ -1623,7 +1653,8 @@ postgresEndForeignScan(ForeignScanState *node)
 
 	/* Close the cursor if open, to prevent accumulation of cursors */
 	if (fsstate->cursor_exists)
-		close_cursor(fsstate->conn, fsstate->cursor_number);
+		close_cursor(fsstate->conn, fsstate->cursor_number,
+					 fsstate->conn_state);
 
 	/* Release remote connection */
 	ReleaseConnection(fsstate->conn);
@@ -2500,7 +2531,7 @@ postgresBeginDirectModify(ForeignScanState *node, int eflags)
 	 * Get connection to the foreign server.  Connection manager will
 	 * establish new connection if necessary.
 	 */
-	dmstate->conn = GetConnection(user, false);
+	dmstate->conn = GetConnection(user, false, &dmstate->conn_state);
 
 	/* Update the foreign-join-related fields. */
 	if (fsplan->scan.scanrelid == 0)
@@ -2881,7 +2912,7 @@ estimate_path_cost_size(PlannerInfo *root,
 								false, &retrieved_attrs, NULL);
 
 		/* Get the remote estimate */
-		conn = GetConnection(fpinfo->user, false);
+		conn = GetConnection(fpinfo->user, false, NULL);
 		get_remote_estimate(sql.data, conn, &rows, &width,
 							&startup_cost, &total_cost);
 		ReleaseConnection(conn);
@@ -3327,7 +3358,7 @@ get_remote_estimate(const char *sql, PGconn *conn,
 		/*
 		 * Execute EXPLAIN remotely.
 		 */
-		res = pgfdw_exec_query(conn, sql);
+		res = pgfdw_exec_query(conn, sql, NULL);
 		if (PQresultStatus(res) != PGRES_TUPLES_OK)
 			pgfdw_report_error(ERROR, res, conn, false, sql);
 
@@ -3451,6 +3482,10 @@ create_cursor(ForeignScanState *node)
 	StringInfoData buf;
 	PGresult   *res;
 
+	/* First, process a pending asynchronous request, if any. */
+	if (fsstate->conn_state->pendingAreq)
+		process_pending_request(fsstate->conn_state->pendingAreq);
+
 	/*
 	 * Construct array of query parameter values in text format.  We do the
 	 * conversions in the short-lived per-tuple context, so as not to cause a
@@ -3531,17 +3566,38 @@ fetch_more_data(ForeignScanState *node)
 	PG_TRY();
 	{
 		PGconn	   *conn = fsstate->conn;
-		char		sql[64];
 		int			numrows;
 		int			i;
 
-		snprintf(sql, sizeof(sql), "FETCH %d FROM c%u",
-				 fsstate->fetch_size, fsstate->cursor_number);
+		if (fsstate->async_capable)
+		{
+			Assert(fsstate->conn_state->pendingAreq);
 
-		res = pgfdw_exec_query(conn, sql);
-		/* On error, report the original query, not the FETCH. */
-		if (PQresultStatus(res) != PGRES_TUPLES_OK)
-			pgfdw_report_error(ERROR, res, conn, false, fsstate->query);
+			/*
+			 * The query was already sent by an earlier call to
+			 * fetch_more_data_begin.  So now we just fetch the result.
+			 */
+			res = pgfdw_get_result(conn, fsstate->query);
+			/* On error, report the original query, not the FETCH. */
+			if (PQresultStatus(res) != PGRES_TUPLES_OK)
+				pgfdw_report_error(ERROR, res, conn, false, fsstate->query);
+
+			/* Reset per-connection state */
+			fsstate->conn_state->pendingAreq = NULL;
+		}
+		else
+		{
+			char		sql[64];
+
+			/* This is a regular synchronous fetch. */
+			snprintf(sql, sizeof(sql), "FETCH %d FROM c%u",
+					 fsstate->fetch_size, fsstate->cursor_number);
+
+			res = pgfdw_exec_query(conn, sql, fsstate->conn_state);
+			/* On error, report the original query, not the FETCH. */
+			if (PQresultStatus(res) != PGRES_TUPLES_OK)
+				pgfdw_report_error(ERROR, res, conn, false, fsstate->query);
+		}
 
 		/* Convert the data into HeapTuples */
 		numrows = PQntuples(res);
@@ -3633,7 +3689,8 @@ reset_transmission_modes(int nestlevel)
  * Utility routine to close a cursor.
  */
 static void
-close_cursor(PGconn *conn, unsigned int cursor_number)
+close_cursor(PGconn *conn, unsigned int cursor_number,
+			 PgFdwConnState *conn_state)
 {
 	char		sql[64];
 	PGresult   *res;
@@ -3644,7 +3701,7 @@ close_cursor(PGconn *conn, unsigned int cursor_number)
 	 * We don't use a PG_TRY block here, so be careful not to throw error
 	 * without releasing the PGresult.
 	 */
-	res = pgfdw_exec_query(conn, sql);
+	res = pgfdw_exec_query(conn, sql, conn_state);
 	if (PQresultStatus(res) != PGRES_COMMAND_OK)
 		pgfdw_report_error(ERROR, res, conn, true, sql);
 	PQclear(res);
@@ -3693,7 +3750,7 @@ create_foreign_modify(EState *estate,
 	user = GetUserMapping(userid, table->serverid);
 
 	/* Open connection; report that we'll create a prepared statement. */
-	fmstate->conn = GetConnection(user, true);
+	fmstate->conn = GetConnection(user, true, &fmstate->conn_state);
 	fmstate->p_name = NULL;		/* prepared statement not made yet */
 
 	/* Set up remote query information. */
@@ -3792,6 +3849,10 @@ execute_foreign_modify(EState *estate,
 		   operation == CMD_UPDATE ||
 		   operation == CMD_DELETE);
 
+	/* First, process a pending asynchronous request, if any. */
+	if (fmstate->conn_state->pendingAreq)
+		process_pending_request(fmstate->conn_state->pendingAreq);
+
 	/*
 	 * If the existing query was deparsed and prepared for a different number
 	 * of rows, rebuild it for the proper number.
@@ -3893,6 +3954,11 @@ prepare_foreign_modify(PgFdwModifyState *fmstate)
 	char	   *p_name;
 	PGresult   *res;
 
+	/*
+	 * The caller would already have processed a pending asynchronous request
+	 * if any, so no need to do it here.
+	 */
+
 	/* Construct name we'll use for the prepared statement. */
 	snprintf(prep_name, sizeof(prep_name), "pgsql_fdw_prep_%u",
 			 GetPrepStmtNumber(fmstate->conn));
@@ -4078,7 +4144,7 @@ deallocate_query(PgFdwModifyState *fmstate)
 	 * We don't use a PG_TRY block here, so be careful not to throw error
 	 * without releasing the PGresult.
 	 */
-	res = pgfdw_exec_query(fmstate->conn, sql);
+	res = pgfdw_exec_query(fmstate->conn, sql, fmstate->conn_state);
 	if (PQresultStatus(res) != PGRES_COMMAND_OK)
 		pgfdw_report_error(ERROR, res, fmstate->conn, true, sql);
 	PQclear(res);
@@ -4226,6 +4292,10 @@ execute_dml_stmt(ForeignScanState *node)
 	int			numParams = dmstate->numParams;
 	const char **values = dmstate->param_values;
 
+	/* First, process a pending asynchronous request, if any. */
+	if (dmstate->conn_state->pendingAreq)
+		process_pending_request(dmstate->conn_state->pendingAreq);
+
 	/*
 	 * Construct array of query parameter values in text format.
 	 */
@@ -4627,7 +4697,7 @@ postgresAnalyzeForeignTable(Relation relation,
 	 */
 	table = GetForeignTable(RelationGetRelid(relation));
 	user = GetUserMapping(relation->rd_rel->relowner, table->serverid);
-	conn = GetConnection(user, false);
+	conn = GetConnection(user, false, NULL);
 
 	/*
 	 * Construct command to get page count for relation.
@@ -4638,7 +4708,7 @@ postgresAnalyzeForeignTable(Relation relation,
 	/* In what follows, do not risk leaking any PGresults. */
 	PG_TRY();
 	{
-		res = pgfdw_exec_query(conn, sql.data);
+		res = pgfdw_exec_query(conn, sql.data, NULL);
 		if (PQresultStatus(res) != PGRES_TUPLES_OK)
 			pgfdw_report_error(ERROR, res, conn, false, sql.data);
 
@@ -4713,7 +4783,7 @@ postgresAcquireSampleRowsFunc(Relation relation, int elevel,
 	table = GetForeignTable(RelationGetRelid(relation));
 	server = GetForeignServer(table->serverid);
 	user = GetUserMapping(relation->rd_rel->relowner, table->serverid);
-	conn = GetConnection(user, false);
+	conn = GetConnection(user, false, NULL);
 
 	/*
 	 * Construct cursor that retrieves whole rows from remote.
@@ -4730,7 +4800,7 @@ postgresAcquireSampleRowsFunc(Relation relation, int elevel,
 		int			fetch_size;
 		ListCell   *lc;
 
-		res = pgfdw_exec_query(conn, sql.data);
+		res = pgfdw_exec_query(conn, sql.data, NULL);
 		if (PQresultStatus(res) != PGRES_COMMAND_OK)
 			pgfdw_report_error(ERROR, res, conn, false, sql.data);
 		PQclear(res);
@@ -4782,7 +4852,7 @@ postgresAcquireSampleRowsFunc(Relation relation, int elevel,
 			 */
 
 			/* Fetch some rows */
-			res = pgfdw_exec_query(conn, fetch_sql);
+			res = pgfdw_exec_query(conn, fetch_sql, NULL);
 			/* On error, report the original query, not the FETCH. */
 			if (PQresultStatus(res) != PGRES_TUPLES_OK)
 				pgfdw_report_error(ERROR, res, conn, false, sql.data);
@@ -4801,7 +4871,7 @@ postgresAcquireSampleRowsFunc(Relation relation, int elevel,
 		}
 
 		/* Close the cursor, just to be tidy. */
-		close_cursor(conn, cursor_number);
+		close_cursor(conn, cursor_number, NULL);
 	}
 	PG_CATCH();
 	{
@@ -4941,7 +5011,7 @@ postgresImportForeignSchema(ImportForeignSchemaStmt *stmt, Oid serverOid)
 	 */
 	server = GetForeignServer(serverOid);
 	mapping = GetUserMapping(GetUserId(), server->serverid);
-	conn = GetConnection(mapping, false);
+	conn = GetConnection(mapping, false, NULL);
 
 	/* Don't attempt to import collation if remote server hasn't got it */
 	if (PQserverVersion(conn) < 90100)
@@ -4957,7 +5027,7 @@ postgresImportForeignSchema(ImportForeignSchemaStmt *stmt, Oid serverOid)
 		appendStringInfoString(&buf, "SELECT 1 FROM pg_catalog.pg_namespace WHERE nspname = ");
 		deparseStringLiteral(&buf, stmt->remote_schema);
 
-		res = pgfdw_exec_query(conn, buf.data);
+		res = pgfdw_exec_query(conn, buf.data, NULL);
 		if (PQresultStatus(res) != PGRES_TUPLES_OK)
 			pgfdw_report_error(ERROR, res, conn, false, buf.data);
 
@@ -5069,7 +5139,7 @@ postgresImportForeignSchema(ImportForeignSchemaStmt *stmt, Oid serverOid)
 		appendStringInfoString(&buf, " ORDER BY c.relname, a.attnum");
 
 		/* Fetch the data */
-		res = pgfdw_exec_query(conn, buf.data);
+		res = pgfdw_exec_query(conn, buf.data, NULL);
 		if (PQresultStatus(res) != PGRES_TUPLES_OK)
 			pgfdw_report_error(ERROR, res, conn, false, buf.data);
 
@@ -5529,6 +5599,8 @@ apply_server_options(PgFdwRelationInfo *fpinfo)
 				ExtractExtensionList(defGetString(def), false);
 		else if (strcmp(def->defname, "fetch_size") == 0)
 			fpinfo->fetch_size = strtol(defGetString(def), NULL, 10);
+		else if (strcmp(def->defname, "async_capable") == 0)
+			fpinfo->async_capable = defGetBoolean(def);
 	}
 }
 
@@ -5550,6 +5622,8 @@ apply_table_options(PgFdwRelationInfo *fpinfo)
 			fpinfo->use_remote_estimate = defGetBoolean(def);
 		else if (strcmp(def->defname, "fetch_size") == 0)
 			fpinfo->fetch_size = strtol(defGetString(def), NULL, 10);
+		else if (strcmp(def->defname, "async_capable") == 0)
+			fpinfo->async_capable = defGetBoolean(def);
 	}
 }
 
@@ -5584,6 +5658,7 @@ merge_fdw_options(PgFdwRelationInfo *fpinfo,
 	fpinfo->shippable_extensions = fpinfo_o->shippable_extensions;
 	fpinfo->use_remote_estimate = fpinfo_o->use_remote_estimate;
 	fpinfo->fetch_size = fpinfo_o->fetch_size;
+	fpinfo->async_capable = fpinfo_o->async_capable;
 
 	/* Merge the table level options from either side of the join. */
 	if (fpinfo_i)
@@ -5605,6 +5680,13 @@ merge_fdw_options(PgFdwRelationInfo *fpinfo,
 		 * relation sizes.
 		 */
 		fpinfo->fetch_size = Max(fpinfo_o->fetch_size, fpinfo_i->fetch_size);
+
+		/*
+		 * We'll prefer to consider this join async-capable if any table from
+		 * either side of the join is considered async-capable.
+		 */
+		fpinfo->async_capable = fpinfo_o->async_capable ||
+			fpinfo_i->async_capable;
 	}
 }
 
@@ -6488,6 +6570,214 @@ add_foreign_final_paths(PlannerInfo *root, RelOptInfo *input_rel,
 	add_path(final_rel, (Path *) final_path);
 }
 
+/*
+ * postgresIsForeignPathAsyncCapable
+ *		Check whether a given ForeignPath node is async-capable.
+ */
+static bool
+postgresIsForeignPathAsyncCapable(ForeignPath *path)
+{
+	RelOptInfo *rel = ((Path *) path)->parent;
+	PgFdwRelationInfo *fpinfo = (PgFdwRelationInfo *) rel->fdw_private;
+
+	return fpinfo->async_capable;
+}
+
+/*
+ * postgresForeignAsyncRequest
+ *		Asynchronously request next tuple from a foreign PostgreSQL table.
+ */
+static void
+postgresForeignAsyncRequest(AsyncRequest *areq)
+{
+	request_tuple_asynchronously(areq, true);
+}
+
+/*
+ * postgresForeignAsyncConfigureWait
+ *		Configure a file descriptor event for which we wish to wait.
+ */
+static void
+postgresForeignAsyncConfigureWait(AsyncRequest *areq)
+{
+	ForeignScanState *node = (ForeignScanState *) areq->requestee;
+	PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
+	AsyncRequest *pendingAreq = fsstate->conn_state->pendingAreq;
+	AppendState *requestor = (AppendState *) areq->requestor;
+	WaitEventSet *set = requestor->as_eventset;
+
+	/* This should not be called unless callback_pending */
+	Assert(areq->callback_pending);
+
+	/* The core code would have registered the postmaster death event */
+	Assert(GetNumRegisteredWaitEvents(set) >= 1);
+
+	/* Begin an asynchronous data fetch if necessary */
+	if (!pendingAreq)
+		fetch_more_data_begin(areq);
+	else if (pendingAreq->requestor != areq->requestor)
+	{
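+		/*
+		 * The pending request was made by some other Append node.  If this
+		 * Append already has a wait event configured besides the postmaster
+		 * death event, skip the given request for now; otherwise complete
+		 * the pending request and begin a fetch for the given one, so that
+		 * the event added below refers to an in-flight query.
+		 */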
+		if (GetNumRegisteredWaitEvents(set) > 1)
+			return;
+		process_pending_request(pendingAreq);
+		fetch_more_data_begin(areq);
+	}
+	else if (pendingAreq->requestee != areq->requestee)
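+		/*
+		 * Another child of the same Append already has a request in flight
+		 * on this connection, so do not configure an event for the given
+		 * request here.
+		 */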
+		return;
+	else
+		Assert(pendingAreq == areq);
+
+	AddWaitEventToSet(set, WL_SOCKET_READABLE, PQsocket(fsstate->conn),
+					  NULL, areq);
+}
+
+/*
+ * postgresForeignAsyncNotify
+ *		Fetch some more tuples from a file descriptor that becomes ready,
+ *		requesting next tuple.
+ */
+static void
+postgresForeignAsyncNotify(AsyncRequest *areq)
+{
+	ForeignScanState *node = (ForeignScanState *) areq->requestee;
+	PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
+
+	/* The core code would have initialized the callback_pending flag */
+	Assert(!areq->callback_pending);
+
+	/* The request should be currently in-process */
+	Assert(fsstate->conn_state->pendingAreq == areq);
+
+	/* On error, report the original query, not the FETCH. */
+	if (!PQconsumeInput(fsstate->conn))
+		pgfdw_report_error(ERROR, NULL, fsstate->conn, false, fsstate->query);
+
+	fetch_more_data(node);
+
+	request_tuple_asynchronously(areq, true);
+}
+
+/*
+ * Asynchronously request next tuple from a foreign PostgreSQL table.
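+ *
+ * If fetch is true, also begin another remote fetch when more tuples are
+ * needed and no other request is pending on the connection.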
+ */
+static void
+request_tuple_asynchronously(AsyncRequest *areq, bool fetch)
+{
+	ForeignScanState *node = (ForeignScanState *) areq->requestee;
+	PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
+	AsyncRequest *pendingAreq = fsstate->conn_state->pendingAreq;
+	TupleTableSlot *result;
+
+	/* This should not be called if the request is currently in-process */
+	Assert(areq != pendingAreq);
+
+	/* Request some more tuples, if we've run out */
+	if (fsstate->next_tuple >= fsstate->num_tuples)
+	{
+		/* No point in another fetch if we already detected EOF, though */
+		if (!fsstate->eof_reached)
+		{
+			/* Mark the request as needing a callback */
+			areq->callback_pending = true;
+			areq->request_complete = false;
+			/* Begin another fetch if requested and if no pending request */
+			if (fetch && !pendingAreq)
+				fetch_more_data_begin(areq);
+		}
+		else
+		{
+			/* There's nothing more to do; just return a NULL pointer */
+			result = NULL;
+			/* Mark the request as complete */
+			ExecAsyncRequestDone(areq, result);
+		}
+		return;
+	}
+
+	/* Get a tuple from the ForeignScan node */
+	result = ExecProcNode((PlanState *) node);
+	if (!TupIsNull(result))
+	{
+		/* Mark the request as complete */
+		ExecAsyncRequestDone(areq, result);
+		return;
+	}
+	Assert(fsstate->next_tuple >= fsstate->num_tuples);
+
+	/* Request some more tuples, if we've not detected EOF yet */
+	if (!fsstate->eof_reached)
+	{
+		/* Mark the request as needing a callback */
+		areq->callback_pending = true;
+		areq->request_complete = false;
+		/* Begin another fetch if requested and if no pending request */
+		if (fetch && !pendingAreq)
+			fetch_more_data_begin(areq);
+	}
+	else
+	{
+		/* There's nothing more to do; just return a NULL pointer */
+		result = NULL;
+		/* Mark the request as complete */
+		ExecAsyncRequestDone(areq, result);
+	}
+}
+
+/*
+ * Begin an asynchronous data fetch.
+ *
+ * Note: fetch_more_data must be called to fetch the result.
+ */
+static void
+fetch_more_data_begin(AsyncRequest *areq)
+{
+	ForeignScanState *node = (ForeignScanState *) areq->requestee;
+	PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
+	char		sql[64];
+
+	Assert(!fsstate->conn_state->pendingAreq);
+
+	/* Create the cursor synchronously. */
+	if (!fsstate->cursor_exists)
+		create_cursor(node);
+
+	/* We will send this query, but not wait for the response. */
+	snprintf(sql, sizeof(sql), "FETCH %d FROM c%u",
+			 fsstate->fetch_size, fsstate->cursor_number);
+
+	if (!PQsendQuery(fsstate->conn, sql))
+		pgfdw_report_error(ERROR, NULL, fsstate->conn, false, fsstate->query);
+
+	/* Remember that the request is in process */
+	fsstate->conn_state->pendingAreq = areq;
+}
+
+/*
+ * Process a pending asynchronous request.
+ */
+void
+process_pending_request(AsyncRequest *areq)
+{
+	ForeignScanState *node = (ForeignScanState *) areq->requestee;
+	PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
+	EState	   *estate = node->ss.ps.state;
+	MemoryContext oldcontext;
+
+	/* The request should be currently in-process */
+	Assert(fsstate->conn_state->pendingAreq == areq);
+
+	oldcontext = MemoryContextSwitchTo(estate->es_query_cxt);
+
+	fetch_more_data(node);
+
+	request_tuple_asynchronously(areq, false);
+
+	/* Unlike AsyncRequest/AsyncNotify, we call ExecAsyncResponse ourselves */
+	ExecAsyncResponse(areq);
+
+	MemoryContextSwitchTo(oldcontext);
+}
+
 /*
  * Create a tuple from the specified row of the PGresult.
  *
diff --git a/contrib/postgres_fdw/postgres_fdw.h b/contrib/postgres_fdw/postgres_fdw.h
index 1f67b4d9fd..88d94da6f6 100644
--- a/contrib/postgres_fdw/postgres_fdw.h
+++ b/contrib/postgres_fdw/postgres_fdw.h
@@ -16,6 +16,7 @@
 #include "foreign/foreign.h"
 #include "lib/stringinfo.h"
 #include "libpq-fe.h"
+#include "nodes/execnodes.h"
 #include "nodes/pathnodes.h"
 #include "utils/relcache.h"
 
@@ -78,6 +79,7 @@ typedef struct PgFdwRelationInfo
 	Cost		fdw_startup_cost;
 	Cost		fdw_tuple_cost;
 	List	   *shippable_extensions;	/* OIDs of shippable extensions */
+	bool		async_capable;
 
 	/* Cached catalog information. */
 	ForeignTable *table;
@@ -124,17 +126,28 @@ typedef struct PgFdwRelationInfo
 	int			relation_index;
 } PgFdwRelationInfo;
 
+/*
+ * Extra control information relating to a connection.
+ */
+typedef struct PgFdwConnState
+{
+	AsyncRequest *pendingAreq;	/* pending async request */
+} PgFdwConnState;
+
 /* in postgres_fdw.c */
 extern int	set_transmission_modes(void);
 extern void reset_transmission_modes(int nestlevel);
+extern void process_pending_request(AsyncRequest *areq);
 
 /* in connection.c */
-extern PGconn *GetConnection(UserMapping *user, bool will_prep_stmt);
+extern PGconn *GetConnection(UserMapping *user, bool will_prep_stmt,
+							 PgFdwConnState **state);
 extern void ReleaseConnection(PGconn *conn);
 extern unsigned int GetCursorNumber(PGconn *conn);
 extern unsigned int GetPrepStmtNumber(PGconn *conn);
 extern PGresult *pgfdw_get_result(PGconn *conn, const char *query);
-extern PGresult *pgfdw_exec_query(PGconn *conn, const char *query);
+extern PGresult *pgfdw_exec_query(PGconn *conn, const char *query,
+								  PgFdwConnState *state);
 extern void pgfdw_report_error(int elevel, PGresult *res, PGconn *conn,
 							   bool clear, const char *sql);
 
diff --git a/contrib/postgres_fdw/sql/postgres_fdw.sql b/contrib/postgres_fdw/sql/postgres_fdw.sql
index 2b525ea44a..320844be02 100644
--- a/contrib/postgres_fdw/sql/postgres_fdw.sql
+++ b/contrib/postgres_fdw/sql/postgres_fdw.sql
@@ -22,6 +22,8 @@ DO $d$
     END;
 $d$;
 
+ALTER SERVER loopback OPTIONS (ADD async_capable 'true');
+
 CREATE USER MAPPING FOR public SERVER testserver1
 	OPTIONS (user 'value', password 'value');
 CREATE USER MAPPING FOR CURRENT_USER SERVER loopback;
@@ -1822,31 +1824,31 @@ INSERT INTO b(aa) VALUES('bbb');
 INSERT INTO b(aa) VALUES('bbbb');
 INSERT INTO b(aa) VALUES('bbbbb');
 
-SELECT tableoid::regclass, * FROM a;
+SELECT tableoid::regclass, * FROM a ORDER BY 1, 2;
 SELECT tableoid::regclass, * FROM b;
 SELECT tableoid::regclass, * FROM ONLY a;
 
 UPDATE a SET aa = 'zzzzzz' WHERE aa LIKE 'aaaa%';
 
-SELECT tableoid::regclass, * FROM a;
+SELECT tableoid::regclass, * FROM a ORDER BY 1, 2;
 SELECT tableoid::regclass, * FROM b;
 SELECT tableoid::regclass, * FROM ONLY a;
 
 UPDATE b SET aa = 'new';
 
-SELECT tableoid::regclass, * FROM a;
+SELECT tableoid::regclass, * FROM a ORDER BY 1, 2;
 SELECT tableoid::regclass, * FROM b;
 SELECT tableoid::regclass, * FROM ONLY a;
 
 UPDATE a SET aa = 'newtoo';
 
-SELECT tableoid::regclass, * FROM a;
+SELECT tableoid::regclass, * FROM a ORDER BY 1, 2;
 SELECT tableoid::regclass, * FROM b;
 SELECT tableoid::regclass, * FROM ONLY a;
 
 DELETE FROM a;
 
-SELECT tableoid::regclass, * FROM a;
+SELECT tableoid::regclass, * FROM a ORDER BY 1, 2;
 SELECT tableoid::regclass, * FROM b;
 SELECT tableoid::regclass, * FROM ONLY a;
 
@@ -1882,12 +1884,12 @@ insert into bar2 values(4,44,44);
 insert into bar2 values(7,77,77);
 
 explain (verbose, costs off)
-select * from bar where f1 in (select f1 from foo) for update;
-select * from bar where f1 in (select f1 from foo) for update;
+select * from bar where f1 in (select f1 from foo) order by 1 for update;
+select * from bar where f1 in (select f1 from foo) order by 1 for update;
 
 explain (verbose, costs off)
-select * from bar where f1 in (select f1 from foo) for share;
-select * from bar where f1 in (select f1 from foo) for share;
+select * from bar where f1 in (select f1 from foo) order by 1 for share;
+select * from bar where f1 in (select f1 from foo) order by 1 for share;
 
 -- Check UPDATE with inherited target and an inherited source table
 explain (verbose, costs off)
@@ -1946,8 +1948,8 @@ explain (verbose, costs off)
 delete from foo where f1 < 5 returning *;
 delete from foo where f1 < 5 returning *;
 explain (verbose, costs off)
-update bar set f2 = f2 + 100 returning *;
-update bar set f2 = f2 + 100 returning *;
+with t as (update bar set f2 = f2 + 100 returning *) select * from t order by 1;
+with t as (update bar set f2 = f2 + 100 returning *) select * from t order by 1;
 
 -- Test that UPDATE/DELETE with inherited target works with row-level triggers
 CREATE TRIGGER trig_row_before
diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml
index 967de73596..dc2a0d0987 100644
--- a/doc/src/sgml/config.sgml
+++ b/doc/src/sgml/config.sgml
@@ -4781,6 +4781,20 @@ ANY <replaceable class="parameter">num_sync</replaceable> ( <replaceable class="
       </para>
 
      <variablelist>
+     <varlistentry id="guc-enable-async-append" xreflabel="enable_async_append">
+      <term><varname>enable_async_append</varname> (<type>boolean</type>)
+      <indexterm>
+       <primary><varname>enable_async_append</varname> configuration parameter</primary>
+      </indexterm>
+      </term>
+      <listitem>
+       <para>
+        Enables or disables the query planner's use of async-aware
+        append plan types. The default is <literal>on</literal>.
+       </para>
+      </listitem>
+     </varlistentry>
+
      <varlistentry id="guc-enable-bitmapscan" xreflabel="enable_bitmapscan">
       <term><varname>enable_bitmapscan</varname> (<type>boolean</type>)
       <indexterm>
diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml
index 3513e127b7..2ba4223915 100644
--- a/doc/src/sgml/monitoring.sgml
+++ b/doc/src/sgml/monitoring.sgml
@@ -1563,6 +1563,10 @@ postgres   27093  0.0  0.0  30096  2752 ?        Ss   11:34   0:00 postgres: ser
     </thead>
 
     <tbody>
+     <row>
+      <entry><literal>AppendReady</literal></entry>
+      <entry>Waiting for a subplan of Append to be ready.</entry>
+     </row>
      <row>
       <entry><literal>BackupWaitWalArchive</literal></entry>
       <entry>Waiting for WAL files required for a backup to be successfully
diff --git a/doc/src/sgml/postgres-fdw.sgml b/doc/src/sgml/postgres-fdw.sgml
index 07aa25799d..153ff08d91 100644
--- a/doc/src/sgml/postgres-fdw.sgml
+++ b/doc/src/sgml/postgres-fdw.sgml
@@ -371,6 +371,34 @@ OPTIONS (ADD password_required 'false');
 
   </sect3>
 
+  <sect3>
+   <title>Asynchronous Execution Options</title>
+
+   <para>
+    <filename>postgres_fdw</filename> supports asynchronous execution, which
+    runs multiple subplan nodes of an <structname>Append</structname> plan
+    node concurrently rather than serially to improve query performance.
+    This execution can be controlled using the following option:
+   </para>
+
+   <variablelist>
+
+    <varlistentry>
+     <term><literal>async_capable</literal></term>
+     <listitem>
+      <para>
+       This option controls whether <filename>postgres_fdw</filename> allows
+       foreign tables to be scanned concurrently for asynchronous execution.
+       It can be specified for a foreign table or a foreign server.
+       A table-level option overrides a server-level option.
+       The default is <literal>false</literal>.
+      </para>
+     </listitem>
+    </varlistentry>
+
+   </variablelist>
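+
+   <para>
+    For example, the following (with a placeholder server name) marks a
+    foreign server, and hence by default all of its foreign tables, as
+    async-capable:
+<programlisting>
+ALTER SERVER foreign_server OPTIONS (ADD async_capable 'true');
+</programlisting>
+   </para>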
+  </sect3>
+
   <sect3>
    <title>Updatability Options</title>
 
diff --git a/src/backend/commands/explain.c b/src/backend/commands/explain.c
index afc45429ba..fe75cabdcc 100644
--- a/src/backend/commands/explain.c
+++ b/src/backend/commands/explain.c
@@ -1394,6 +1394,8 @@ ExplainNode(PlanState *planstate, List *ancestors,
 		}
 		if (plan->parallel_aware)
 			appendStringInfoString(es->str, "Parallel ");
+		if (plan->async_capable)
+			appendStringInfoString(es->str, "Async ");
 		appendStringInfoString(es->str, pname);
 		es->indent++;
 	}
@@ -1413,6 +1415,7 @@ ExplainNode(PlanState *planstate, List *ancestors,
 		if (custom_name)
 			ExplainPropertyText("Custom Plan Provider", custom_name, es);
 		ExplainPropertyBool("Parallel Aware", plan->parallel_aware, es);
+		ExplainPropertyBool("Async Capable", plan->async_capable, es);
 	}
 
 	switch (nodeTag(plan))
diff --git a/src/backend/executor/Makefile b/src/backend/executor/Makefile
index 74ac59faa1..680fd69151 100644
--- a/src/backend/executor/Makefile
+++ b/src/backend/executor/Makefile
@@ -14,6 +14,7 @@ include $(top_builddir)/src/Makefile.global
 
 OBJS = \
 	execAmi.o \
+	execAsync.o \
 	execCurrent.o \
 	execExpr.o \
 	execExprInterp.o \
diff --git a/src/backend/executor/execAmi.c b/src/backend/executor/execAmi.c
index 4543ac79ed..069c6ba948 100644
--- a/src/backend/executor/execAmi.c
+++ b/src/backend/executor/execAmi.c
@@ -531,6 +531,10 @@ ExecSupportsBackwardScan(Plan *node)
 			{
 				ListCell   *l;
 
+				/* With async, tuples may be interleaved, so can't back up. */
+				if (((Append *) node)->nasyncplans != 0)
+					return false;
+
 				foreach(l, ((Append *) node)->appendplans)
 				{
 					if (!ExecSupportsBackwardScan((Plan *) lfirst(l)))
diff --git a/src/backend/executor/execAsync.c b/src/backend/executor/execAsync.c
index e69de29bb2..e3d85ffabc 100644
--- a/src/backend/executor/execAsync.c
+++ b/src/backend/executor/execAsync.c
@@ -0,0 +1,111 @@
+/*-------------------------------------------------------------------------
+ *
+ * execAsync.c
+ *	  Support routines for asynchronous execution
+ *
+ * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * IDENTIFICATION
+ *	  src/backend/executor/execAsync.c
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "executor/execAsync.h"
+#include "executor/nodeAppend.h"
+#include "executor/nodeForeignscan.h"
+
+/*
+ * Asynchronously request a tuple from a designated async-capable node.
+ */
+void
+ExecAsyncRequest(AsyncRequest *areq)
+{
+	switch (nodeTag(areq->requestee))
+	{
+		case T_ForeignScanState:
+			ExecAsyncForeignScanRequest(areq);
+			break;
+		default:
+			/* If the node doesn't support async, caller messed up. */
+			elog(ERROR, "unrecognized node type: %d",
+				 (int) nodeTag(areq->requestee));
+	}
+
+	ExecAsyncResponse(areq);
+}
+
+/*
+ * Give the asynchronous node a chance to configure the file descriptor event
+ * for which it wishes to wait.  We expect the node-type specific callback to
+ * make a single call of the following form:
+ *
+ * AddWaitEventToSet(set, WL_SOCKET_READABLE, fd, NULL, areq);
+ */
+void
+ExecAsyncConfigureWait(AsyncRequest *areq)
+{
+	switch (nodeTag(areq->requestee))
+	{
+		case T_ForeignScanState:
+			ExecAsyncForeignScanConfigureWait(areq);
+			break;
+		default:
+			/* If the node doesn't support async, caller messed up. */
+			elog(ERROR, "unrecognized node type: %d",
+				 (int) nodeTag(areq->requestee));
+	}
+}
+
+/*
+ * Call the asynchronous node back when a relevant event has occurred.
+ */
+void
+ExecAsyncNotify(AsyncRequest *areq)
+{
+	switch (nodeTag(areq->requestee))
+	{
+		case T_ForeignScanState:
+			ExecAsyncForeignScanNotify(areq);
+			break;
+		default:
+			/* If the node doesn't support async, caller messed up. */
+			elog(ERROR, "unrecognized node type: %d",
+				 (int) nodeTag(areq->requestee));
+	}
+
+	ExecAsyncResponse(areq);
+}
+
+/*
+ * Call the requestor back when an asynchronous node has produced a result.
+ */
+void
+ExecAsyncResponse(AsyncRequest *areq)
+{
+	switch (nodeTag(areq->requestor))
+	{
+		case T_AppendState:
+			ExecAsyncAppendResponse(areq);
+			break;
+		default:
+			/* If the node doesn't support async, caller messed up. */
+			elog(ERROR, "unrecognized node type: %d",
+				(int) nodeTag(areq->requestor));
+	}
+}
+
+/*
+ * A requestee node should call this function to deliver the tuple to its
+ * requestor node.  The requestee can call this from its ExecAsyncRequest
+ * callback if the requested tuple is available immediately.
+ */
+void
+ExecAsyncRequestDone(AsyncRequest *areq, TupleTableSlot *result)
+{
+	areq->request_complete = true;
+	areq->result = result;
+}
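
To illustrate the contract described in ExecAsyncConfigureWait() above, here is
a minimal, hypothetical sketch of an FDW's ForeignAsyncConfigureWait callback;
it is not part of this patch.  "MyFdwScanState" and its "conn" field are
assumed names for the FDW's private per-scan state, and the sketch assumes the
requestor is the AppendState whose as_eventset is being populated by
ExecAppendAsyncEventWait() in nodeAppend.c.

    #include "postgres.h"

    #include "libpq-fe.h"
    #include "nodes/execnodes.h"
    #include "storage/latch.h"

    /* Hypothetical per-scan state; a real FDW keeps its own bookkeeping here. */
    typedef struct MyFdwScanState
    {
        PGconn     *conn;       /* connection the asynchronous query was sent on */
    } MyFdwScanState;

    static void
    myfdwForeignAsyncConfigureWait(AsyncRequest *areq)
    {
        ForeignScanState *node = (ForeignScanState *) areq->requestee;
        MyFdwScanState *fsstate = (MyFdwScanState *) node->fdw_state;
        AppendState *requestor = (AppendState *) areq->requestor;
        WaitEventSet *set = requestor->as_eventset;

        /* The single AddWaitEventToSet() call expected by the caller. */
        AddWaitEventToSet(set, WL_SOCKET_READABLE, PQsocket(fsstate->conn),
                          NULL, areq);
    }

Passing the AsyncRequest as user_data is what lets ExecAppendAsyncEventWait()
map a fired socket event back to the subplan that registered it.
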
diff --git a/src/backend/executor/nodeAppend.c b/src/backend/executor/nodeAppend.c
index 15e4115bd6..123d5163de 100644
--- a/src/backend/executor/nodeAppend.c
+++ b/src/backend/executor/nodeAppend.c
@@ -57,10 +57,13 @@
 
 #include "postgres.h"
 
+#include "executor/execAsync.h"
 #include "executor/execdebug.h"
 #include "executor/execPartition.h"
 #include "executor/nodeAppend.h"
 #include "miscadmin.h"
+#include "pgstat.h"
+#include "storage/latch.h"
 
 /* Shared state for parallel-aware Append. */
 struct ParallelAppendState
@@ -78,12 +81,18 @@ struct ParallelAppendState
 };
 
 #define INVALID_SUBPLAN_INDEX		-1
+#define EVENT_BUFFER_SIZE			16
 
 static TupleTableSlot *ExecAppend(PlanState *pstate);
 static bool choose_next_subplan_locally(AppendState *node);
 static bool choose_next_subplan_for_leader(AppendState *node);
 static bool choose_next_subplan_for_worker(AppendState *node);
 static void mark_invalid_subplans_as_finished(AppendState *node);
+static void ExecAppendAsyncBegin(AppendState *node);
+static bool ExecAppendAsyncGetNext(AppendState *node, TupleTableSlot **result);
+static bool ExecAppendAsyncRequest(AppendState *node, TupleTableSlot **result);
+static void ExecAppendAsyncEventWait(AppendState *node);
+static void classify_matching_subplans(AppendState *node);
 
 /* ----------------------------------------------------------------
  *		ExecInitAppend
@@ -102,7 +111,9 @@ ExecInitAppend(Append *node, EState *estate, int eflags)
 	AppendState *appendstate = makeNode(AppendState);
 	PlanState **appendplanstates;
 	Bitmapset  *validsubplans;
+	Bitmapset  *asyncplans;
 	int			nplans;
+	int			nasyncplans;
 	int			firstvalid;
 	int			i,
 				j;
@@ -119,6 +130,7 @@ ExecInitAppend(Append *node, EState *estate, int eflags)
 
 	/* Let choose_next_subplan_* function handle setting the first subplan */
 	appendstate->as_whichplan = INVALID_SUBPLAN_INDEX;
+	appendstate->as_syncdone = false;
 
 	/* If run-time partition pruning is enabled, then set that up now */
 	if (node->part_prune_info != NULL)
@@ -191,12 +203,24 @@ ExecInitAppend(Append *node, EState *estate, int eflags)
 	 * While at it, find out the first valid partial plan.
 	 */
 	j = 0;
+	asyncplans = NULL;
+	nasyncplans = 0;
 	firstvalid = nplans;
 	i = -1;
 	while ((i = bms_next_member(validsubplans, i)) >= 0)
 	{
 		Plan	   *initNode = (Plan *) list_nth(node->appendplans, i);
 
+		/*
+		 * Record async subplans.  When executing EvalPlanQual, we process
+		 * async subplans synchronously, so don't do this in that case.
+		 */
+		if (initNode->async_capable && estate->es_epq_active == NULL)
+		{
+			asyncplans = bms_add_member(asyncplans, j);
+			nasyncplans++;
+		}
+
 		/*
 		 * Record the lowest appendplans index which is a valid partial plan.
 		 */
@@ -210,6 +234,39 @@ ExecInitAppend(Append *node, EState *estate, int eflags)
 	appendstate->appendplans = appendplanstates;
 	appendstate->as_nplans = nplans;
 
+	/* Initialize async state */
+	appendstate->as_asyncplans = asyncplans;
+	appendstate->as_nasyncplans = nasyncplans;
+	appendstate->as_asyncrequests = NULL;
+	appendstate->as_asyncresults = (TupleTableSlot **)
+		palloc0(nasyncplans * sizeof(TupleTableSlot *));
+	appendstate->as_needrequest = NULL;
+	appendstate->as_eventset = NULL;
+
+	if (nasyncplans > 0)
+	{
+		appendstate->as_asyncrequests = (AsyncRequest **)
+			palloc0(nplans * sizeof(AsyncRequest *));
+
+		i = -1;
+		while ((i = bms_next_member(asyncplans, i)) >= 0)
+		{
+			AsyncRequest *areq;
+
+			areq = palloc(sizeof(AsyncRequest));
+			areq->requestor = (PlanState *) appendstate;
+			areq->requestee = appendplanstates[i];
+			areq->request_index = i;
+			areq->callback_pending = false;
+			areq->request_complete = false;
+			areq->result = NULL;
+
+			appendstate->as_asyncrequests[i] = areq;
+		}
+
+		classify_matching_subplans(appendstate);
+	}
+
 	/*
 	 * Miscellaneous initialization
 	 */
@@ -232,31 +289,45 @@ static TupleTableSlot *
 ExecAppend(PlanState *pstate)
 {
 	AppendState *node = castNode(AppendState, pstate);
+	TupleTableSlot *result;
 
-	if (node->as_whichplan < 0)
+	if (!node->as_syncdone && node->as_whichplan == INVALID_SUBPLAN_INDEX)
 	{
 		/* Nothing to do if there are no subplans */
 		if (node->as_nplans == 0)
 			return ExecClearTuple(node->ps.ps_ResultTupleSlot);
 
+		/* If there are any async subplans, begin execution of them */
+		if (node->as_nasyncplans > 0)
+			ExecAppendAsyncBegin(node);
+
 		/*
-		 * If no subplan has been chosen, we must choose one before
+		 * If no sync subplan has been chosen, we must choose one before
 		 * proceeding.
 		 */
-		if (node->as_whichplan == INVALID_SUBPLAN_INDEX &&
-			!node->choose_next_subplan(node))
+		if (!node->choose_next_subplan(node) && node->as_nasyncremain == 0)
 			return ExecClearTuple(node->ps.ps_ResultTupleSlot);
 	}
 
 	for (;;)
 	{
 		PlanState  *subnode;
-		TupleTableSlot *result;
 
 		CHECK_FOR_INTERRUPTS();
 
 		/*
-		 * figure out which subplan we are currently processing
+		 * try to get a tuple from any of the async subplans
+		 */
+		if (!bms_is_empty(node->as_needrequest) ||
+			(node->as_syncdone && node->as_nasyncremain > 0))
+		{
+			if (ExecAppendAsyncGetNext(node, &result))
+				return result;
+			Assert(bms_is_empty(node->as_needrequest));
+		}
+
+		/*
+		 * figure out which sync subplan we are currently processing
 		 */
 		Assert(node->as_whichplan >= 0 && node->as_whichplan < node->as_nplans);
 		subnode = node->appendplans[node->as_whichplan];
@@ -276,8 +347,16 @@ ExecAppend(PlanState *pstate)
 			return result;
 		}
 
-		/* choose new subplan; if none, we're done */
-		if (!node->choose_next_subplan(node))
+		/* wait or poll async events */
+		if (node->as_nasyncremain > 0)
+		{
+			Assert(!node->as_syncdone);
+			Assert(bms_is_empty(node->as_needrequest));
+			ExecAppendAsyncEventWait(node);
+		}
+
+		/* choose new sync subplan; if no sync/async subplans, we're done */
+		if (!node->choose_next_subplan(node) && node->as_nasyncremain == 0)
 			return ExecClearTuple(node->ps.ps_ResultTupleSlot);
 	}
 }
@@ -313,6 +392,7 @@ ExecEndAppend(AppendState *node)
 void
 ExecReScanAppend(AppendState *node)
 {
+	int			nasyncplans = node->as_nasyncplans;
 	int			i;
 
 	/*
@@ -326,6 +406,11 @@ ExecReScanAppend(AppendState *node)
 	{
 		bms_free(node->as_valid_subplans);
 		node->as_valid_subplans = NULL;
+		if (nasyncplans > 0)
+		{
+			bms_free(node->as_valid_asyncplans);
+			node->as_valid_asyncplans = NULL;
+		}
 	}
 
 	for (i = 0; i < node->as_nplans; i++)
@@ -347,8 +432,26 @@ ExecReScanAppend(AppendState *node)
 			ExecReScan(subnode);
 	}
 
+	/* Reset async state */
+	if (nasyncplans > 0)
+	{
+		i = -1;
+		while ((i = bms_next_member(node->as_asyncplans, i)) >= 0)
+		{
+			AsyncRequest *areq = node->as_asyncrequests[i];
+
+			areq->callback_pending = false;
+			areq->request_complete = false;
+			areq->result = NULL;
+		}
+
+		bms_free(node->as_needrequest);
+		node->as_needrequest = NULL;
+	}
+
 	/* Let choose_next_subplan_* function handle setting the first subplan */
 	node->as_whichplan = INVALID_SUBPLAN_INDEX;
+	node->as_syncdone = false;
 }
 
 /* ----------------------------------------------------------------
@@ -429,7 +532,7 @@ ExecAppendInitializeWorker(AppendState *node, ParallelWorkerContext *pwcxt)
 /* ----------------------------------------------------------------
  *		choose_next_subplan_locally
  *
- *		Choose next subplan for a non-parallel-aware Append,
+ *		Choose next sync subplan for a non-parallel-aware Append,
  *		returning false if there are no more.
  * ----------------------------------------------------------------
  */
@@ -444,9 +547,9 @@ choose_next_subplan_locally(AppendState *node)
 
 	/*
 	 * If first call then have the bms member function choose the first valid
-	 * subplan by initializing whichplan to -1.  If there happen to be no
-	 * valid subplans then the bms member function will handle that by
-	 * returning a negative number which will allow us to exit returning a
+	 * sync subplan by initializing whichplan to -1.  If there happen to be
+	 * no valid sync subplans then the bms member function will handle that
+	 * by returning a negative number which will allow us to exit returning a
 	 * false value.
 	 */
 	if (whichplan == INVALID_SUBPLAN_INDEX)
@@ -467,7 +570,10 @@ choose_next_subplan_locally(AppendState *node)
 		nextplan = bms_prev_member(node->as_valid_subplans, whichplan);
 
 	if (nextplan < 0)
+	{
+		node->as_syncdone = true;
 		return false;
+	}
 
 	node->as_whichplan = nextplan;
 
@@ -709,3 +815,298 @@ mark_invalid_subplans_as_finished(AppendState *node)
 			node->as_pstate->pa_finished[i] = true;
 	}
 }
+
+/* ----------------------------------------------------------------
+ *		ExecAppendAsyncBegin
+ *
+ *		Begin execution of designated async-capable subplans.
+ * ----------------------------------------------------------------
+ */
+static void
+ExecAppendAsyncBegin(AppendState *node)
+{
+	Bitmapset  *valid_asyncplans;
+	int			i;
+
+	/* We should never be called when there are no async subplans. */
+	Assert(node->as_nasyncplans > 0);
+
+	if (node->as_valid_subplans == NULL)
+	{
+		Assert(node->as_valid_asyncplans == NULL);
+
+		node->as_valid_subplans =
+			ExecFindMatchingSubPlans(node->as_prune_state);
+
+		classify_matching_subplans(node);
+	}
+
+	node->as_nasyncremain = 0;
+
+	/* Nothing to do if there are no valid async subplans. */
+	valid_asyncplans = node->as_valid_asyncplans;
+	if (valid_asyncplans == NULL)
+		return;
+
+	/* Make a request for each of the async subplans. */
+	i = -1;
+	while ((i = bms_next_member(valid_asyncplans, i)) >= 0)
+	{
+		AsyncRequest *areq = node->as_asyncrequests[i];
+
+		Assert(areq->request_index == i);
+		Assert(!areq->callback_pending);
+
+		/* Do the actual work. */
+		ExecAsyncRequest(areq);
+
+		++node->as_nasyncremain;
+	}
+}
+
+/* ----------------------------------------------------------------
+ *		ExecAppendAsyncGetNext
+ *
+ *		Get the next tuple from any of the asynchronous subplans.
+ * ----------------------------------------------------------------
+ */
+static bool
+ExecAppendAsyncGetNext(AppendState *node, TupleTableSlot **result)
+{
+	*result = NULL;
+
+	/* Make new async requests. */
+	if (ExecAppendAsyncRequest(node, result))
+		return true;
+
+	while (node->as_nasyncremain > 0)
+	{
+		CHECK_FOR_INTERRUPTS();
+
+		/* Wait or poll async events. */
+		ExecAppendAsyncEventWait(node);
+
+		/* Make new async requests. */
+		if (ExecAppendAsyncRequest(node, result))
+			return true;
+
+		/* Break from the loop if not all sync subplans are complete */
+		if (!node->as_syncdone)
+			break;
+	}
+
+	/*
+	 * If all sync subplans are complete, we're totally done scanning the
+	 * given node.  Otherwise, we're done with the asynchronous stuff but
+	 * must continue scanning the sync subplans.
+	 */
+	if (node->as_syncdone)
+	{
+		Assert(node->as_nasyncremain == 0);
+		*result = ExecClearTuple(node->ps.ps_ResultTupleSlot);
+		return true;
+	}
+
+	return false;
+}
+
+/* ----------------------------------------------------------------
+ *		ExecAppendAsyncRequest
+ *
+ *		If there are any asynchronous subplans that need a new asynchronous
+ *		request, make the requests for all of them.
+ * ----------------------------------------------------------------
+ */
+static bool
+ExecAppendAsyncRequest(AppendState *node, TupleTableSlot **result)
+{
+	Bitmapset  *needrequest;
+	int			i;
+
+	/* Nothing to do if there are no async subplans needing a new request. */
+	if (bms_is_empty(node->as_needrequest))
+		return false;
+
+	/*
+	 * If there are any asynchronously-generated results that have not yet
+	 * been returned, don't make new requests yet; just return one of them.
+	 */
+	if (node->as_nasyncresults > 0)
+	{
+		--node->as_nasyncresults;
+		*result = node->as_asyncresults[node->as_nasyncresults];
+		return true;
+	}
+
+	/* Make a new request for each of the async subplans that need it. */
+	needrequest = node->as_needrequest;
+	node->as_needrequest = NULL;
+	i = -1;
+	while ((i = bms_next_member(needrequest, i)) >= 0)
+	{
+		AsyncRequest *areq = node->as_asyncrequests[i];
+
+		/* Do the actual work. */
+		ExecAsyncRequest(areq);
+	}
+	bms_free(needrequest);
+
+	/* Return one of the asynchronously-generated results if any. */
+	if (node->as_nasyncresults > 0)
+	{
+		--node->as_nasyncresults;
+		*result = node->as_asyncresults[node->as_nasyncresults];
+		return true;
+	}
+
+	return false;
+}
+
+/* ----------------------------------------------------------------
+ *		ExecAppendAsyncEventWait
+ *
+ *		Wait or poll for file descriptor wait events and fire callbacks.
+ * ----------------------------------------------------------------
+ */
+static void
+ExecAppendAsyncEventWait(AppendState *node)
+{
+	long		timeout = node->as_syncdone ? -1 : 0;
+	WaitEvent   occurred_event[EVENT_BUFFER_SIZE];
+	int			noccurred;
+	int			i;
+
+	/* Nothing to do if there are no remaining async subplans. */
+	if (node->as_nasyncremain == 0)
+		return;
+
+	node->as_eventset = CreateWaitEventSet(CurrentMemoryContext,
+										   node->as_nasyncplans + 1);
+	AddWaitEventToSet(node->as_eventset, WL_EXIT_ON_PM_DEATH, PGINVALID_SOCKET,
+					  NULL, NULL);
+
+	/* Give each waiting subplan a chance to add an event. */
+	i = -1;
+	while ((i = bms_next_member(node->as_asyncplans, i)) >= 0)
+	{
+		AsyncRequest *areq = node->as_asyncrequests[i];
+
+		if (areq->callback_pending)
+			ExecAsyncConfigureWait(areq);
+	}
+
+	/* Wait for at least one event to occur. */
+	noccurred = WaitEventSetWait(node->as_eventset, timeout, occurred_event,
+								 EVENT_BUFFER_SIZE, WAIT_EVENT_APPEND_READY);
+	FreeWaitEventSet(node->as_eventset);
+	node->as_eventset = NULL;
+	if (noccurred == 0)
+		return;
+
+	/* Deliver notifications. */
+	for (i = 0; i < noccurred; i++)
+	{
+		WaitEvent  *w = &occurred_event[i];
+
+		/*
+		 * Each waiting subplan should have registered its wait event with
+		 * user_data pointing back to its AsyncRequest.
+		 */
+		if ((w->events & WL_SOCKET_READABLE) != 0)
+		{
+			AsyncRequest *areq = (AsyncRequest *) w->user_data;
+
+			/*
+			 * Mark it as no longer needing a callback.  We must do this
+			 * before dispatching the callback in case the callback resets
+			 * the flag.
+			 */
+			Assert(areq->callback_pending);
+			areq->callback_pending = false;
+
+			/* Do the actual work. */
+			ExecAsyncNotify(areq);
+		}
+	}
+}
+
+/* ----------------------------------------------------------------
+ *		ExecAsyncAppendResponse
+ *
+ *		Receive a response from an asynchronous request we made.
+ * ----------------------------------------------------------------
+ */
+void
+ExecAsyncAppendResponse(AsyncRequest *areq)
+{
+	AppendState *node = (AppendState *) areq->requestor;
+	TupleTableSlot *slot = areq->result;
+
+	/* The result should be a TupleTableSlot or NULL. */
+	Assert(slot == NULL || IsA(slot, TupleTableSlot));
+
+	/* Nothing to do if the request is pending. */
+	if (!areq->request_complete)
+	{
+		/*
+		 * The subplan for which the request was made should still be awaiting
+		 * a callback.
+		 */
+		Assert(areq->callback_pending);
+		return;
+	}
+
+	/* If the result is NULL or an empty slot, there's nothing more to do. */
+	if (TupIsNull(slot))
+	{
+		/* A finished subplan should not be awaiting a callback. */
+		Assert(!areq->callback_pending);
+		--node->as_nasyncremain;
+		return;
+	}
+
+	/* Save result so we can return it */
+	Assert(node->as_nasyncresults < node->as_nasyncplans);
+	node->as_asyncresults[node->as_nasyncresults++] = slot;
+
+	/*
+	 * Mark the subplan that returned a result as ready for a new request.  We
+	 * don't launch another one here immediately because it might complete.
+	 */
+	node->as_needrequest = bms_add_member(node->as_needrequest,
+										  areq->request_index);
+}
+
+/* ----------------------------------------------------------------
+ *		classify_matching_subplans
+ *
+ *		Classify the node's as_valid_subplans into sync ones and
+ *		async ones, adjust it to contain sync ones only, and save
+ *		async ones in the node's as_valid_asyncplans
+ * ----------------------------------------------------------------
+ */
+static void
+classify_matching_subplans(AppendState *node)
+{
+	Bitmapset  *valid_asyncplans;
+
+	/* Nothing to do if there are no valid subplans. */
+	if (bms_is_empty(node->as_valid_subplans))
+		return;
+
+	/* Nothing to do if there are no valid async subplans. */
+	if (!bms_overlap(node->as_valid_subplans, node->as_asyncplans))
+		return;
+
+	/* Get valid async subplans. */
+	valid_asyncplans = bms_copy(node->as_asyncplans);
+	valid_asyncplans = bms_int_members(valid_asyncplans,
+									   node->as_valid_subplans);
+
+	/* Adjust the valid subplans to contain sync subplans only. */
+	node->as_valid_subplans = bms_del_members(node->as_valid_subplans,
+											  valid_asyncplans);
+
+	/* Save valid async subplans. */
+	node->as_valid_asyncplans = valid_asyncplans;
+}
diff --git a/src/backend/executor/nodeForeignscan.c b/src/backend/executor/nodeForeignscan.c
index 0969e53c3a..898890fb08 100644
--- a/src/backend/executor/nodeForeignscan.c
+++ b/src/backend/executor/nodeForeignscan.c
@@ -391,3 +391,51 @@ ExecShutdownForeignScan(ForeignScanState *node)
 	if (fdwroutine->ShutdownForeignScan)
 		fdwroutine->ShutdownForeignScan(node);
 }
+
+/* ----------------------------------------------------------------
+ *		ExecAsyncForeignScanRequest
+ *
+ *		Asynchronously request a tuple from a designated async-capable node
+ * ----------------------------------------------------------------
+ */
+void
+ExecAsyncForeignScanRequest(AsyncRequest *areq)
+{
+	ForeignScanState *node = (ForeignScanState *) areq->requestee;
+	FdwRoutine *fdwroutine = node->fdwroutine;
+
+	Assert(fdwroutine->ForeignAsyncRequest != NULL);
+	fdwroutine->ForeignAsyncRequest(areq);
+}
+
+/* ----------------------------------------------------------------
+ *		ExecAsyncForeignScanConfigureWait
+ *
+ *		In async mode, configure for a wait
+ * ----------------------------------------------------------------
+ */
+void
+ExecAsyncForeignScanConfigureWait(AsyncRequest *areq)
+{
+	ForeignScanState *node = (ForeignScanState *) areq->requestee;
+	FdwRoutine *fdwroutine = node->fdwroutine;
+
+	Assert(fdwroutine->ForeignAsyncConfigureWait != NULL);
+	fdwroutine->ForeignAsyncConfigureWait(areq);
+}
+
+/* ----------------------------------------------------------------
+ *		ExecAsyncForeignScanNotify
+ *
+ *		Callback invoked when a relevant event has occurred
+ * ----------------------------------------------------------------
+ */
+void
+ExecAsyncForeignScanNotify(AsyncRequest *areq)
+{
+	ForeignScanState *node = (ForeignScanState *) areq->requestee;
+	FdwRoutine *fdwroutine = node->fdwroutine;
+
+	Assert(fdwroutine->ForeignAsyncNotify != NULL);
+	fdwroutine->ForeignAsyncNotify(areq);
+}
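
The wrappers above simply forward to the corresponding FDW callbacks, so the
shape of those callbacks determines how the protocol plays out.  Below is a
hypothetical sketch of a ForeignAsyncRequest callback, not part of this patch;
"MyFdwScanState" and its "next_slot" field are assumed names used only for
illustration.  The callback either completes the request immediately with
ExecAsyncRequestDone(), or leaves it pending so that the executor waits on the
event configured by the ConfigureWait callback and later calls back through
ExecAsyncNotify().

    #include "postgres.h"

    #include "executor/execAsync.h"
    #include "nodes/execnodes.h"

    /* Hypothetical per-scan state holding a tuple fetched ahead of time. */
    typedef struct MyFdwScanState
    {
        TupleTableSlot *next_slot;  /* already-fetched tuple, or NULL */
    } MyFdwScanState;

    static void
    myfdwForeignAsyncRequest(AsyncRequest *areq)
    {
        ForeignScanState *node = (ForeignScanState *) areq->requestee;
        MyFdwScanState *fsstate = (MyFdwScanState *) node->fdw_state;

        if (fsstate->next_slot != NULL)
        {
            /* A tuple is available right away: deliver it to the requestor. */
            ExecAsyncRequestDone(areq, fsstate->next_slot);
            fsstate->next_slot = NULL;
            return;
        }

        /*
         * No tuple yet.  (A real FDW would start or continue an asynchronous
         * remote fetch here.)  Leave the request incomplete and ask to be
         * called back once the wait event registered by the ConfigureWait
         * callback fires.
         */
        areq->callback_pending = true;
        areq->request_complete = false;
    }
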
diff --git a/src/backend/nodes/copyfuncs.c b/src/backend/nodes/copyfuncs.c
index aaba1ec2c4..38aa9b5a85 100644
--- a/src/backend/nodes/copyfuncs.c
+++ b/src/backend/nodes/copyfuncs.c
@@ -120,6 +120,7 @@ CopyPlanFields(const Plan *from, Plan *newnode)
 	COPY_SCALAR_FIELD(plan_width);
 	COPY_SCALAR_FIELD(parallel_aware);
 	COPY_SCALAR_FIELD(parallel_safe);
+	COPY_SCALAR_FIELD(async_capable);
 	COPY_SCALAR_FIELD(plan_node_id);
 	COPY_NODE_FIELD(targetlist);
 	COPY_NODE_FIELD(qual);
@@ -241,6 +242,7 @@ _copyAppend(const Append *from)
 	 */
 	COPY_BITMAPSET_FIELD(apprelids);
 	COPY_NODE_FIELD(appendplans);
+	COPY_SCALAR_FIELD(nasyncplans);
 	COPY_SCALAR_FIELD(first_partial_plan);
 	COPY_NODE_FIELD(part_prune_info);
 
diff --git a/src/backend/nodes/outfuncs.c b/src/backend/nodes/outfuncs.c
index 8fc432bfe1..a4bffb8e88 100644
--- a/src/backend/nodes/outfuncs.c
+++ b/src/backend/nodes/outfuncs.c
@@ -333,6 +333,7 @@ _outPlanInfo(StringInfo str, const Plan *node)
 	WRITE_INT_FIELD(plan_width);
 	WRITE_BOOL_FIELD(parallel_aware);
 	WRITE_BOOL_FIELD(parallel_safe);
+	WRITE_BOOL_FIELD(async_capable);
 	WRITE_INT_FIELD(plan_node_id);
 	WRITE_NODE_FIELD(targetlist);
 	WRITE_NODE_FIELD(qual);
@@ -431,6 +432,7 @@ _outAppend(StringInfo str, const Append *node)
 
 	WRITE_BITMAPSET_FIELD(apprelids);
 	WRITE_NODE_FIELD(appendplans);
+	WRITE_INT_FIELD(nasyncplans);
 	WRITE_INT_FIELD(first_partial_plan);
 	WRITE_NODE_FIELD(part_prune_info);
 }
diff --git a/src/backend/nodes/readfuncs.c b/src/backend/nodes/readfuncs.c
index 718fb58e86..03d01eea3e 100644
--- a/src/backend/nodes/readfuncs.c
+++ b/src/backend/nodes/readfuncs.c
@@ -1614,6 +1614,7 @@ ReadCommonPlan(Plan *local_node)
 	READ_INT_FIELD(plan_width);
 	READ_BOOL_FIELD(parallel_aware);
 	READ_BOOL_FIELD(parallel_safe);
+	READ_BOOL_FIELD(async_capable);
 	READ_INT_FIELD(plan_node_id);
 	READ_NODE_FIELD(targetlist);
 	READ_NODE_FIELD(qual);
@@ -1710,6 +1711,7 @@ _readAppend(void)
 
 	READ_BITMAPSET_FIELD(apprelids);
 	READ_NODE_FIELD(appendplans);
+	READ_INT_FIELD(nasyncplans);
 	READ_INT_FIELD(first_partial_plan);
 	READ_NODE_FIELD(part_prune_info);
 
diff --git a/src/backend/optimizer/path/costsize.c b/src/backend/optimizer/path/costsize.c
index a25b674a19..f3100f7540 100644
--- a/src/backend/optimizer/path/costsize.c
+++ b/src/backend/optimizer/path/costsize.c
@@ -147,6 +147,7 @@ bool		enable_partitionwise_aggregate = false;
 bool		enable_parallel_append = true;
 bool		enable_parallel_hash = true;
 bool		enable_partition_pruning = true;
+bool		enable_async_append = true;
 
 typedef struct
 {
diff --git a/src/backend/optimizer/plan/createplan.c b/src/backend/optimizer/plan/createplan.c
index 906cab7053..06774a9ec3 100644
--- a/src/backend/optimizer/plan/createplan.c
+++ b/src/backend/optimizer/plan/createplan.c
@@ -81,6 +81,7 @@ static List *get_gating_quals(PlannerInfo *root, List *quals);
 static Plan *create_gating_plan(PlannerInfo *root, Path *path, Plan *plan,
 								List *gating_quals);
 static Plan *create_join_plan(PlannerInfo *root, JoinPath *best_path);
+static bool is_async_capable_path(Path *path);
 static Plan *create_append_plan(PlannerInfo *root, AppendPath *best_path,
 								int flags);
 static Plan *create_merge_append_plan(PlannerInfo *root, MergeAppendPath *best_path,
@@ -1080,6 +1081,30 @@ create_join_plan(PlannerInfo *root, JoinPath *best_path)
 	return plan;
 }
 
+/*
+ * is_async_capable_path
+ *		Check whether a given Path node is async-capable.
+ */
+static bool
+is_async_capable_path(Path *path)
+{
+	switch (nodeTag(path))
+	{
+		case T_ForeignPath:
+			{
+				FdwRoutine *fdwroutine = path->parent->fdwroutine;
+
+				Assert(fdwroutine != NULL);
+				if (fdwroutine->IsForeignPathAsyncCapable != NULL &&
+					fdwroutine->IsForeignPathAsyncCapable((ForeignPath *) path))
+					return true;
+			}
+		default:
+			break;
+	}
+	return false;
+}
+
 /*
  * create_append_plan
  *	  Create an Append plan for 'best_path' and (recursively) plans
@@ -1097,6 +1122,7 @@ create_append_plan(PlannerInfo *root, AppendPath *best_path, int flags)
 	List	   *pathkeys = best_path->path.pathkeys;
 	List	   *subplans = NIL;
 	ListCell   *subpaths;
+	int			nasyncplans = 0;
 	RelOptInfo *rel = best_path->path.parent;
 	PartitionPruneInfo *partpruneinfo = NULL;
 	int			nodenumsortkeys = 0;
@@ -1104,6 +1130,7 @@ create_append_plan(PlannerInfo *root, AppendPath *best_path, int flags)
 	Oid		   *nodeSortOperators = NULL;
 	Oid		   *nodeCollations = NULL;
 	bool	   *nodeNullsFirst = NULL;
+	bool		consider_async = false;
 
 	/*
 	 * The subpaths list could be empty, if every child was proven empty by
@@ -1167,6 +1194,11 @@ create_append_plan(PlannerInfo *root, AppendPath *best_path, int flags)
 		tlist_was_changed = (orig_tlist_length != list_length(plan->plan.targetlist));
 	}
 
+	/* If appropriate, consider async append */
+	consider_async = (enable_async_append && pathkeys == NIL &&
+					  !best_path->path.parallel_safe &&
+					  list_length(best_path->subpaths) > 1);
+
 	/* Build the plan for each child */
 	foreach(subpaths, best_path->subpaths)
 	{
@@ -1234,6 +1266,13 @@ create_append_plan(PlannerInfo *root, AppendPath *best_path, int flags)
 		}
 
 		subplans = lappend(subplans, subplan);
+
+		/* Check to see if subplan can be executed asynchronously */
+		if (consider_async && is_async_capable_path(subpath))
+		{
+			subplan->async_capable = true;
+			++nasyncplans;
+		}
 	}
 
 	/*
@@ -1266,6 +1305,7 @@ create_append_plan(PlannerInfo *root, AppendPath *best_path, int flags)
 	}
 
 	plan->appendplans = subplans;
+	plan->nasyncplans = nasyncplans;
 	plan->first_partial_plan = best_path->first_partial_path;
 	plan->part_prune_info = partpruneinfo;
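
is_async_capable_path() leaves the capability decision itself to the FDW's
IsForeignPathAsyncCapable callback.  A deliberately permissive, hypothetical
sketch of such a callback is shown below; it is not part of this patch, and a
real FDW (such as postgres_fdw with its async_capable option) would base the
answer on its configuration rather than always saying yes.

    #include "postgres.h"

    #include "nodes/pathnodes.h"

    static bool
    myfdwIsForeignPathAsyncCapable(ForeignPath *path)
    {
        /*
         * Claim async capability for every foreign path.  A real FDW would
         * inspect its options for the underlying table or server first.
         */
        return true;
    }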
 
diff --git a/src/backend/postmaster/pgstat.c b/src/backend/postmaster/pgstat.c
index f75b52719d..58f8e0bbcf 100644
--- a/src/backend/postmaster/pgstat.c
+++ b/src/backend/postmaster/pgstat.c
@@ -3999,6 +3999,9 @@ pgstat_get_wait_ipc(WaitEventIPC w)
 
 	switch (w)
 	{
+		case WAIT_EVENT_APPEND_READY:
+			event_name = "AppendReady";
+			break;
 		case WAIT_EVENT_BACKUP_WAIT_WAL_ARCHIVE:
 			event_name = "BackupWaitWalArchive";
 			break;
diff --git a/src/backend/storage/ipc/latch.c b/src/backend/storage/ipc/latch.c
index 43a5fded10..5f3318fa8f 100644
--- a/src/backend/storage/ipc/latch.c
+++ b/src/backend/storage/ipc/latch.c
@@ -2020,6 +2020,15 @@ WaitEventSetWaitBlock(WaitEventSet *set, int cur_timeout,
 }
 #endif
 
+/*
+ * Get the number of wait events registered in a given WaitEventSet.
+ */
+int
+GetNumRegisteredWaitEvents(WaitEventSet *set)
+{
+	return set->nevents;
+}
+
 #if defined(WAIT_USE_POLL)
 
 /*
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index 3fd1a5fbe2..07433aab83 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -1111,6 +1111,16 @@ static struct config_bool ConfigureNamesBool[] =
 		true,
 		NULL, NULL, NULL
 	},
+	{
+		{"enable_async_append", PGC_USERSET, QUERY_TUNING_METHOD,
+			gettext_noop("Enables the planner's use of async append plans."),
+			NULL,
+			GUC_EXPLAIN
+		},
+		&enable_async_append,
+		true,
+		NULL, NULL, NULL
+	},
 	{
 		{"geqo", PGC_USERSET, QUERY_TUNING_GEQO,
 			gettext_noop("Enables genetic query optimization."),
diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample
index ee06528bb0..740e4698a1 100644
--- a/src/backend/utils/misc/postgresql.conf.sample
+++ b/src/backend/utils/misc/postgresql.conf.sample
@@ -371,6 +371,7 @@
 #enable_partitionwise_aggregate = off
 #enable_parallel_hash = on
 #enable_partition_pruning = on
+#enable_async_append = on
 
 # - Planner Cost Constants -
 
diff --git a/src/include/executor/execAsync.h b/src/include/executor/execAsync.h
index e69de29bb2..93e8749476 100644
--- a/src/include/executor/execAsync.h
+++ b/src/include/executor/execAsync.h
@@ -0,0 +1,24 @@
+/*-------------------------------------------------------------------------
+ * execAsync.h
+ *		Support functions for asynchronous execution
+ *
+ * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * IDENTIFICATION
+ *		src/include/executor/execAsync.h
+ *-------------------------------------------------------------------------
+ */
+
+#ifndef EXECASYNC_H
+#define EXECASYNC_H
+
+#include "nodes/execnodes.h"
+
+extern void ExecAsyncRequest(AsyncRequest *areq);
+extern void ExecAsyncConfigureWait(AsyncRequest *areq);
+extern void ExecAsyncNotify(AsyncRequest *areq);
+extern void ExecAsyncResponse(AsyncRequest *areq);
+extern void ExecAsyncRequestDone(AsyncRequest *areq, TupleTableSlot *result);
+
+#endif   /* EXECASYNC_H */
diff --git a/src/include/executor/nodeAppend.h b/src/include/executor/nodeAppend.h
index cafd410a5d..fa54ac6ad2 100644
--- a/src/include/executor/nodeAppend.h
+++ b/src/include/executor/nodeAppend.h
@@ -25,4 +25,6 @@ extern void ExecAppendInitializeDSM(AppendState *node, ParallelContext *pcxt);
 extern void ExecAppendReInitializeDSM(AppendState *node, ParallelContext *pcxt);
 extern void ExecAppendInitializeWorker(AppendState *node, ParallelWorkerContext *pwcxt);
 
+extern void ExecAsyncAppendResponse(AsyncRequest *areq);
+
 #endif							/* NODEAPPEND_H */
diff --git a/src/include/executor/nodeForeignscan.h b/src/include/executor/nodeForeignscan.h
index 6ae7733e25..8ffc0ca5bf 100644
--- a/src/include/executor/nodeForeignscan.h
+++ b/src/include/executor/nodeForeignscan.h
@@ -31,4 +31,8 @@ extern void ExecForeignScanInitializeWorker(ForeignScanState *node,
 											ParallelWorkerContext *pwcxt);
 extern void ExecShutdownForeignScan(ForeignScanState *node);
 
+extern void ExecAsyncForeignScanRequest(AsyncRequest *areq);
+extern void ExecAsyncForeignScanConfigureWait(AsyncRequest *areq);
+extern void ExecAsyncForeignScanNotify(AsyncRequest *areq);
+
 #endif							/* NODEFOREIGNSCAN_H */
diff --git a/src/include/foreign/fdwapi.h b/src/include/foreign/fdwapi.h
index 248f78da45..7c89d081c7 100644
--- a/src/include/foreign/fdwapi.h
+++ b/src/include/foreign/fdwapi.h
@@ -178,6 +178,14 @@ typedef List *(*ReparameterizeForeignPathByChild_function) (PlannerInfo *root,
 															List *fdw_private,
 															RelOptInfo *child_rel);
 
+typedef bool (*IsForeignPathAsyncCapable_function) (ForeignPath *path);
+
+typedef void (*ForeignAsyncRequest_function) (AsyncRequest *areq);
+
+typedef void (*ForeignAsyncConfigureWait_function) (AsyncRequest *areq);
+
+typedef void (*ForeignAsyncNotify_function) (AsyncRequest *areq);
+
 /*
  * FdwRoutine is the struct returned by a foreign-data wrapper's handler
  * function.  It provides pointers to the callback functions needed by the
@@ -256,6 +264,12 @@ typedef struct FdwRoutine
 
 	/* Support functions for path reparameterization. */
 	ReparameterizeForeignPathByChild_function ReparameterizeForeignPathByChild;
+
+	/* Support functions for asynchronous execution */
+	IsForeignPathAsyncCapable_function IsForeignPathAsyncCapable;
+	ForeignAsyncRequest_function ForeignAsyncRequest;
+	ForeignAsyncConfigureWait_function ForeignAsyncConfigureWait;
+	ForeignAsyncNotify_function ForeignAsyncNotify;
 } FdwRoutine;
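
The four new FdwRoutine members are optional.  An FDW that wants to participate
in asynchronous execution fills them in from its handler function; one that
leaves IsForeignPathAsyncCapable NULL is simply never considered by
is_async_capable_path() and keeps working as before.  A hypothetical sketch,
not part of this patch, reusing the callback names from the earlier sketches
(the Notify callback is assumed but not shown):

    #include "postgres.h"

    #include "fmgr.h"
    #include "foreign/fdwapi.h"

    /* Async callbacks assumed to be defined elsewhere in the FDW. */
    bool myfdwIsForeignPathAsyncCapable(ForeignPath *path);
    void myfdwForeignAsyncRequest(AsyncRequest *areq);
    void myfdwForeignAsyncConfigureWait(AsyncRequest *areq);
    void myfdwForeignAsyncNotify(AsyncRequest *areq);

    PG_FUNCTION_INFO_V1(myfdw_handler);

    Datum
    myfdw_handler(PG_FUNCTION_ARGS)
    {
        FdwRoutine *routine = makeNode(FdwRoutine);

        /* ... the usual planning and scanning callbacks would be set here ... */

        /* Support functions for asynchronous execution */
        routine->IsForeignPathAsyncCapable = myfdwIsForeignPathAsyncCapable;
        routine->ForeignAsyncRequest = myfdwForeignAsyncRequest;
        routine->ForeignAsyncConfigureWait = myfdwForeignAsyncConfigureWait;
        routine->ForeignAsyncNotify = myfdwForeignAsyncNotify;

        PG_RETURN_POINTER(routine);
    }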
 
 
diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h
index e31ad6204e..c93b9c011e 100644
--- a/src/include/nodes/execnodes.h
+++ b/src/include/nodes/execnodes.h
@@ -515,6 +515,22 @@ typedef struct ResultRelInfo
 	struct CopyMultiInsertBuffer *ri_CopyMultiInsertBuffer;
 } ResultRelInfo;
 
+/* ----------------
+ *	  AsyncRequest
+ *
+ * State for an asynchronous tuple request.
+ * ----------------
+ */
+typedef struct AsyncRequest
+{
+	struct PlanState *requestor;	/* Node that wants a tuple */
+	struct PlanState *requestee;	/* Node from which a tuple is wanted */
+	int			request_index;	/* Scratch space for requestor */
+	bool		callback_pending;	/* Callback is needed */
+	bool		request_complete;	/* Request complete, result valid */
+	TupleTableSlot *result;		/* Result (NULL if no more tuples) */
+} AsyncRequest;
+
 /* ----------------
  *	  EState information
  *
@@ -1220,12 +1236,23 @@ struct AppendState
 	PlanState **appendplans;	/* array of PlanStates for my inputs */
 	int			as_nplans;
 	int			as_whichplan;
+	bool		as_syncdone;	/* all synchronous plans done? */
+	Bitmapset  *as_asyncplans;	/* asynchronous plans indexes */
+	int			as_nasyncplans;	/* # of asynchronous plans */
+	AsyncRequest **as_asyncrequests;	/* array of AsyncRequests */
+	TupleTableSlot **as_asyncresults;	/* unreturned results of async plans */
+	int			as_nasyncresults;	/* # of valid entries in as_asyncresults */
+	int			as_nasyncremain;	/* # of remaining async plans */
+	Bitmapset  *as_needrequest;	/* async plans ready for a request */
+	struct WaitEventSet *as_eventset;	/* WaitEventSet used to configure
+										 * file descriptor wait events */
 	int			as_first_partial_plan;	/* Index of 'appendplans' containing
 										 * the first partial plan */
 	ParallelAppendState *as_pstate; /* parallel coordination info */
 	Size		pstate_len;		/* size of parallel coordination info */
 	struct PartitionPruneState *as_prune_state;
 	Bitmapset  *as_valid_subplans;
+	Bitmapset  *as_valid_asyncplans;
 	bool		(*choose_next_subplan) (AppendState *);
 };
 
diff --git a/src/include/nodes/plannodes.h b/src/include/nodes/plannodes.h
index 6e62104d0b..24ca616740 100644
--- a/src/include/nodes/plannodes.h
+++ b/src/include/nodes/plannodes.h
@@ -129,6 +129,11 @@ typedef struct Plan
 	bool		parallel_aware; /* engage parallel-aware logic? */
 	bool		parallel_safe;	/* OK to use as part of parallel plan? */
 
+	/*
+	 * information needed for asynchronous execution
+	 */
+	bool		async_capable; 	/* engage asynchronous-capable logic? */
+
 	/*
 	 * Common structural data for all Plan types.
 	 */
@@ -245,6 +250,7 @@ typedef struct Append
 	Plan		plan;
 	Bitmapset  *apprelids;		/* RTIs of appendrel(s) formed by this node */
 	List	   *appendplans;
+	int			nasyncplans;	/* # of asynchronous plans */
 
 	/*
 	 * All 'appendplans' preceding this index are non-partial plans. All
diff --git a/src/include/optimizer/cost.h b/src/include/optimizer/cost.h
index 1be93be098..a3fd93fe07 100644
--- a/src/include/optimizer/cost.h
+++ b/src/include/optimizer/cost.h
@@ -65,6 +65,7 @@ extern PGDLLIMPORT bool enable_partitionwise_aggregate;
 extern PGDLLIMPORT bool enable_parallel_append;
 extern PGDLLIMPORT bool enable_parallel_hash;
 extern PGDLLIMPORT bool enable_partition_pruning;
+extern PGDLLIMPORT bool enable_async_append;
 extern PGDLLIMPORT int constraint_exclusion;
 
 extern double index_pages_fetched(double tuples_fetched, BlockNumber pages,
diff --git a/src/include/pgstat.h b/src/include/pgstat.h
index 724068cf87..d9588da38a 100644
--- a/src/include/pgstat.h
+++ b/src/include/pgstat.h
@@ -957,6 +957,7 @@ typedef enum
  */
 typedef enum
 {
+	WAIT_EVENT_APPEND_READY,
 	WAIT_EVENT_BACKUP_WAIT_WAL_ARCHIVE = PG_WAIT_IPC,
 	WAIT_EVENT_BGWORKER_SHUTDOWN,
 	WAIT_EVENT_BGWORKER_STARTUP,
diff --git a/src/include/storage/latch.h b/src/include/storage/latch.h
index 9e94fcaec2..44f9368c64 100644
--- a/src/include/storage/latch.h
+++ b/src/include/storage/latch.h
@@ -179,5 +179,6 @@ extern int	WaitLatch(Latch *latch, int wakeEvents, long timeout,
 extern int	WaitLatchOrSocket(Latch *latch, int wakeEvents,
 							  pgsocket sock, long timeout, uint32 wait_event_info);
 extern void InitializeLatchWaitSet(void);
+extern int	GetNumRegisteredWaitEvents(WaitEventSet *set);
 
 #endif							/* LATCH_H */
diff --git a/src/test/regress/expected/explain.out b/src/test/regress/expected/explain.out
index dc7ab2ce8b..e78ca7bddb 100644
--- a/src/test/regress/expected/explain.out
+++ b/src/test/regress/expected/explain.out
@@ -87,6 +87,7 @@ select explain_filter('explain (analyze, buffers, format json) select * from int
      "Plan": {                     +
        "Node Type": "Seq Scan",    +
        "Parallel Aware": false,    +
+       "Async Capable": false,     +
        "Relation Name": "int8_tbl",+
        "Alias": "i8",              +
        "Startup Cost": N.N,        +
@@ -136,6 +137,7 @@ select explain_filter('explain (analyze, buffers, format xml) select * from int8
      <Plan>                                            +
        <Node-Type>Seq Scan</Node-Type>                 +
        <Parallel-Aware>false</Parallel-Aware>          +
+       <Async-Capable>false</Async-Capable>            +
        <Relation-Name>int8_tbl</Relation-Name>         +
        <Alias>i8</Alias>                               +
        <Startup-Cost>N.N</Startup-Cost>                +
@@ -183,6 +185,7 @@ select explain_filter('explain (analyze, buffers, format yaml) select * from int
  - Plan:                      +
      Node Type: "Seq Scan"    +
      Parallel Aware: false    +
+     Async Capable: false     +
      Relation Name: "int8_tbl"+
      Alias: "i8"              +
      Startup Cost: N.N        +
@@ -233,6 +236,7 @@ select explain_filter('explain (buffers, format json) select * from int8_tbl i8'
      "Plan": {                     +
        "Node Type": "Seq Scan",    +
        "Parallel Aware": false,    +
+       "Async Capable": false,     +
        "Relation Name": "int8_tbl",+
        "Alias": "i8",              +
        "Startup Cost": N.N,        +
@@ -348,6 +352,7 @@ select jsonb_pretty(
                              "Actual Rows": 0,              +
                              "Actual Loops": 0,             +
                              "Startup Cost": 0.0,           +
+                             "Async Capable": false,        +
                              "Relation Name": "tenk1",      +
                              "Parallel Aware": true,        +
                              "Local Hit Blocks": 0,         +
@@ -393,6 +398,7 @@ select jsonb_pretty(
                      "Actual Rows": 0,                      +
                      "Actual Loops": 0,                     +
                      "Startup Cost": 0.0,                   +
+                     "Async Capable": false,                +
                      "Parallel Aware": false,               +
                      "Sort Space Used": 0,                  +
                      "Local Hit Blocks": 0,                 +
@@ -435,6 +441,7 @@ select jsonb_pretty(
              "Actual Rows": 0,                              +
              "Actual Loops": 0,                             +
              "Startup Cost": 0.0,                           +
+             "Async Capable": false,                        +
              "Parallel Aware": false,                       +
              "Workers Planned": 0,                          +
              "Local Hit Blocks": 0,                         +
diff --git a/src/test/regress/expected/incremental_sort.out b/src/test/regress/expected/incremental_sort.out
index 68ca321163..a417b566d9 100644
--- a/src/test/regress/expected/incremental_sort.out
+++ b/src/test/regress/expected/incremental_sort.out
@@ -558,6 +558,7 @@ select jsonb_pretty(explain_analyze_inc_sort_nodes_without_memory('select * from
          "Node Type": "Incremental Sort",       +
          "Actual Rows": 55,                     +
          "Actual Loops": 1,                     +
+         "Async Capable": false,                +
          "Presorted Key": [                     +
              "t.a"                              +
          ],                                     +
@@ -760,6 +761,7 @@ select jsonb_pretty(explain_analyze_inc_sort_nodes_without_memory('select * from
          "Node Type": "Incremental Sort",       +
          "Actual Rows": 70,                     +
          "Actual Loops": 1,                     +
+         "Async Capable": false,                +
          "Presorted Key": [                     +
              "t.a"                              +
          ],                                     +
diff --git a/src/test/regress/expected/insert_conflict.out b/src/test/regress/expected/insert_conflict.out
index ff157ceb1c..499245068a 100644
--- a/src/test/regress/expected/insert_conflict.out
+++ b/src/test/regress/expected/insert_conflict.out
@@ -204,6 +204,7 @@ explain (costs off, format json) insert into insertconflicttest values (0, 'Bilb
        "Node Type": "ModifyTable",                                     +
        "Operation": "Insert",                                          +
        "Parallel Aware": false,                                        +
+       "Async Capable": false,                                         +
        "Relation Name": "insertconflicttest",                          +
        "Alias": "insertconflicttest",                                  +
        "Conflict Resolution": "UPDATE",                                +
@@ -213,7 +214,8 @@ explain (costs off, format json) insert into insertconflicttest values (0, 'Bilb
          {                                                             +
            "Node Type": "Result",                                      +
            "Parent Relationship": "Member",                            +
-           "Parallel Aware": false                                     +
+           "Parallel Aware": false,                                    +
+           "Async Capable": false                                      +
          }                                                             +
        ]                                                               +
      }                                                                 +
diff --git a/src/test/regress/expected/sysviews.out b/src/test/regress/expected/sysviews.out
index 6d048e309c..98dde452e6 100644
--- a/src/test/regress/expected/sysviews.out
+++ b/src/test/regress/expected/sysviews.out
@@ -95,6 +95,7 @@ select count(*) = 0 as ok from pg_stat_wal_receiver;
 select name, setting from pg_settings where name like 'enable%';
               name              | setting 
 --------------------------------+---------
+ enable_async_append            | on
  enable_bitmapscan              | on
  enable_gathermerge             | on
  enable_hashagg                 | on
@@ -113,7 +114,7 @@ select name, setting from pg_settings where name like 'enable%';
  enable_seqscan                 | on
  enable_sort                    | on
  enable_tidscan                 | on
-(18 rows)
+(19 rows)
 
 -- Test that the pg_timezone_names and pg_timezone_abbrevs views are
 -- more-or-less working.  We can't test their contents in any great detail
