diff --git a/contrib/postgres_fdw/connection.c b/contrib/postgres_fdw/connection.c
index f0c45b00db..864f9516ad 100644
--- a/contrib/postgres_fdw/connection.c
+++ b/contrib/postgres_fdw/connection.c
@@ -59,6 +59,7 @@ typedef struct ConnCacheEntry
 	bool		have_error;		/* have any subxacts aborted in this xact? */
 	bool		changing_xact_state;	/* xact state change in process */
 	bool		parallel_commit;	/* do we commit (sub)xacts in parallel? */
+	bool		parallel_abort;	/* do we abort (sub)xacts in parallel? */
 	bool		invalidated;	/* true if reconnect is pending */
 	bool		keep_connections;	/* setting value of keep_connections
 									 * server option */
@@ -80,6 +81,25 @@ static unsigned int prep_stmt_number = 0;
 /* tracks whether any work is needed in callback functions */
 static bool xact_got_connection = false;
 
+/*
+ * Milliseconds to wait to cancel an in-progress query or execute a cleanup
+ * query; if it takes longer than 30 seconds to do these, we assume the
+ * connection is dead.
+ */
+#define CONNECTION_CLEANUP_TIMEOUT	30000
+
+/* Macro for constructing abort command to be sent */
+#define CONSTRUCT_ABORT_COMMAND(sql, entry, toplevel) \
+	do { \
+		if (toplevel) \
+			snprintf((sql), sizeof(sql), \
+					 "ABORT TRANSACTION"); \
+		else \
+			snprintf((sql), sizeof(sql), \
+					 "ROLLBACK TO SAVEPOINT s%d; RELEASE SAVEPOINT s%d", \
+					 (entry)->xact_depth, (entry)->xact_depth); \
+	} while(0)
+
 /*
  * SQL functions
  */
@@ -106,14 +126,28 @@ static void pgfdw_inval_callback(Datum arg, int cacheid, uint32 hashvalue);
 static void pgfdw_reject_incomplete_xact_state_change(ConnCacheEntry *entry);
 static void pgfdw_reset_xact_state(ConnCacheEntry *entry, bool toplevel);
 static bool pgfdw_cancel_query(PGconn *conn);
+static bool pgfdw_cancel_query_begin(PGconn *conn);
+static bool pgfdw_cancel_query_end(PGconn *conn, TimestampTz endtime,
+								   bool consume_input);
 static bool pgfdw_exec_cleanup_query(PGconn *conn, const char *query,
 									 bool ignore_errors);
+static bool pgfdw_exec_cleanup_query_begin(PGconn *conn, const char *query);
+static bool pgfdw_exec_cleanup_query_end(PGconn *conn, const char *query,
+										 TimestampTz endtime,
+										 bool consume_input,
+										 bool ignore_errors);
 static bool pgfdw_get_cleanup_result(PGconn *conn, TimestampTz endtime,
 									 PGresult **result, bool *timed_out);
 static void pgfdw_abort_cleanup(ConnCacheEntry *entry, bool toplevel);
+static bool pgfdw_abort_cleanup_begin(ConnCacheEntry *entry, bool toplevel,
+									  List **pending_entries,
+									  List **cancel_requested);
 static void pgfdw_finish_pre_commit_cleanup(List *pending_entries);
 static void pgfdw_finish_pre_subcommit_cleanup(List *pending_entries,
 											   int curlevel);
+static void pgfdw_finish_abort_cleanup(List *pending_entries,
+									   List *cancel_requested,
+									   bool toplevel);
 static bool UserMappingPasswordRequired(UserMapping *user);
 static bool disconnect_cached_connections(Oid serverid);
 
@@ -320,8 +354,8 @@ make_new_connection(ConnCacheEntry *entry, UserMapping *user)
 	 *
 	 * By default, all the connections to any foreign servers are kept open.
 	 *
