Hi,

I'm posting a v4-0001 patch for the new functions
postgres_fdw_get_connections() and postgres_fdw_disconnect().  In this
patch, I tried to address the review comments provided upthread.

At a high level, the changes include:
1) Storing the foreign server id in the cache entry which will help to
fetch the server name associated with it easily.
2) postgres_fdw_get_connections now returns an open connection server
name and true or false to indicate whether it's valid or not.
3) postgres_fdw_get_connections can issue a warning when the cache
look up for server name returns null i.e. the foreign server is
dropped. Please see the comments before postgres_fdw_get_connections
in which situations this is possible.
4) postgres_fdw_disconnect('myserver') disconnects the open connection
only when it's not being used in the current xact. If it's used, then
false is returned and a warning is issued.
5) postgres_fdw_disconnect() disconnects all the connections only when
they are not being used in the current xact. If at least one
connection that's being used exists, then it issues a warning and
returns true if at least one open connection gets closed otherwise
false. If there are no connections made yet or connection cache is
empty, then also false is returned.
6) postgres_fdw_disconnect can discard the entire cache if there is no
active connection.

Thoughts?

Below things are still pending which I plan to post new patches after
the v4-0001 is reviewed:
1) changing the version of postgres_fdw--1.0.sql to postgres_fdw--1.1.sql
2) 0002 and 0003 patches having keep_connections GUC and
keep_connection server level option.

With Regards,
Bharath Rupireddy.
EnterpriseDB: http://www.enterprisedb.com
From 82b82b901e8f0ca84e1b21454a3b68f4fd8f0016 Mon Sep 17 00:00:00 2001
From: Bharath Rupireddy <bharath.rupireddy@enterprisedb.com>
Date: Wed, 30 Dec 2020 11:07:35 +0530
Subject: [PATCH v4] postgres_fdw function to discard cached connections

This patch introduces new function postgres_fdw_disconnect() when
called with a foreign server name discards the associated
connections with the server name. When called without any argument,
discards all the existing cached connections.

This patch also adds another function postgres_fdw_get_connections()
to get the list of all cached connections by corresponding foreign
server names.
---
 contrib/postgres_fdw/connection.c             | 319 +++++++++++++-
 .../postgres_fdw/expected/postgres_fdw.out    | 415 +++++++++++++++++-
 contrib/postgres_fdw/postgres_fdw--1.0.sql    |  15 +
 contrib/postgres_fdw/sql/postgres_fdw.sql     | 166 +++++++
 doc/src/sgml/postgres-fdw.sgml                |  57 ++-
 5 files changed, 953 insertions(+), 19 deletions(-)

diff --git a/contrib/postgres_fdw/connection.c b/contrib/postgres_fdw/connection.c
index d841cec39b..496ef2bae2 100644
--- a/contrib/postgres_fdw/connection.c
+++ b/contrib/postgres_fdw/connection.c
@@ -22,6 +22,7 @@
 #include "postgres_fdw.h"
 #include "storage/fd.h"
 #include "storage/latch.h"
+#include "utils/builtins.h"
 #include "utils/datetime.h"
 #include "utils/hsearch.h"
 #include "utils/inval.h"
@@ -57,6 +58,8 @@ typedef struct ConnCacheEntry
 	bool		have_error;		/* have any subxacts aborted in this xact? */
 	bool		changing_xact_state;	/* xact state change in process */
 	bool		invalidated;	/* true if reconnect is pending */
+	/* Server oid to get the associated foreign server name. */
+	Oid			serverid;
 	uint32		server_hashvalue;	/* hash value of foreign server OID */
 	uint32		mapping_hashvalue;	/* hash value of user mapping OID */
 } ConnCacheEntry;
@@ -73,6 +76,12 @@ static unsigned int prep_stmt_number = 0;
 /* tracks whether any work is needed in callback functions */
 static bool xact_got_connection = false;
 
+/*
+ * SQL functions
+ */
+PG_FUNCTION_INFO_V1(postgres_fdw_get_connections);
+PG_FUNCTION_INFO_V1(postgres_fdw_disconnect);
+
 /* prototypes of private functions */
 static void make_new_connection(ConnCacheEntry *entry, UserMapping *user);
 static PGconn *connect_pg_server(ForeignServer *server, UserMapping *user);
@@ -94,6 +103,8 @@ static bool pgfdw_exec_cleanup_query(PGconn *conn, const char *query,
 static bool pgfdw_get_cleanup_result(PGconn *conn, TimestampTz endtime,
 									 PGresult **result);
 static bool UserMappingPasswordRequired(UserMapping *user);
+static bool disconnect_cached_connections(uint32 hashvalue, bool all,
+										  bool *is_in_use);
 
 /*
  * Get a PGconn which can be used to execute queries on the remote PostgreSQL
@@ -273,6 +284,7 @@ make_new_connection(ConnCacheEntry *entry, UserMapping *user)
 	entry->have_error = false;
 	entry->changing_xact_state = false;
 	entry->invalidated = false;
+	entry->serverid = server->serverid;
 	entry->server_hashvalue =
 		GetSysCacheHashValue1(FOREIGNSERVEROID,
 							  ObjectIdGetDatum(server->serverid));
@@ -789,6 +801,13 @@ pgfdw_xact_callback(XactEvent event, void *arg)
 	if (!xact_got_connection)
 		return;
 
+	/*
+	 * Quick exit if the cache has been destroyed in
+	 * disconnect_cached_connections.
+	 */
+	if (!ConnectionHash)
+		return;
+
 	/*
 	 * Scan all connection cache entries to find open remote transactions, and
 	 * close them.
@@ -985,6 +1004,13 @@ pgfdw_subxact_callback(SubXactEvent event, SubTransactionId mySubid,
 	if (!xact_got_connection)
 		return;
 
+	/*
+	 * Quick exit if the cache has been destroyed in
+	 * disconnect_cached_connections.
+	 */
+	if (!ConnectionHash)
+		return;
+
 	/*
 	 * Scan all connection cache entries to find open remote subtransactions
 	 * of the current level, and close them.
@@ -1093,6 +1119,13 @@ pgfdw_inval_callback(Datum arg, int cacheid, uint32 hashvalue)
 
 	Assert(cacheid == FOREIGNSERVEROID || cacheid == USERMAPPINGOID);
 
+	/*
+	 * Quick exit if the cache has been destroyed in
+	 * disconnect_cached_connections.
+	 */
+	if (!ConnectionHash)
+		return;
+
 	/* ConnectionHash must exist already, if we're registered */
 	hash_seq_init(&scan, ConnectionHash);
 	while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
@@ -1138,8 +1171,6 @@ pgfdw_inval_callback(Datum arg, int cacheid, uint32 hashvalue)
 static void
 pgfdw_reject_incomplete_xact_state_change(ConnCacheEntry *entry)
 {
-	HeapTuple	tup;
-	Form_pg_user_mapping umform;
 	ForeignServer *server;
 
 	/* nothing to do for inactive entries and entries of sane state */
@@ -1150,13 +1181,7 @@ pgfdw_reject_incomplete_xact_state_change(ConnCacheEntry *entry)
 	disconnect_pg_server(entry);
 
 	/* find server name to be shown in the message below */
-	tup = SearchSysCache1(USERMAPPINGOID,
-						  ObjectIdGetDatum(entry->key));
-	if (!HeapTupleIsValid(tup))
-		elog(ERROR, "cache lookup failed for user mapping %u", entry->key);
-	umform = (Form_pg_user_mapping) GETSTRUCT(tup);
-	server = GetForeignServer(umform->umserver);
-	ReleaseSysCache(tup);
+	server = GetForeignServer(entry->serverid);
 
 	ereport(ERROR,
 			(errcode(ERRCODE_CONNECTION_EXCEPTION),
@@ -1341,3 +1366,279 @@ exit:	;
 		*result = last_res;
 	return timed_out;
 }
+
+/*
+ * List the foreign server connections.
+ *
+ * This function takes no input parameter and returns an array with elements
+ * as pairs of foreign server name and true/false to show whether or not the
+ * connection is valid. The array elements look like (srv1, true),
+ * (srv2, false), (srv3, true) ... (srvn, true). True if the connection is
+ * valid. False if the connection is invalidated in pgfdw_inval_callback. NULL
+ * is returned when there are no cached connections at all.
+ *
+ * This function issues a warning in case for any connection the associated
+ * foreign server has been dropped which means that the server name can not be
+ * read from the system catalogues.
+ */
+Datum
+postgres_fdw_get_connections(PG_FUNCTION_ARGS)
+{
+	ArrayBuildState *astate = NULL;
+	HASH_SEQ_STATUS	scan;
+	ConnCacheEntry	*entry;
+	StringInfoData  buf;
+	uint16 			no_server_conn_cnt = 0;
+
+	if (!ConnectionHash)
+		PG_RETURN_NULL();
+
+	initStringInfo(&buf);
+
+	hash_seq_init(&scan, ConnectionHash);
+	while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
+	{
+		ForeignServer *server;
+
+		/* We only look for open remote connections. */
+		if (!entry->conn)
+			continue;
+
+		server = GetForeignServerExtended(entry->serverid, true);
+
+		/*
+		 * The foreign server may have been dropped in the current explicit
+		 * transaction. It's not possible to drop the server from another
+		 * session when the connection associated with it is in use in the
+		 * current transaction, if tried so the drop query in another session
+		 * blocks until the current explicit transaction finishes.
+		 *
+		 * Even though the server is dropped in the current explicit
+		 * transaction, the cache can have the associated active connection
+		 * entry. Say we call such connections as dangling. Since we can not
+		 * fetch the server name from system catalogues for dangling
+		 * connections, instead we issue a warning.
+		 *
+		 * We could have done better by storing the server name in the cache
+		 * entry instead of server oid so that it could be used in the output.
+		 * But storing the server name in each cache entry requires 64bytes of
+		 * memory, which is huge, considering the fact that there can exists
+		 * many cached connections and the use case i.e. dropping the foreign
+		 * server within the explicit current transaction seems rare. So, we
+		 * chose to issue a warning instead.
+		 *
+		 * Such dangling connections get closed either in the next use or at
+		 * the end of current explicit transaction in pgfdw_xact_callback.
+		 */
+		if (!server)
+		{
+			if (entry->conn)
+			{
+				/*
+				 * If the server has been dropped in the current explicit
+				 * transaction, then this entry would have been invalidated in
+				 * pgfdw_inval_callback at the end of drop sever command. Note
+				 * that this entry would not have been closed in
+				 * pgfdw_inval_callback because it is still being used in the
+				 * current explicit transaction. So, assert that here.
+				 */
+				Assert(entry->xact_depth > 0 && entry->invalidated);
+
+				no_server_conn_cnt++;
+			}
+
+			continue;
+		}
+
+		appendStringInfo(&buf, "(%s, %s)", server->servername,
+						 entry->invalidated ? "false" : "true");
+
+		/* stash away the prepared string into array value */
+		astate = accumArrayResult(astate, CStringGetTextDatum(buf.data),
+								  false, TEXTOID, CurrentMemoryContext);
+		resetStringInfo(&buf);
+	}
+
+	if (no_server_conn_cnt > 0)
+	{
+		ereport(WARNING,
+				(errmsg_plural("found an active connection for which the foreign server would have been dropped",
+							   "found some active connections for which the foreign servers would have been dropped",
+							   no_server_conn_cnt),
+				 no_server_conn_cnt > 1 ?
+				 errdetail("Such connections get closed either in the next use or at the end of the current transaction.")
+				 : errdetail("Such connection gets closed either in the next use or at the end of the current transaction.")));
+	}
+
+	if (astate)
+		PG_RETURN_ARRAYTYPE_P(makeArrayResult(astate,
+											  CurrentMemoryContext));
+	else
+		PG_RETURN_NULL();
+}
+
+/*
+ * Disconnect the cached connections.
+ *
+ * If server name is provided as input, this function disconnects the
+ * associated cached connection. Otherwise all the cached connections are
+ * disconnected. The cache can be destroyed when there are no active
+ * connections left.
+ *
+ * This function returns false if the cache doesn't exist.
+ * When the cache exists:
+ * 	1) If the server name is provided, it first checks whether the foreign
+ *	   server exists, if not, an error is emitted. Otherwise it disconnects the
+ * 	   associated connection when it's not being used in current transaction
+ * 	   and returns true. If it's in use, then issues a warning and returns
+ * 	   false.
+ *	2) If no input argument is provided, then it tries to disconnect all the
+ *	   connections. If all the connections are not being used, then it
+ *     disconnects them and returns true. If all the connections are being
+ * 	   used, then it issues a warning and returns false. If at least one
+ * 	   connection is closed and others are in use, then issues a warning and
+ * 	   returns true.
+ */
+Datum
+postgres_fdw_disconnect(PG_FUNCTION_ARGS)
+{
+	bool	result = false;
+	bool	is_in_use = false;
+
+	if (!ConnectionHash)
+		PG_RETURN_BOOL(result);
+
+	if (PG_NARGS() == 1)
+	{
+		char			*servername = NULL;
+		ForeignServer	*server = NULL;
+
+		servername = text_to_cstring(PG_GETARG_TEXT_PP(0));
+		server = GetForeignServerByName(servername, true);
+
+		if (server)
+		{
+			uint32	hashvalue;
+
+			hashvalue =
+				GetSysCacheHashValue1(FOREIGNSERVEROID,
+									  ObjectIdGetDatum(server->serverid));
+
+			result = disconnect_cached_connections(hashvalue, false,
+												   &is_in_use);
+		}
+		else
+			ereport(ERROR,
+					(errcode(ERRCODE_CONNECTION_DOES_NOT_EXIST),
+					 errmsg("foreign server \"%s\" does not exist", servername)));
+
+		/*
+		 * Check if the connection associated with the given foreign server is
+		 * in use i.e. entry->xact_depth > 0. Since we can not close it, so
+		 * error out.
+		 */
+		if (is_in_use)
+			ereport(WARNING,
+					(errmsg("cannot close the connection because it is still in use")));
+	}
+	else
+	{
+		result = disconnect_cached_connections(0, true, &is_in_use);
+
+		/*
+		 * Check if some or all of the connections are in use i.e.
+		 * entry->xact_depth > 0. Since we can not close them, so inform the
+		 * user.
+		 */
+		if (is_in_use)
+		{
+			if (result)
+			{
+				/* We closed at least one connection. Others are in use. */
+				ereport(WARNING,
+						(errmsg("cannot close all connections because some of them are still in use")));
+			}
+			else
+			{
+				/* We did not closed any connection, all are in use. */
+				ereport(WARNING,
+						(errmsg("cannot close any connection because they are still in use")));
+			}
+		}
+	}
+
+	PG_RETURN_BOOL(result);
+}
+
+/*
+ * Workhorse to disconnect the cached connections.
+ *
+ * This function tries to disconnect the connections only when they are not in
+ * use in the current transaction.
+ *
+ * If all is true, all the cached connections that are not being used in the
+ * current transaction are disconnected. Otherwise, the unused entries with the
+ * given hashvalue are disconnected.
+ *
+ * This function destroys the cache when there are no active connections. And
+ * set is_in_use flag to true when there exists at least one connection that's
+ * being used in the current transaction.
+ *
+ * This function returns true in the following cases if at least one connection
+ * is disconnected. Otherwise it returns false.
+ */
+static bool
+disconnect_cached_connections(uint32 hashvalue, bool all, bool *is_in_use)
+{
+	bool			result = false;
+	HASH_SEQ_STATUS	scan;
+	ConnCacheEntry	*entry;
+	bool			active_conn_exists = false;
+
+	/* We are here only when ConnectionHash exists. */
+	Assert(ConnectionHash);
+
+	hash_seq_init(&scan, ConnectionHash);
+	while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
+	{
+		/*
+		 * Either disconnect given or all the active and not in use cached
+		 * connections.
+		 */
+		if ((all || entry->server_hashvalue == hashvalue) &&
+			 entry->conn)
+		{
+			if (entry->xact_depth > 0)
+				*is_in_use = true;
+			else
+			{
+				elog(DEBUG3, "discarding connection %p", entry->conn);
+				disconnect_pg_server(entry);
+				result = true;
+			}
+		}
+
+		/*
+		 * If at least one active connection exists in the cache, mark it so
+		 * that we don't destroy the cache.
+		 */
+		if (entry->conn && !active_conn_exists)
+			active_conn_exists = true;
+	}
+
+	/*
+	 * Destroy the cache if we discarded all the active connections i.e. if
+	 * there is no single active connection, which we can know while scanning
+	 * the cached entries in the above loop. Destroying the cache is better
+	 * than to keep it in the memory with all inactive entries in it to save
+	 * some memory. Cache can get initialized on the subsequent queries to
+	 * foreign server.
+	 */
+	if (!active_conn_exists)
+	{
+		hash_destroy(ConnectionHash);
+		ConnectionHash = NULL;
+	}
+
+	return result;
+}
diff --git a/contrib/postgres_fdw/expected/postgres_fdw.out b/contrib/postgres_fdw/expected/postgres_fdw.out
index c11092f8cc..ef9f6d7bbd 100644
--- a/contrib/postgres_fdw/expected/postgres_fdw.out
+++ b/contrib/postgres_fdw/expected/postgres_fdw.out
@@ -13,12 +13,22 @@ DO $d$
             OPTIONS (dbname '$$||current_database()||$$',
                      port '$$||current_setting('port')||$$'
             )$$;
+		EXECUTE $$CREATE SERVER temploopback1 FOREIGN DATA WRAPPER postgres_fdw
+            OPTIONS (dbname '$$||current_database()||$$',
+                     port '$$||current_setting('port')||$$'
+            )$$;
+		EXECUTE $$CREATE SERVER temploopback2 FOREIGN DATA WRAPPER postgres_fdw
+            OPTIONS (dbname '$$||current_database()||$$',
+                     port '$$||current_setting('port')||$$'
+            )$$;
     END;
 $d$;
 CREATE USER MAPPING FOR public SERVER testserver1
 	OPTIONS (user 'value', password 'value');
 CREATE USER MAPPING FOR CURRENT_USER SERVER loopback;
 CREATE USER MAPPING FOR CURRENT_USER SERVER loopback2;
+CREATE USER MAPPING FOR public SERVER temploopback1;
+CREATE USER MAPPING FOR public SERVER temploopback2;
 -- ===================================================================
 -- create objects used through FDW loopback server
 -- ===================================================================
@@ -129,6 +139,16 @@ CREATE FOREIGN TABLE ft6 (
 	c2 int NOT NULL,
 	c3 text
 ) SERVER loopback2 OPTIONS (schema_name 'S 1', table_name 'T 4');
+CREATE FOREIGN TABLE templbtbl1 (
+	c1 int NOT NULL,
+	c2 int NOT NULL,
+	c3 text
+) SERVER temploopback1 OPTIONS (schema_name 'S 1', table_name 'T 4');
+CREATE FOREIGN TABLE templbtbl2 (
+	c1 int NOT NULL,
+	c2 int NOT NULL,
+	c3 text
+) SERVER temploopback2 OPTIONS (schema_name 'S 1', table_name 'T 4');
 -- ===================================================================
 -- tests for validator
 -- ===================================================================
@@ -191,15 +211,17 @@ ALTER FOREIGN TABLE ft2 OPTIONS (schema_name 'S 1', table_name 'T 1');
 ALTER FOREIGN TABLE ft1 ALTER COLUMN c1 OPTIONS (column_name 'C 1');
 ALTER FOREIGN TABLE ft2 ALTER COLUMN c1 OPTIONS (column_name 'C 1');
 \det+
-                              List of foreign tables
- Schema | Table |  Server   |              FDW options              | Description 
---------+-------+-----------+---------------------------------------+-------------
- public | ft1   | loopback  | (schema_name 'S 1', table_name 'T 1') | 
- public | ft2   | loopback  | (schema_name 'S 1', table_name 'T 1') | 
- public | ft4   | loopback  | (schema_name 'S 1', table_name 'T 3') | 
- public | ft5   | loopback  | (schema_name 'S 1', table_name 'T 4') | 
- public | ft6   | loopback2 | (schema_name 'S 1', table_name 'T 4') | 
-(5 rows)
+                                  List of foreign tables
+ Schema |   Table    |    Server     |              FDW options              | Description 
+--------+------------+---------------+---------------------------------------+-------------
+ public | ft1        | loopback      | (schema_name 'S 1', table_name 'T 1') | 
+ public | ft2        | loopback      | (schema_name 'S 1', table_name 'T 1') | 
+ public | ft4        | loopback      | (schema_name 'S 1', table_name 'T 3') | 
+ public | ft5        | loopback      | (schema_name 'S 1', table_name 'T 4') | 
+ public | ft6        | loopback2     | (schema_name 'S 1', table_name 'T 4') | 
+ public | templbtbl1 | temploopback1 | (schema_name 'S 1', table_name 'T 4') | 
+ public | templbtbl2 | temploopback2 | (schema_name 'S 1', table_name 'T 4') | 
+(7 rows)
 
 -- Test that alteration of server options causes reconnection
 -- Remote's errors might be non-English, so hide them to ensure stable results
@@ -9053,3 +9075,378 @@ SELECT 1 FROM ft1 LIMIT 1;
 ALTER SERVER loopback OPTIONS (ADD use_remote_estimate 'off');
 -- The invalid connection gets closed in pgfdw_xact_callback during commit.
 COMMIT;
+-- ========================================================================
+-- Test postgres_fdw_get_connections and postgres_fdw_disconnect functions
+-- ========================================================================
+-- postgres_fdw_get_connections returns an array with elements in a
+-- machine-dependent ordering, so we must resort to unnesting and sorting for a
+-- stable result.
+CREATE FUNCTION fdw_unnest(anyarray) RETURNS SETOF anyelement
+LANGUAGE SQL STRICT IMMUTABLE AS $$
+SELECT $1[i] FROM generate_series(array_lower($1,1), array_upper($1,1)) AS i
+$$;
+-- Ensure to have loopback and loopback2 connections cached.
+SELECT 1 FROM ft1 LIMIT 1;
+ ?column? 
+----------
+        1
+(1 row)
+
+SELECT 1 FROM ft6 LIMIT 1;
+ ?column? 
+----------
+        1
+(1 row)
+
+-- List all the existing cached connections. Should return loopback, loopback2.
+SELECT * FROM fdw_unnest(postgres_fdw_get_connections()) ORDER BY 1;
+    fdw_unnest     
+-------------------
+ (loopback2, true)
+ (loopback, true)
+(2 rows)
+
+-- loopback connection is disconnected. loopback2 connection still exists.
+SELECT postgres_fdw_disconnect('loopback');		/* TRUE */
+ postgres_fdw_disconnect 
+-------------------------
+ t
+(1 row)
+
+-- List all the existing cached connections. Should return loopback2.
+SELECT * FROM fdw_unnest(postgres_fdw_get_connections()) ORDER BY 1;
+    fdw_unnest     
+-------------------
+ (loopback2, true)
+(1 row)
+
+-- Cache exists, but the loopback connection is not present in it.
+SELECT postgres_fdw_disconnect('loopback');		/* FALSE */
+ postgres_fdw_disconnect 
+-------------------------
+ f
+(1 row)
+
+-- Cache exists, but the server name provided doesn't exist.
+SELECT postgres_fdw_disconnect('unknownserver');	/* ERROR */
+ERROR:  foreign server "unknownserver" does not exist
+-- Cache and loopback2 connection exist, so discard it.
+SELECT postgres_fdw_disconnect();	/* TRUE */
+ postgres_fdw_disconnect 
+-------------------------
+ t
+(1 row)
+
+-- Cache does not exist.
+SELECT postgres_fdw_disconnect();	/* FALSE */
+ postgres_fdw_disconnect 
+-------------------------
+ f
+(1 row)
+
+-- Test the functions inside explicit xact.
+-- Connections are being used in the xact, so they cannot be disconnected.
+BEGIN;
+SELECT 1 FROM templbtbl1 LIMIT 1;
+ ?column? 
+----------
+        1
+(1 row)
+
+SELECT 1 FROM templbtbl2 LIMIT 1;
+ ?column? 
+----------
+        1
+(1 row)
+
+-- Should output temploopback1, temploopback2.
+SELECT * FROM fdw_unnest(postgres_fdw_get_connections()) ORDER BY 1;
+      fdw_unnest       
+-----------------------
+ (temploopback1, true)
+ (temploopback2, true)
+(2 rows)
+
+SELECT * FROM postgres_fdw_disconnect('temploopback1'); 	/* WARNING and FALSE */
+WARNING:  cannot close the connection because it is still in use
+ postgres_fdw_disconnect 
+-------------------------
+ f
+(1 row)
+
+SELECT * FROM postgres_fdw_disconnect();	/* WARNING and FALSE */
+WARNING:  cannot close any connection because they are still in use
+ postgres_fdw_disconnect 
+-------------------------
+ f
+(1 row)
+
+-- Should output temploopback1, temploopback2.
+SELECT * FROM fdw_unnest(postgres_fdw_get_connections()) ORDER BY 1;
+      fdw_unnest       
+-----------------------
+ (temploopback1, true)
+ (temploopback2, true)
+(2 rows)
+
+COMMIT;
+-- Should disconnect temploopback1, temploopback2.
+SELECT * FROM postgres_fdw_disconnect();	/* TRUE */
+ postgres_fdw_disconnect 
+-------------------------
+ t
+(1 row)
+
+-- Connections can be closed in the xact because they are not in use.
+SELECT 1 FROM templbtbl1 LIMIT 1;
+ ?column? 
+----------
+        1
+(1 row)
+
+SELECT 1 FROM templbtbl2 LIMIT 1;
+ ?column? 
+----------
+        1
+(1 row)
+
+BEGIN;
+-- Should output temploopback1, temploopback2.
+SELECT * FROM fdw_unnest(postgres_fdw_get_connections()) ORDER BY 1;
+      fdw_unnest       
+-----------------------
+ (temploopback1, true)
+ (temploopback2, true)
+(2 rows)
+
+SELECT * FROM postgres_fdw_disconnect();	/* TRUE */
+ postgres_fdw_disconnect 
+-------------------------
+ t
+(1 row)
+
+-- Should return nothing.
+SELECT * FROM fdw_unnest(postgres_fdw_get_connections()) ORDER BY 1;
+ fdw_unnest 
+------------
+(0 rows)
+
+COMMIT;
+-- temploopback1 connection is closed and temploopback2 is not, because it's
+-- being used in the xact.
+SELECT 1 FROM templbtbl1 LIMIT 1;
+ ?column? 
+----------
+        1
+(1 row)
+
+BEGIN;
+SELECT 1 FROM templbtbl2 LIMIT 1;
+ ?column? 
+----------
+        1
+(1 row)
+
+-- Should output temploopback1, temploopback2.
+SELECT * FROM fdw_unnest(postgres_fdw_get_connections()) ORDER BY 1;
+      fdw_unnest       
+-----------------------
+ (temploopback1, true)
+ (temploopback2, true)
+(2 rows)
+
+SELECT * FROM postgres_fdw_disconnect('temploopback1');		/* TRUE */
+ postgres_fdw_disconnect 
+-------------------------
+ t
+(1 row)
+
+-- Should output temploopback2.
+SELECT * FROM fdw_unnest(postgres_fdw_get_connections()) ORDER BY 1;
+      fdw_unnest       
+-----------------------
+ (temploopback2, true)
+(1 row)
+
+COMMIT;
+-- Should disconnect temploopback2.
+SELECT * FROM postgres_fdw_disconnect();	/* TRUE */
+ postgres_fdw_disconnect 
+-------------------------
+ t
+(1 row)
+
+-- temploopback1 connection is closed and temploopback2 is not, because it's
+-- being used in the xact.
+SELECT 1 FROM templbtbl1 LIMIT 1;
+ ?column? 
+----------
+        1
+(1 row)
+
+BEGIN;
+SELECT 1 FROM templbtbl2 LIMIT 1;
+ ?column? 
+----------
+        1
+(1 row)
+
+-- Should output temploopback1, temploopback2.
+SELECT * FROM fdw_unnest(postgres_fdw_get_connections()) ORDER BY 1;
+      fdw_unnest       
+-----------------------
+ (temploopback1, true)
+ (temploopback2, true)
+(2 rows)
+
+SELECT * FROM postgres_fdw_disconnect();	/* WARNING and TRUE */
+WARNING:  cannot close all connections because some of them are still in use
+ postgres_fdw_disconnect 
+-------------------------
+ t
+(1 row)
+
+-- Should output temploopback2.
+SELECT * FROM fdw_unnest(postgres_fdw_get_connections()) ORDER BY 1;
+      fdw_unnest       
+-----------------------
+ (temploopback2, true)
+(1 row)
+
+COMMIT;
+-- Should disconnect temploopback2.
+SELECT * FROM postgres_fdw_disconnect();	/* TRUE */
+ postgres_fdw_disconnect 
+-------------------------
+ t
+(1 row)
+
+-- temploopback1 connection is invalidated and temploopback2 is not.
+BEGIN;
+SELECT 1 FROM templbtbl1 LIMIT 1;
+ ?column? 
+----------
+        1
+(1 row)
+
+SELECT 1 FROM templbtbl2 LIMIT 1;
+ ?column? 
+----------
+        1
+(1 row)
+
+ALTER SERVER temploopback1 OPTIONS (ADD use_remote_estimate 'off');
+-- Should output temploopback1 as invalid, temploopback2 as valid.
+SELECT * FROM fdw_unnest(postgres_fdw_get_connections()) ORDER BY 1;
+       fdw_unnest       
+------------------------
+ (temploopback1, false)
+ (temploopback2, true)
+(2 rows)
+
+COMMIT;
+-- Invalidated connection i.e. temploopback1 was closed at the end of the xact.
+-- Should output temploopback2.
+SELECT * FROM fdw_unnest(postgres_fdw_get_connections()) ORDER BY 1;
+      fdw_unnest       
+-----------------------
+ (temploopback2, true)
+(1 row)
+
+-- Should disconnect temploopback2.
+SELECT * FROM postgres_fdw_disconnect();	/* TRUE */
+ postgres_fdw_disconnect 
+-------------------------
+ t
+(1 row)
+
+-- Both temploopback1 and temploopback2 connections are invalidated.
+BEGIN;
+SELECT 1 FROM templbtbl1 LIMIT 1;
+ ?column? 
+----------
+        1
+(1 row)
+
+SELECT 1 FROM templbtbl2 LIMIT 1;
+ ?column? 
+----------
+        1
+(1 row)
+
+ALTER SERVER temploopback1 OPTIONS (SET use_remote_estimate 'off');
+ALTER SERVER temploopback2 OPTIONS (ADD use_remote_estimate 'off');
+-- Should output both temploopback1 and temploopback2 as invalid.
+SELECT * FROM fdw_unnest(postgres_fdw_get_connections()) ORDER BY 1;
+       fdw_unnest       
+------------------------
+ (temploopback1, false)
+ (temploopback2, false)
+(2 rows)
+
+COMMIT;
+-- Invalidated connections i.e. temploopback1 and temploopback2 were closed at
+-- the end of the xact. Should output nothing.
+SELECT * FROM fdw_unnest(postgres_fdw_get_connections()) ORDER BY 1;
+ fdw_unnest 
+------------
+(0 rows)
+
+-- No active connections.
+SELECT * FROM postgres_fdw_disconnect();	/* FALSE */
+ postgres_fdw_disconnect 
+-------------------------
+ f
+(1 row)
+
+-- Both the servers were dropped inside the xact block, so a warning is
+-- emitted.
+BEGIN;
+SELECT 1 FROM templbtbl1 LIMIT 1;
+ ?column? 
+----------
+        1
+(1 row)
+
+SELECT 1 FROM templbtbl2 LIMIT 1;
+ ?column? 
+----------
+        1
+(1 row)
+
+-- Should output temploopback1, temploopback2.
+SELECT * FROM fdw_unnest(postgres_fdw_get_connections()) ORDER BY 1;
+      fdw_unnest       
+-----------------------
+ (temploopback1, true)
+ (temploopback2, true)
+(2 rows)
+
+DROP SERVER temploopback1 CASCADE;
+NOTICE:  drop cascades to 2 other objects
+DETAIL:  drop cascades to user mapping for public on server temploopback1
+drop cascades to foreign table templbtbl1
+-- Should output temploopback2.
+SELECT * FROM fdw_unnest(postgres_fdw_get_connections()) ORDER BY 1; /* WARNING */
+WARNING:  found an active connection for which the foreign server would have been dropped
+DETAIL:  Such connection gets closed either in the next use or at the end of the current transaction.
+      fdw_unnest       
+-----------------------
+ (temploopback2, true)
+(1 row)
+
+DROP SERVER temploopback2 CASCADE;
+NOTICE:  drop cascades to 2 other objects
+DETAIL:  drop cascades to user mapping for public on server temploopback2
+drop cascades to foreign table templbtbl2
+-- Should output nothing.
+SELECT * FROM fdw_unnest(postgres_fdw_get_connections()) ORDER BY 1; /* WARNING */
+WARNING:  found some active connections for which the foreign servers would have been dropped
+DETAIL:  Such connections get closed either in the next use or at the end of the current transaction.
+ fdw_unnest 
+------------
+(0 rows)
+
+COMMIT;
+-- Clean up.
+DROP FUNCTION fdw_unnest;
diff --git a/contrib/postgres_fdw/postgres_fdw--1.0.sql b/contrib/postgres_fdw/postgres_fdw--1.0.sql
index a0f0fc1bf4..edbb5911d2 100644
--- a/contrib/postgres_fdw/postgres_fdw--1.0.sql
+++ b/contrib/postgres_fdw/postgres_fdw--1.0.sql
@@ -16,3 +16,18 @@ LANGUAGE C STRICT;
 CREATE FOREIGN DATA WRAPPER postgres_fdw
   HANDLER postgres_fdw_handler
   VALIDATOR postgres_fdw_validator;
+
+CREATE FUNCTION postgres_fdw_get_connections ()
+RETURNS text[]
+AS 'MODULE_PATHNAME','postgres_fdw_get_connections'
+LANGUAGE C STRICT PARALLEL RESTRICTED;
+
+CREATE FUNCTION postgres_fdw_disconnect ()
+RETURNS bool
+AS 'MODULE_PATHNAME','postgres_fdw_disconnect'
+LANGUAGE C STRICT PARALLEL RESTRICTED;
+
+CREATE FUNCTION postgres_fdw_disconnect (text)
+RETURNS bool
+AS 'MODULE_PATHNAME','postgres_fdw_disconnect'
+LANGUAGE C STRICT PARALLEL RESTRICTED;
diff --git a/contrib/postgres_fdw/sql/postgres_fdw.sql b/contrib/postgres_fdw/sql/postgres_fdw.sql
index 25dbc08b98..2267aee630 100644
--- a/contrib/postgres_fdw/sql/postgres_fdw.sql
+++ b/contrib/postgres_fdw/sql/postgres_fdw.sql
@@ -15,6 +15,14 @@ DO $d$
             OPTIONS (dbname '$$||current_database()||$$',
                      port '$$||current_setting('port')||$$'
             )$$;
+		EXECUTE $$CREATE SERVER temploopback1 FOREIGN DATA WRAPPER postgres_fdw
+            OPTIONS (dbname '$$||current_database()||$$',
+                     port '$$||current_setting('port')||$$'
+            )$$;
+		EXECUTE $$CREATE SERVER temploopback2 FOREIGN DATA WRAPPER postgres_fdw
+            OPTIONS (dbname '$$||current_database()||$$',
+                     port '$$||current_setting('port')||$$'
+            )$$;
     END;
 $d$;
 
@@ -22,6 +30,8 @@ CREATE USER MAPPING FOR public SERVER testserver1
 	OPTIONS (user 'value', password 'value');
 CREATE USER MAPPING FOR CURRENT_USER SERVER loopback;
 CREATE USER MAPPING FOR CURRENT_USER SERVER loopback2;
+CREATE USER MAPPING FOR public SERVER temploopback1;
+CREATE USER MAPPING FOR public SERVER temploopback2;
 
 -- ===================================================================
 -- create objects used through FDW loopback server
@@ -142,6 +152,17 @@ CREATE FOREIGN TABLE ft6 (
 	c3 text
 ) SERVER loopback2 OPTIONS (schema_name 'S 1', table_name 'T 4');
 
+CREATE FOREIGN TABLE templbtbl1 (
+	c1 int NOT NULL,
+	c2 int NOT NULL,
+	c3 text
+) SERVER temploopback1 OPTIONS (schema_name 'S 1', table_name 'T 4');
+
+CREATE FOREIGN TABLE templbtbl2 (
+	c1 int NOT NULL,
+	c2 int NOT NULL,
+	c3 text
+) SERVER temploopback2 OPTIONS (schema_name 'S 1', table_name 'T 4');
 -- ===================================================================
 -- tests for validator
 -- ===================================================================
@@ -2711,3 +2732,148 @@ SELECT 1 FROM ft1 LIMIT 1;
 ALTER SERVER loopback OPTIONS (ADD use_remote_estimate 'off');
 -- The invalid connection gets closed in pgfdw_xact_callback during commit.
 COMMIT;
+
+-- ========================================================================
+-- Test postgres_fdw_get_connections and postgres_fdw_disconnect functions
+-- ========================================================================
+
+-- postgres_fdw_get_connections returns an array with elements in a
+-- machine-dependent ordering, so we must resort to unnesting and sorting for a
+-- stable result.
+CREATE FUNCTION fdw_unnest(anyarray) RETURNS SETOF anyelement
+LANGUAGE SQL STRICT IMMUTABLE AS $$
+SELECT $1[i] FROM generate_series(array_lower($1,1), array_upper($1,1)) AS i
+$$;
+
+-- Ensure to have loopback and loopback2 connections cached.
+SELECT 1 FROM ft1 LIMIT 1;
+SELECT 1 FROM ft6 LIMIT 1;
+
+-- List all the existing cached connections. Should return loopback, loopback2.
+SELECT * FROM fdw_unnest(postgres_fdw_get_connections()) ORDER BY 1;
+
+-- loopback connection is disconnected. loopback2 connection still exists.
+SELECT postgres_fdw_disconnect('loopback');		/* TRUE */
+
+-- List all the existing cached connections. Should return loopback2.
+SELECT * FROM fdw_unnest(postgres_fdw_get_connections()) ORDER BY 1;
+
+-- Cache exists, but the loopback connection is not present in it.
+SELECT postgres_fdw_disconnect('loopback');		/* FALSE */
+
+-- Cache exists, but the server name provided doesn't exist.
+SELECT postgres_fdw_disconnect('unknownserver');	/* ERROR */
+
+-- Cache and loopback2 connection exist, so discard it.
+SELECT postgres_fdw_disconnect();	/* TRUE */
+
+-- Cache does not exist.
+SELECT postgres_fdw_disconnect();	/* FALSE */
+
+-- Test the functions inside explicit xact.
+-- Connections are being used in the xact, so they cannot be disconnected.
+BEGIN;
+SELECT 1 FROM templbtbl1 LIMIT 1;
+SELECT 1 FROM templbtbl2 LIMIT 1;
+-- Should output temploopback1, temploopback2.
+SELECT * FROM fdw_unnest(postgres_fdw_get_connections()) ORDER BY 1;
+SELECT * FROM postgres_fdw_disconnect('temploopback1'); 	/* WARNING and FALSE */
+SELECT * FROM postgres_fdw_disconnect();	/* WARNING and FALSE */
+-- Should output temploopback1, temploopback2.
+SELECT * FROM fdw_unnest(postgres_fdw_get_connections()) ORDER BY 1;
+COMMIT;
+
+-- Should disconnect temploopback1, temploopback2.
+SELECT * FROM postgres_fdw_disconnect();	/* TRUE */
+
+-- Connections can be closed in the xact because they are not in use.
+SELECT 1 FROM templbtbl1 LIMIT 1;
+SELECT 1 FROM templbtbl2 LIMIT 1;
+BEGIN;
+-- Should output temploopback1, temploopback2.
+SELECT * FROM fdw_unnest(postgres_fdw_get_connections()) ORDER BY 1;
+SELECT * FROM postgres_fdw_disconnect();	/* TRUE */
+-- Should return nothing.
+SELECT * FROM fdw_unnest(postgres_fdw_get_connections()) ORDER BY 1;
+COMMIT;
+
+-- temploopback1 connection is closed and temploopback2 is not, because it's
+-- being used in the xact.
+SELECT 1 FROM templbtbl1 LIMIT 1;
+BEGIN;
+SELECT 1 FROM templbtbl2 LIMIT 1;
+-- Should output temploopback1, temploopback2.
+SELECT * FROM fdw_unnest(postgres_fdw_get_connections()) ORDER BY 1;
+SELECT * FROM postgres_fdw_disconnect('temploopback1');		/* TRUE */
+-- Should output temploopback2.
+SELECT * FROM fdw_unnest(postgres_fdw_get_connections()) ORDER BY 1;
+COMMIT;
+
+-- Should disconnect temploopback2.
+SELECT * FROM postgres_fdw_disconnect();	/* TRUE */
+
+-- temploopback1 connection is closed and temploopback2 is not, because it's
+-- being used in the xact.
+SELECT 1 FROM templbtbl1 LIMIT 1;
+BEGIN;
+SELECT 1 FROM templbtbl2 LIMIT 1;
+-- Should output temploopback1, temploopback2.
+SELECT * FROM fdw_unnest(postgres_fdw_get_connections()) ORDER BY 1;
+SELECT * FROM postgres_fdw_disconnect();	/* WARNING and TRUE */
+-- Should output temploopback2.
+SELECT * FROM fdw_unnest(postgres_fdw_get_connections()) ORDER BY 1;
+COMMIT;
+
+-- Should disconnect temploopback2.
+SELECT * FROM postgres_fdw_disconnect();	/* TRUE */
+
+-- temploopback1 connection is invalidated and temploopback2 is not.
+BEGIN;
+SELECT 1 FROM templbtbl1 LIMIT 1;
+SELECT 1 FROM templbtbl2 LIMIT 1;
+ALTER SERVER temploopback1 OPTIONS (ADD use_remote_estimate 'off');
+-- Should output temploopback1 as invalid, temploopback2 as valid.
+SELECT * FROM fdw_unnest(postgres_fdw_get_connections()) ORDER BY 1;
+COMMIT;
+
+-- Invalidated connection i.e. temploopback1 was closed at the end of the xact.
+-- Should output temploopback2.
+SELECT * FROM fdw_unnest(postgres_fdw_get_connections()) ORDER BY 1;
+
+-- Should disconnect temploopback2.
+SELECT * FROM postgres_fdw_disconnect();	/* TRUE */
+
+-- Both temploopback1 and temploopback2 connections are invalidated.
+BEGIN;
+SELECT 1 FROM templbtbl1 LIMIT 1;
+SELECT 1 FROM templbtbl2 LIMIT 1;
+ALTER SERVER temploopback1 OPTIONS (SET use_remote_estimate 'off');
+ALTER SERVER temploopback2 OPTIONS (ADD use_remote_estimate 'off');
+-- Should output both temploopback1 and temploopback2 as invalid.
+SELECT * FROM fdw_unnest(postgres_fdw_get_connections()) ORDER BY 1;
+COMMIT;
+
+-- Invalidated connections i.e. temploopback1 and temploopback2 were closed at
+-- the end of the xact. Should output nothing.
+SELECT * FROM fdw_unnest(postgres_fdw_get_connections()) ORDER BY 1;
+
+-- No active connections.
+SELECT * FROM postgres_fdw_disconnect();	/* FALSE */
+
+-- Both the servers were dropped inside the xact block, so a warning is
+-- emitted.
+BEGIN;
+SELECT 1 FROM templbtbl1 LIMIT 1;
+SELECT 1 FROM templbtbl2 LIMIT 1;
+-- Should output temploopback1, temploopback2.
+SELECT * FROM fdw_unnest(postgres_fdw_get_connections()) ORDER BY 1;
+DROP SERVER temploopback1 CASCADE;
+-- Should output temploopback2.
+SELECT * FROM fdw_unnest(postgres_fdw_get_connections()) ORDER BY 1; /* WARNING */
+DROP SERVER temploopback2 CASCADE;
+-- Should output nothing.
+SELECT * FROM fdw_unnest(postgres_fdw_get_connections()) ORDER BY 1; /* WARNING */
+COMMIT;
+
+-- Clean up.
+DROP FUNCTION fdw_unnest;
diff --git a/doc/src/sgml/postgres-fdw.sgml b/doc/src/sgml/postgres-fdw.sgml
index e6fd2143c1..405923f2aa 100644
--- a/doc/src/sgml/postgres-fdw.sgml
+++ b/doc/src/sgml/postgres-fdw.sgml
@@ -477,7 +477,47 @@ OPTIONS (ADD password_required 'false');
    </para>
 
   </sect3>
- </sect2>
+
+<sect2>
+  <title>Functions</title>
+
+  <para>
+   <function>postgres_fdw_get_connections</function> ( ) which takes no input.
+   When called in the local session, it returns an array with each element as a
+   pair of the foreign server names of all the open connections that are
+   previously made to the foreign servers and <literal>true</literal> or
+   <literal>false</literal> to show whether or not the connection is valid. 
+   The foreign server connections can get invalidated due to alter statements
+   on foreign server or user mapping. If there are no open connections, then
+   <literal>NULL</literal> is returned. This function issues a warning in case
+   for any connection, the associated foreign server has been dropped and the
+   server name can not be fetched from the system catalogues.
+  </para>
+
+  <para>
+   <function>postgres_fdw_disconnect</function> ( <parameter>servername</parameter> <type>text</type> )
+   which takes foreign server name as input. When called in the local session,
+   it discards the unused open connection previously made to the foreign server
+   and returns <literal>true</literal>. If the open connection is still being
+   used in the current transaction, it is not discarded, instead a warning is
+   issued and <literal>false</literal> is returned. <literal>false</literal> is
+   returned when there are no open connections. When there are some open
+   connections, but there is no connection for the given foreign server,
+   then <literal>false</literal> is returned. When no foreign server exists
+   with the given name, an error is emitted.
+  </para>
+
+  <para>
+   <function>postgres_fdw_disconnect</function> ( ) which takes no input.
+   When called in the local session, it discards all the unused open
+   connections that are previously made to the foreign servers and returns
+   <literal>true</literal>. If there is any open connection that is still being
+   used in the current transaction, then a warning is issued. <literal>false</literal>
+   is returned when no open connection is discarded or there are no open
+   connections at all.
+  </para>
+
+</sect2>
 
  <sect2>
   <title>Connection Management</title>
@@ -490,6 +530,21 @@ OPTIONS (ADD password_required 'false');
    multiple user identities (user mappings) are used to access the foreign
    server, a connection is established for each user mapping.
   </para>
+
+  <para>
+   Since the <filename>postgres_fdw</filename> keeps the connections to remote
+   servers in the local session, the corresponding sessions that are opened on
+   the remote servers are kept idle until they are re-used by the local session.
+   This may waste resources if those connections are not frequently used by the
+   local session. To address this, the <filename>postgres_fdw</filename>
+   provides following way to remove the connections to the remote servers and
+   so the remote sessions:
+    
+   <function>postgres_fdw_disconnect()</function> to discard all the
+   connections or <function>postgres_fdw_disconnect(text)</function>
+   to discard the connection associated with the given foreign server.
+  
+  </para>
  </sect2>
 
  <sect2>
-- 
2.25.1

Reply via email to