-	 * Also determine whether to commit (sub)transactions opened on the remote
-	 * server in parallel at (sub)transaction end, which is disabled by
+	 * Also determine whether to commit/abort (sub)transactions opened on the
+	 * remote server in parallel at (sub)transaction end, which is disabled by
 	 * default.
 	 *
 	 * Note: it's enough to determine these only when making a new connection
@@ -330,6 +364,7 @@ make_new_connection(ConnCacheEntry *entry, UserMapping *user)
 	 */
 	entry->keep_connections = true;
 	entry->parallel_commit = false;
+	entry->parallel_abort = false;
 	foreach(lc, server->options)
 	{
 		DefElem    *def = (DefElem *) lfirst(lc);
@@ -338,6 +373,8 @@ make_new_connection(ConnCacheEntry *entry, UserMapping *user)
 			entry->keep_connections = defGetBoolean(def);
 		else if (strcmp(def->defname, "parallel_commit") == 0)
 			entry->parallel_commit = defGetBoolean(def);
+		else if (strcmp(def->defname, "parallel_abort") == 0)
+			entry->parallel_abort = defGetBoolean(def);
 	}
 
 	/* Now try to make the connection */
@@ -923,6 +960,7 @@ pgfdw_xact_callback(XactEvent event, void *arg)
 	HASH_SEQ_STATUS scan;
 	ConnCacheEntry *entry;
 	List	   *pending_entries = NIL;
+	List	   *cancel_requested = NIL;
 
 	/* Quick exit if no connections were touched in this transaction. */
 	if (!xact_got_connection)
@@ -1016,7 +1054,15 @@ pgfdw_xact_callback(XactEvent event, void *arg)
 				case XACT_EVENT_PARALLEL_ABORT:
 				case XACT_EVENT_ABORT:
 					/* Rollback all remote transactions during abort */
-					pgfdw_abort_cleanup(entry, true);
+					if (entry->parallel_abort)
+					{
+						if (pgfdw_abort_cleanup_begin(entry, true,
+													  &pending_entries,
+													  &cancel_requested))
+							continue;
+					}
+					else
+						pgfdw_abort_cleanup(entry, true);
 					break;
 			}
 		}
@@ -1026,11 +1072,21 @@ pgfdw_xact_callback(XactEvent event, void *arg)
 	}
 
 	/* If there are any pending connections, finish cleaning them up */
-	if (pending_entries)
+	if (pending_entries || cancel_requested)
 	{
-		Assert(event == XACT_EVENT_PARALLEL_PRE_COMMIT ||
-			   event == XACT_EVENT_PRE_COMMIT);
-		pgfdw_finish_pre_commit_cleanup(pending_entries);
+		if (event == XACT_EVENT_PARALLEL_PRE_COMMIT ||
+			event == XACT_EVENT_PRE_COMMIT)
+		{
+			Assert(cancel_requested == NIL);
+			pgfdw_finish_pre_commit_cleanup(pending_entries);
+		}
+		else
+		{
+			Assert(event == XACT_EVENT_PARALLEL_ABORT ||
+				   event == XACT_EVENT_ABORT);
+			pgfdw_finish_abort_cleanup(pending_entries, cancel_requested,
+									   true);
+		}
 	}
 
 	/*
@@ -1055,6 +1111,7 @@ pgfdw_subxact_callback(SubXactEvent event, SubTransactionId mySubid,
 	ConnCacheEntry *entry;
 	int			curlevel;
 	List	   *pending_entries = NIL;
+	List	   *cancel_requested = NIL;
 
 	/* Nothing to do at subxact start, nor after commit. */
 	if (!(event == SUBXACT_EVENT_PRE_COMMIT_SUB ||
@@ -1109,7 +1166,15 @@ pgfdw_subxact_callback(SubXactEvent event, SubTransactionId mySubid,
 		else
 		{
 			/* Rollback all remote subtransactions during abort */
-			pgfdw_abort_cleanup(entry, false);
+			if (entry->parallel_abort)
+			{
+				if (pgfdw_abort_cleanup_begin(entry, false,
+											  &pending_entries,
+											  &cancel_requested))
+					continue;
+			}
+			else
+				pgfdw_abort_cleanup(entry, false);
 		}
 
 		/* OK, we're outta that level of subtransaction */
@@ -1117,10 +1182,19 @@ pgfdw_subxact_callback(SubXactEvent event, SubTransactionId mySubid,
 	}
 
 	/* If there are any pending connections, finish cleaning them up */
-	if (pending_entries)
+	if (pending_entries || cancel_requested)
 	{
-		Assert(event == SUBXACT_EVENT_PRE_COMMIT_SUB);
-		pgfdw_finish_pre_subcommit_cleanup(pending_entries, curlevel);
+		if (event == SUBXACT_EVENT_PRE_COMMIT_SUB)
+		{
+			Assert(cancel_requested == NIL);
+			pgfdw_finish_pre_subcommit_cleanup(pending_entries, curlevel);
+		}
+		else
+		{
+			Assert(event == SUBXACT_EVENT_ABORT_SUB);
+			pgfdw_finish_abort_cleanup(pending_entries, cancel_requested,
+									   false);
+		}
 	}
 }
 
@@ -1264,17 +1338,25 @@ pgfdw_reset_xact_state(ConnCacheEntry *entry, bool toplevel)
 static bool
 pgfdw_cancel_query(PGconn *conn)
 {
-	PGcancel   *cancel;
-	char		errbuf[256];
-	PGresult   *result = NULL;
 	TimestampTz endtime;
-	bool		timed_out;
 
 	/*
 	 * If it takes too long to cancel the query and discard the result, assume
 	 * the connection is dead.
 	 */
-	endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(), 30000);
+	endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(),
+										  CONNECTION_CLEANUP_TIMEOUT);
+
+	if (!pgfdw_cancel_query_begin(conn))
+		return false;
+	return pgfdw_cancel_query_end(conn, endtime, false);
+}
+
+static bool
+pgfdw_cancel_query_begin(PGconn *conn)
+{
+	PGcancel   *cancel;
+	char		errbuf[256];
 
 	/*
 	 * Issue cancel request.  Unfortunately, there's no good way to limit the
@@ -1294,6 +1376,31 @@ pgfdw_cancel_query(PGconn *conn)
 		PQfreeCancel(cancel);
 	}
 
+	return true;
+}
+
+static bool
+pgfdw_cancel_query_end(PGconn *conn, TimestampTz endtime, bool consume_input)
+{
+	PGresult   *result = NULL;
+	bool		timed_out;
+
+	/*
+	 * If requested, consume whatever data is available from the socket.
+	 * (Note that if all data is available, this allows
+	 * pgfdw_get_cleanup_result to call PQgetResult without forcing the
+	 * overhead of WaitLatchOrSocket, which would be large compared to the
+	 * overhead of PQconsumeInput.)
+	 */
+	if (consume_input && !PQconsumeInput(conn))
+	{
+		ereport(WARNING,
+				(errcode(ERRCODE_CONNECTION_FAILURE),
+				 errmsg("could not get result of cancel request: %s",
+						pchomp(PQerrorMessage(conn)))));
+		return false;
+	}
+
 	/* Get and discard the result of the query. */
 	if (pgfdw_get_cleanup_result(conn, endtime, &result, &timed_out))
 	{
@@ -1328,9 +1435,7 @@ pgfdw_cancel_query(PGconn *conn)
 static bool
 pgfdw_exec_cleanup_query(PGconn *conn, const char *query, bool ignore_errors)
 {
-	PGresult   *result = NULL;
 	TimestampTz endtime;
-	bool		timed_out;
 
 	/*
 	 * If it takes too long to execute a cleanup query, assume the connection
@@ -1338,8 +1443,18 @@ pgfdw_exec_cleanup_query(PGconn *conn, const char *query, bool ignore_errors)
 	 * place (e.g. statement timeout, user cancel), so the timeout shouldn't
 	 * be too long.
 	 */
-	endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(), 30000);
+	endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(),
+										  CONNECTION_CLEANUP_TIMEOUT);
+
+	if (!pgfdw_exec_cleanup_query_begin(conn, query))
+		return false;
+	return pgfdw_exec_cleanup_query_end(conn, query, endtime,
+										false, ignore_errors);
+}
 
+static bool
+pgfdw_exec_cleanup_query_begin(PGconn *conn, const char *query)
+{
 	/*
 	 * Submit a query.  Since we don't use non-blocking mode, this also can
 	 * block.  But its risk is relatively small, so we ignore that for now.
@@ -1350,6 +1465,30 @@ pgfdw_exec_cleanup_query(PGconn *conn, const char *query, bool ignore_errors)
 		return false;
 	}
 
+	return true;
+}
+
+static bool
+pgfdw_exec_cleanup_query_end(PGconn *conn, const char *query,
+							 TimestampTz endtime, bool consume_input,
+							 bool ignore_errors)
+{
+	PGresult   *result = NULL;
+	bool		timed_out;
+
+	/*
+	 * If requested, consume whatever data is available from the socket.
+	 * (Note that if all data is available, this allows
+	 * pgfdw_get_cleanup_result to call PQgetResult without forcing the
+	 * overhead of WaitLatchOrSocket, which would be large compared to the
+	 * overhead of PQconsumeInput.)
+	 */
+	if (consume_input && !PQconsumeInput(conn))
+	{
+		pgfdw_report_error(WARNING, NULL, conn, false, query);
+		return false;
+	}
+
 	/* Get the result of the query. */
 	if (pgfdw_get_cleanup_result(conn, endtime, &result, &timed_out))
 	{
@@ -1505,12 +1644,7 @@ pgfdw_abort_cleanup(ConnCacheEntry *entry, bool toplevel)
 		!pgfdw_cancel_query(entry->conn))
 		return;					/* Unable to cancel running query */
 
-	if (toplevel)
-		snprintf(sql, sizeof(sql), "ABORT TRANSACTION");
-	else
-		snprintf(sql, sizeof(sql),
-				 "ROLLBACK TO SAVEPOINT s%d; RELEASE SAVEPOINT s%d",
-				 entry->xact_depth, entry->xact_depth);
+	CONSTRUCT_ABORT_COMMAND(sql, entry, toplevel);
 	if (!pgfdw_exec_cleanup_query(entry->conn, sql, false))
 		return;					/* Unable to abort remote (sub)transaction */
 
@@ -1539,6 +1673,65 @@ pgfdw_abort_cleanup(ConnCacheEntry *entry, bool toplevel)
 	entry->changing_xact_state = false;
 }
 
+/*
+ * Like pgfdw_abort_cleanup, submit an abort command or cancel a request, but
+ * don't wait for the result.
+ *
+ * Returns true if the abort command or cancel request is successfully issued,
+ * false otherwise.  If the abort command is successfully issued, the given
+ * connection cache entry is appended to *pending_entries.  Othewise, if the
+ * cancel request is successfully issued, it's appended to *cancel_requested.
+ */
+static bool
+pgfdw_abort_cleanup_begin(ConnCacheEntry *entry, bool toplevel,
+						  List **pending_entries, List **cancel_requested)
+{
+	/*
+	 * Don't try to clean up the connection if we're already in error
+	 * recursion trouble.
+	 */
+	if (in_error_recursion_trouble())
+		entry->changing_xact_state = true;
+
+	/*
+	 * If connection is already unsalvageable, don't touch it further.
+	 */
+	if (entry->changing_xact_state)
+		return false;
+
+	/*
+	 * Mark this connection as in the process of changing transaction state.
+	 */
+	entry->changing_xact_state = true;
+
+	/* Assume we might have lost track of prepared statements */
+	entry->have_error = true;
+
+	/*
+	 * If a command has been submitted to the remote server by using an
+	 * asynchronous execution function, the command might not have yet
+	 * completed.  Check to see if a command is still being processed by the
+	 * remote server, and if so, request cancellation of the command.
+	 */
+	if (PQtransactionStatus(entry->conn) == PQTRANS_ACTIVE)
+	{
+		if (!pgfdw_cancel_query_begin(entry->conn))
+			return false;		/* Unable to cancel running query */
+		*cancel_requested = lappend(*cancel_requested, entry);
+	}
+	else
+	{
+		char		sql[100];
+
+		CONSTRUCT_ABORT_COMMAND(sql, entry, toplevel);
+		if (!pgfdw_exec_cleanup_query_begin(entry->conn, sql))
+			return false;		/* Unable to abort remote transaction */
+		*pending_entries = lappend(*pending_entries, entry);
+	}
+
+	return true;
+}
+
 /*
  * Finish pre-commit cleanup of connections on each of which we've sent a
  * COMMIT command to the remote server.
@@ -1647,6 +1840,168 @@ pgfdw_finish_pre_subcommit_cleanup(List *pending_entries, int curlevel)
 	}
 }
 
+/*
+ * Finish abort cleanup of connections on each of which we've sent an abort
+ * command or cancel request to the remote server.
+ */
+static void
+pgfdw_finish_abort_cleanup(List *pending_entries, List *cancel_requested,
+						   bool toplevel)
+{
+	List	   *pending_deallocs = NIL;
+	ListCell   *lc;
+
+	/*
+	 * For each of the pending cancel requests (if any), get and discard the
+	 * result of the query, and submit an abort command to the remote server.
+	 */
+	if (cancel_requested)
+	{
+		foreach(lc, cancel_requested)
+		{
+			ConnCacheEntry *entry = (ConnCacheEntry *) lfirst(lc);
+			TimestampTz endtime;
+			char		sql[100];
+
+			Assert(entry->changing_xact_state);
+
+			/*
+			 * Set end time.  You might think we should do this before issuing
+			 * cancel request like in normal mode, but that is problematic,
+			 * because if, for example, it took longer than 30 seconds to
+			 * process the first few entries in the cancel_requested list, it
+			 * would cause a timeout error when processing each of the
+			 * remaining entries in the list, leading to slamming that entry's
+			 * connection shut.
+			 */
+			endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(),
+												  CONNECTION_CLEANUP_TIMEOUT);
+
+			if (!pgfdw_cancel_query_end(entry->conn, endtime, true))
+			{
+				/* Unable to cancel running query */
+				pgfdw_reset_xact_state(entry, toplevel);
+				continue;
+			}
+
+			/* Send an abort command in parallel if needed */
+			CONSTRUCT_ABORT_COMMAND(sql, entry, toplevel);
+			if (!pgfdw_exec_cleanup_query_begin(entry->conn, sql))
+			{
+				/* Unable to abort remote (sub)transaction */
+				pgfdw_reset_xact_state(entry, toplevel);
+			}
+			else
+				pending_entries = lappend(pending_entries, entry);
+		}
+	}
+
+	/* No further work if no pending entries */
+	if (!pending_entries)
+		return;
+
+	/*
+	 * Get the result of the abort command for each of the pending entries
+	 */
+	foreach(lc, pending_entries)
+	{
+		ConnCacheEntry *entry = (ConnCacheEntry *) lfirst(lc);
+		TimestampTz endtime;
+		char		sql[100];
+
+		Assert(entry->changing_xact_state);
+
+		/*
+		 * Set end time.  We do this now, not before issuing the command like
+		 * in normal mode, for the same reason as for the cancel_requested
+		 * entries.
+		 */
+		endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(),
+											  CONNECTION_CLEANUP_TIMEOUT);
+
+		CONSTRUCT_ABORT_COMMAND(sql, entry, toplevel);
+		if (!pgfdw_exec_cleanup_query_end(entry->conn, sql, endtime,
+										  true, false))
+		{
+			/* Unable to abort remote (sub)transaction */
+			pgfdw_reset_xact_state(entry, toplevel);
+			continue;
+		}
+
+		if (toplevel)
+		{
+			/* Do a DEALLOCATE ALL in parallel if needed */
+			if (entry->have_prep_stmt && entry->have_error)
+			{
+				if (!pgfdw_exec_cleanup_query_begin(entry->conn,
+													"DEALLOCATE ALL"))
+				{
+					/* Trouble clearing prepared statements */
+					pgfdw_reset_xact_state(entry, toplevel);
+				}
+				else
+					pending_deallocs = lappend(pending_deallocs, entry);
+				continue;
+			}
+			entry->have_prep_stmt = false;
+			entry->have_error = false;
+		}
+
+		/* Reset the per-connection state if needed */
+		if (entry->state.pendingAreq)
+			memset(&entry->state, 0, sizeof(entry->state));
+
+		/* We're done with this entry; unset the changing_xact_state flag */
+		entry->changing_xact_state = false;
+		pgfdw_reset_xact_state(entry, toplevel);
+	}
+
+	/* No further work if no pending entries */
+	if (!pending_deallocs)
+		return;
+	Assert(toplevel);
+
+	/*
+	 * Get the result of the DEALLOCATE command for each of the pending
+	 * entries
+	 */
+	foreach(lc, pending_deallocs)
+	{
+		ConnCacheEntry *entry = (ConnCacheEntry *) lfirst(lc);
+		TimestampTz endtime;
+
+		Assert(entry->changing_xact_state);
+		Assert(entry->have_prep_stmt);
+		Assert(entry->have_error);
+
+		/*
+		 * Set end time.  We do this now, not before issuing the command like
+		 * in normal mode, for the same reason as for the cancel_requested
+		 * entries.
+		 */
+		endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(),
+											  CONNECTION_CLEANUP_TIMEOUT);
+
+		if (!pgfdw_exec_cleanup_query_end(entry->conn, "DEALLOCATE ALL",
+										  endtime, true, true))
+		{
+			/* Trouble clearing prepared statements */
+			pgfdw_reset_xact_state(entry, toplevel);
+			continue;
+		}
+		entry->have_prep_stmt = false;
+		entry->have_error = false;
+
+		/* Reset the per-connection state if needed */
+		if (entry->state.pendingAreq)
+			memset(&entry->state, 0, sizeof(entry->state));
+
+		/* We're done with this entry; unset the changing_xact_state flag */
+		entry->changing_xact_state = false;
+		pgfdw_reset_xact_state(entry, toplevel);
+	}
+}
+
 /*
  * List active foreign server connections.
  *
diff --git a/contrib/postgres_fdw/expected/postgres_fdw.out b/contrib/postgres_fdw/expected/postgres_fdw.out
index 558e94b845..5ddb483cd5 100644
--- a/contrib/postgres_fdw/expected/postgres_fdw.out
+++ b/contrib/postgres_fdw/expected/postgres_fdw.out
@@ -11501,10 +11501,12 @@ SELECT pg_terminate_backend(pid, 180000) FROM pg_stat_activity
 RESET postgres_fdw.application_name;
 RESET debug_discard_caches;
 -- ===================================================================
--- test parallel commit
+-- test parallel commit and parallel abort
 -- ===================================================================
 ALTER SERVER loopback OPTIONS (ADD parallel_commit 'true');
+ALTER SERVER loopback OPTIONS (ADD parallel_abort 'true');
 ALTER SERVER loopback2 OPTIONS (ADD parallel_commit 'true');
+ALTER SERVER loopback2 OPTIONS (ADD parallel_abort 'true');
 CREATE TABLE ploc1 (f1 int, f2 text);
 CREATE FOREIGN TABLE prem1 (f1 int, f2 text)
   SERVER loopback OPTIONS (table_name 'ploc1');
@@ -11574,5 +11576,52 @@ SELECT * FROM prem2;
  204 | quxqux
 (3 rows)
 
+BEGIN;
+INSERT INTO prem1 VALUES (105, 'test1');
+INSERT INTO prem2 VALUES (205, 'test2');
+ABORT;
+SELECT * FROM prem1;
+ f1  |   f2   
+-----+--------
+ 101 | foo
+ 102 | foofoo
+ 104 | bazbaz
+(3 rows)
+
+SELECT * FROM prem2;
+ f1  |   f2   
+-----+--------
+ 201 | bar
+ 202 | barbar
+ 204 | quxqux
+(3 rows)
+
+BEGIN;
+SAVEPOINT s;
+INSERT INTO prem1 VALUES (105, 'test1');
+INSERT INTO prem2 VALUES (205, 'test2');
+ROLLBACK TO SAVEPOINT s;
+RELEASE SAVEPOINT s;
+INSERT INTO prem1 VALUES (105, 'test1');
+INSERT INTO prem2 VALUES (205, 'test2');
+ABORT;
+SELECT * FROM prem1;
+ f1  |   f2   
+-----+--------
+ 101 | foo
+ 102 | foofoo
+ 104 | bazbaz
+(3 rows)
+
+SELECT * FROM prem2;
+ f1  |   f2   
+-----+--------
+ 201 | bar
+ 202 | barbar
+ 204 | quxqux
+(3 rows)
+
 ALTER SERVER loopback OPTIONS (DROP parallel_commit);
+ALTER SERVER loopback OPTIONS (DROP parallel_abort);
 ALTER SERVER loopback2 OPTIONS (DROP parallel_commit);
+ALTER SERVER loopback2 OPTIONS (DROP parallel_abort);
diff --git a/contrib/postgres_fdw/option.c b/contrib/postgres_fdw/option.c
index fa80ee2a55..df6680fe27 100644
--- a/contrib/postgres_fdw/option.c
+++ b/contrib/postgres_fdw/option.c
@@ -125,6 +125,7 @@ postgres_fdw_validator(PG_FUNCTION_ARGS)
 			strcmp(def->defname, "truncatable") == 0 ||
 			strcmp(def->defname, "async_capable") == 0 ||
 			strcmp(def->defname, "parallel_commit") == 0 ||
+			strcmp(def->defname, "parallel_abort") == 0 ||
 			strcmp(def->defname, "keep_connections") == 0)
 		{
 			/* these accept only boolean values */
@@ -254,6 +255,7 @@ InitPgFdwOptions(void)
 		{"async_capable", ForeignServerRelationId, false},
 		{"async_capable", ForeignTableRelationId, false},
 		{"parallel_commit", ForeignServerRelationId, false},
+		{"parallel_abort", ForeignServerRelationId, false},
 		{"keep_connections", ForeignServerRelationId, false},
 		{"password_required", UserMappingRelationId, false},
 
diff --git a/contrib/postgres_fdw/sql/postgres_fdw.sql b/contrib/postgres_fdw/sql/postgres_fdw.sql
index b0dbb41fb5..4d5140669b 100644
--- a/contrib/postgres_fdw/sql/postgres_fdw.sql
+++ b/contrib/postgres_fdw/sql/postgres_fdw.sql
@@ -3744,10 +3744,12 @@ RESET postgres_fdw.application_name;
 RESET debug_discard_caches;
 
 -- ===================================================================
--- test parallel commit
+-- test parallel commit and parallel abort
 -- ===================================================================
 ALTER SERVER loopback OPTIONS (ADD parallel_commit 'true');
+ALTER SERVER loopback OPTIONS (ADD parallel_abort 'true');
 ALTER SERVER loopback2 OPTIONS (ADD parallel_commit 'true');
+ALTER SERVER loopback2 OPTIONS (ADD parallel_abort 'true');
 
 CREATE TABLE ploc1 (f1 int, f2 text);
 CREATE FOREIGN TABLE prem1 (f1 int, f2 text)
@@ -3786,5 +3788,26 @@ COMMIT;
 SELECT * FROM prem1;
 SELECT * FROM prem2;
 
+BEGIN;
+INSERT INTO prem1 VALUES (105, 'test1');
+INSERT INTO prem2 VALUES (205, 'test2');
+ABORT;
+SELECT * FROM prem1;
+SELECT * FROM prem2;
+
+BEGIN;
+SAVEPOINT s;
+INSERT INTO prem1 VALUES (105, 'test1');
+INSERT INTO prem2 VALUES (205, 'test2');
+ROLLBACK TO SAVEPOINT s;
+RELEASE SAVEPOINT s;
+INSERT INTO prem1 VALUES (105, 'test1');
+INSERT INTO prem2 VALUES (205, 'test2');
+ABORT;
+SELECT * FROM prem1;
+SELECT * FROM prem2;
+
 ALTER SERVER loopback OPTIONS (DROP parallel_commit);
+ALTER SERVER loopback OPTIONS (DROP parallel_abort);
 ALTER SERVER loopback2 OPTIONS (DROP parallel_commit);
+ALTER SERVER loopback2 OPTIONS (DROP parallel_abort);
diff --git a/doc/src/sgml/postgres-fdw.sgml b/doc/src/sgml/postgres-fdw.sgml
index 527f4deaaa..2b309ad002 100644
--- a/doc/src/sgml/postgres-fdw.sgml
+++ b/doc/src/sgml/postgres-fdw.sgml
@@ -469,12 +469,13 @@ OPTIONS (ADD password_required 'false');
     corresponding remote transactions, and subtransactions are managed by
     creating corresponding remote subtransactions. When multiple remote
     transactions are involved in the current local transaction, by default
-    <filename>postgres_fdw</filename> commits those remote transactions
-    serially when the local transaction is committed. When multiple remote
-    subtransactions are involved in the current local subtransaction, by
-    default <filename>postgres_fdw</filename> commits those remote
-    subtransactions serially when the local subtransaction is committed.
-    Performance can be improved with the following option:
+    <filename>postgres_fdw</filename> commits or aborts those remote
+    transactions serially when the local transaction is committed or aborted.
+    When multiple remote subtransactions are involved in the current local
+    subtransaction, by default <filename>postgres_fdw</filename> commits or
+    aborts those remote subtransactions serially when the local subtransaction
+    is committed or abortd.
+    Performance can be improved with the following options:
    </para>
 
    <variablelist>
@@ -490,24 +491,38 @@ OPTIONS (ADD password_required 'false');
        specified for foreign servers, not per-table. The default is
        <literal>false</literal>.
       </para>
+     </listitem>
+    </varlistentry>
 
+    <varlistentry>
+     <term><literal>parallel_abort</literal> (<type>boolean</type>)</term>
+     <listitem>
       <para>
-       If multiple foreign servers with this option enabled are involved in a
-       local transaction, multiple remote transactions on those foreign
-       servers are committed in parallel across those foreign servers when
-       the local transaction is committed.
-      </para>
-
-      <para>
-       When this option is enabled, a foreign server with many remote
-       transactions may see a negative performance impact when the local
-       transaction is committed.
+       This option controls whether <filename>postgres_fdw</filename> aborts
+       in parallel remote transactions opened on a foreign server in a local
+       transaction when the local transaction is aborted. This setting also
+       applies to remote and local subtransactions. This option can only be
+       specified for foreign servers, not per-table. The default is
+       <literal>false</literal>.
       </para>
      </listitem>
     </varlistentry>
 
    </variablelist>
 
+   <para>
+    If multiple foreign servers with these options enabled are involved in a
+    local transaction, multiple remote transactions on those foreign servers
+    are committed or aborted in parallel across those foreign servers when
+    the local transaction is committed or aborted.
+   </para>
+
+   <para>
+    When these options are enabled, a foreign server with many remote
+    transactions may see a negative performance impact when the local
+    transaction is committed or aborted.
+   </para>
+
   </sect3>
 
   <sect3>
