From 3d2d5473fae0ed9675c3302caacabdc45dfb72bc Mon Sep 17 00:00:00 2001
From: Bharath Rupireddy <bharath.rupireddy@enterprisedb.com>
Date: Mon, 18 Jan 2021 17:56:57 +0530
Subject: [PATCH v12] postgres_fdw function to discard cached connections

This patch introduces a new function postgres_fdw_disconnect().
When called with a foreign server name, it discards the associated
connection with the server. When called without any argument, it
discards all the existing cached connections.
---
 contrib/postgres_fdw/connection.c             | 196 +++++++++++++++++-
 .../postgres_fdw/expected/postgres_fdw.out    |  93 +++++++++
 .../postgres_fdw/postgres_fdw--1.0--1.1.sql   |  10 +
 contrib/postgres_fdw/sql/postgres_fdw.sql     |  35 ++++
 doc/src/sgml/postgres-fdw.sgml                |  58 ++++++
 5 files changed, 384 insertions(+), 8 deletions(-)

diff --git a/contrib/postgres_fdw/connection.c b/contrib/postgres_fdw/connection.c
index a1404cb6bb..1e93a84aaa 100644
--- a/contrib/postgres_fdw/connection.c
+++ b/contrib/postgres_fdw/connection.c
@@ -68,6 +68,7 @@ typedef struct ConnCacheEntry
  * Connection cache (initialized on first use)
  */
 static HTAB *ConnectionHash = NULL;
+static bool conn_cache_destroyed = false;
 
 /* for assigning cursor numbers and prepared statement numbers */
 static unsigned int cursor_number = 0;
@@ -80,6 +81,7 @@ static bool xact_got_connection = false;
  * SQL functions
  */
 PG_FUNCTION_INFO_V1(postgres_fdw_get_connections);
+PG_FUNCTION_INFO_V1(postgres_fdw_disconnect);
 
 /* prototypes of private functions */
 static void make_new_connection(ConnCacheEntry *entry, UserMapping *user);
@@ -102,6 +104,7 @@ static bool pgfdw_exec_cleanup_query(PGconn *conn, const char *query,
 static bool pgfdw_get_cleanup_result(PGconn *conn, TimestampTz endtime,
 									 PGresult **result);
 static bool UserMappingPasswordRequired(UserMapping *user);
+static bool disconnect_cached_connections(uint32 hashvalue, bool all);
 
 /*
  * Get a PGconn which can be used to execute queries on the remote PostgreSQL
@@ -134,15 +137,20 @@ GetConnection(UserMapping *user, bool will_prep_stmt)
 									 HASH_ELEM | HASH_BLOBS);
 
 		/*
-		 * Register some callback functions that manage connection cleanup.
-		 * This should be done just once in each backend.
+		 * Register callback functions that manage connection cleanup. This
+		 * should be done just once in each backend. We don't register the
+		 * callbacks again, if the connection cache is destroyed at least once
+		 * in the backend.
 		 */
-		RegisterXactCallback(pgfdw_xact_callback, NULL);
-		RegisterSubXactCallback(pgfdw_subxact_callback, NULL);
-		CacheRegisterSyscacheCallback(FOREIGNSERVEROID,
-									  pgfdw_inval_callback, (Datum) 0);
-		CacheRegisterSyscacheCallback(USERMAPPINGOID,
-									  pgfdw_inval_callback, (Datum) 0);
+		if (!conn_cache_destroyed)
+		{
+			RegisterXactCallback(pgfdw_xact_callback, NULL);
+			RegisterSubXactCallback(pgfdw_subxact_callback, NULL);
+			CacheRegisterSyscacheCallback(FOREIGNSERVEROID,
+										  pgfdw_inval_callback, (Datum) 0);
+			CacheRegisterSyscacheCallback(USERMAPPINGOID,
+										  pgfdw_inval_callback, (Datum) 0);
+		}
 	}
 
 	/* Set flag that we did GetConnection during the current transaction */
@@ -1102,6 +1110,13 @@ pgfdw_inval_callback(Datum arg, int cacheid, uint32 hashvalue)
 
 	Assert(cacheid == FOREIGNSERVEROID || cacheid == USERMAPPINGOID);
 
+	/*
+	 * Quick exit if the cache has been destroyed in
+	 * disconnect_cached_connections.
+	 */
+	if (!ConnectionHash)
+		return;
+
 	/* ConnectionHash must exist already, if we're registered */
 	hash_seq_init(&scan, ConnectionHash);
 	while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
@@ -1470,3 +1485,168 @@ postgres_fdw_get_connections(PG_FUNCTION_ARGS)
 
 	PG_RETURN_VOID();
 }
+
+/*
+ * Disconnect cached connections.
+ *
+ * If server name is provided as input, this function disconnects the
+ * associated cached connection. Otherwise all the cached connections are
+ * disconnected. The cache can be destroyed when there are no active
+ * connections left.
+ *
+ * This function returns false if the cache doesn't exist.
+ * When the cache exists:
+ * 	1) If the server name is provided, it first checks whether the foreign
+ *	   server exists, if not, an error is emitted. Otherwise it disconnects the
+ * 	   associated connection when it's not being used in current transaction
+ * 	   and returns true. If it's in use, then issues a warning and returns
+ * 	   false.
+ *	2) If no input argument is provided, then it tries to disconnect all the
+ *	   connections. If all the connections are not being used, then it
+ *     disconnects them and returns true. If all the connections are being
+ * 	   used, then it issues a warning and returns false. If at least one
+ * 	   connection is closed and others are in use, then issues a warning and
+ * 	   returns true.
+ */
+Datum
+postgres_fdw_disconnect(PG_FUNCTION_ARGS)
+{
+	bool	result = false;
+
+	if (!ConnectionHash)
+		PG_RETURN_BOOL(result);
+
+	if (PG_NARGS() == 1)
+	{
+		ForeignServer	*server = NULL;
+		char	*servername = NULL;
+		uint32	hashvalue;
+
+		servername = text_to_cstring(PG_GETARG_TEXT_PP(0));
+		server = GetForeignServerByName(servername, true);
+
+		if (!server)
+			ereport(ERROR,
+					(errcode(ERRCODE_CONNECTION_DOES_NOT_EXIST),
+					 errmsg("foreign server \"%s\" does not exist", servername)));
+
+		hashvalue = GetSysCacheHashValue1(FOREIGNSERVEROID,
+										  ObjectIdGetDatum(server->serverid));
+		result = disconnect_cached_connections(hashvalue, false);
+	}
+	else
+		result = disconnect_cached_connections(0, true);
+
+	PG_RETURN_BOOL(result);
+}
+
+/*
+ * Workhorse to disconnect the cached connections.
+ *
+ * This function tries to disconnect the connections only when they are not in
+ * use in the current transaction.
+ *
+ * If all is true, all the cached connections that are not being used in the
+ * current transaction are disconnected. Otherwise, the unused entries with the
+ * given hashvalue are disconnected.
+ *
+ * This function destroys the cache when there are no active connections.
+ *
+ * This function returns true in the following cases if at least one connection
+ * is disconnected. Otherwise it returns false.
+ */
+static bool
+disconnect_cached_connections(uint32 hashvalue, bool all)
+{
+	HASH_SEQ_STATUS	scan;
+	ConnCacheEntry	*entry;
+	bool	result = false;
+	bool	is_in_use = false;
+	bool	active_conn_exists = false;
+
+	/* We are here only when ConnectionHash exists. */
+	Assert(ConnectionHash);
+
+	hash_seq_init(&scan, ConnectionHash);
+	while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
+	{
+		/*
+		 * Either disconnect given or all the active and not in use cached
+		 * connections.
+		 */
+		if ((all || entry->server_hashvalue == hashvalue) &&
+			 entry->conn)
+		{
+			if (entry->xact_depth > 0)
+				is_in_use = true;
+			else
+			{
+				elog(DEBUG3, "discarding connection %p", entry->conn);
+				disconnect_pg_server(entry);
+				result = true;
+			}
+		}
+
+		/*
+		 * If at least one active connection exists in the cache, ensure we
+		 * don't destroy the cache.
+		 */
+		if (entry->conn && !active_conn_exists)
+			active_conn_exists = true;
+	}
+
+	/*
+	 * is_in_use flag would be set to true when there exists at least one
+	 * connection that's being used in the current transaction.
+	 */
+	if (all)
+	{
+		/*
+		 * Check if some or all of the connections are in use i.e.
+		 * entry->xact_depth > 0. Since we can not close them, so inform user.
+		 */
+		if (is_in_use)
+		{
+			if (result)
+			{
+				/* We closed at least one connection, others are in use. */
+				ereport(WARNING,
+						(errmsg("cannot close all connections because some of them are still in use")));
+			}
+			else
+			{
+				/* We did not close any connection, all are in use. */
+				ereport(WARNING,
+						(errmsg("cannot close any connection because they are still in use")));
+			}
+		}
+	}
+	else
+	{
+		/*
+		 * Check if the connection associated with the given foreign server is
+		 * in use i.e. entry->xact_depth > 0. Since we can not close it, so
+		 * error out.
+		 */
+		if (is_in_use)
+			ereport(WARNING,
+					(errmsg("cannot close the connection because it is still in use")));
+	}
+
+	/*
+	 * Destroy the cache if we discarded all active connections i.e. if there
+	 * is no single active connection, which we can know while scanning the
+	 * cached entries in the above loop. Destroying the cache is better than to
+	 * keep it in the memory with all inactive entries in it to save some
+	 * memory. Cache can get initialized on the subsequent queries to foreign
+	 * server.
+	 */
+	if (!active_conn_exists)
+	{
+		hash_destroy(ConnectionHash);
+		ConnectionHash = NULL;
+		conn_cache_destroyed = true;
+	}
+
+	return result;
+}
diff --git a/contrib/postgres_fdw/expected/postgres_fdw.out b/contrib/postgres_fdw/expected/postgres_fdw.out
index 1cad311436..33c61d8a14 100644
--- a/contrib/postgres_fdw/expected/postgres_fdw.out
+++ b/contrib/postgres_fdw/expected/postgres_fdw.out
@@ -9112,3 +9112,96 @@ SELECT * FROM postgres_fdw_get_connections() ORDER BY 1;
  loopback2   | t
 (1 row)
 
+-- ===================================================================
+-- test postgres_fdw_disconnect function
+-- ===================================================================
+-- Returns true as it closes loopback2 connection.
+SELECT * FROM postgres_fdw_disconnect('loopback2');
+ postgres_fdw_disconnect 
+-------------------------
+ t
+(1 row)
+
+-- Returns false as there is no cached connection.
+SELECT * FROM postgres_fdw_disconnect();
+ postgres_fdw_disconnect 
+-------------------------
+ f
+(1 row)
+
+-- Ensure to cache loopback connection.
+SELECT 1 FROM ft1 LIMIT 1;
+ ?column? 
+----------
+        1
+(1 row)
+
+BEGIN;
+-- Ensure to cache loopback2 connection.
+SELECT 1 FROM ft6 LIMIT 1;
+ ?column? 
+----------
+        1
+(1 row)
+
+-- List all the existing cached connections. loopback and loopback2 should be
+-- output.
+SELECT * FROM postgres_fdw_get_connections() ORDER BY 1;
+ server_name | valid 
+-------------+-------
+ loopback    | t
+ loopback2   | t
+(2 rows)
+
+-- Issues a warning, returns false as loopback2 connection is still in use and
+-- can not be closed.
+SELECT * FROM postgres_fdw_disconnect('loopback2');
+WARNING:  cannot close the connection because it is still in use
+ postgres_fdw_disconnect 
+-------------------------
+ f
+(1 row)
+
+-- Closes loopback connection, returns true and issues a warning as loopback2
+-- connection is still in use and can not be closed.
+SELECT * FROM postgres_fdw_disconnect();
+WARNING:  cannot close all connections because some of them are still in use
+ postgres_fdw_disconnect 
+-------------------------
+ t
+(1 row)
+
+-- Issues a warning and returns false as it can not close any connection in the
+-- cache.
+SELECT * FROM postgres_fdw_disconnect();
+WARNING:  cannot close any connection because they are still in use
+ postgres_fdw_disconnect 
+-------------------------
+ f
+(1 row)
+
+COMMIT;
+-- List all the existing cached connections. loopback2 should be output.
+SELECT * FROM postgres_fdw_get_connections() ORDER BY 1;
+ server_name | valid 
+-------------+-------
+ loopback2   | t
+(1 row)
+
+-- Returns an error as there is no foreign server with given name.
+SELECT * FROM postgres_fdw_disconnect('unknownserver');
+ERROR:  foreign server "unknownserver" does not exist
+-- Closes loopback2 connection and returns true.
+SELECT * FROM postgres_fdw_disconnect();
+ postgres_fdw_disconnect 
+-------------------------
+ t
+(1 row)
+
+-- List all the existing cached connections. No connection exists, so NULL
+-- should be output.
+SELECT * FROM postgres_fdw_get_connections() ORDER BY 1;
+ server_name | valid 
+-------------+-------
+(0 rows)
+
diff --git a/contrib/postgres_fdw/postgres_fdw--1.0--1.1.sql b/contrib/postgres_fdw/postgres_fdw--1.0--1.1.sql
index 7f85784466..63dff0f7a8 100644
--- a/contrib/postgres_fdw/postgres_fdw--1.0--1.1.sql
+++ b/contrib/postgres_fdw/postgres_fdw--1.0--1.1.sql
@@ -8,3 +8,13 @@ CREATE FUNCTION postgres_fdw_get_connections (OUT server_name text,
 RETURNS SETOF record
 AS 'MODULE_PATHNAME'
 LANGUAGE C STRICT PARALLEL RESTRICTED;
+
+CREATE FUNCTION postgres_fdw_disconnect ()
+RETURNS bool
+AS 'MODULE_PATHNAME','postgres_fdw_disconnect'
+LANGUAGE C STRICT PARALLEL RESTRICTED;
+
+CREATE FUNCTION postgres_fdw_disconnect (text)
+RETURNS bool
+AS 'MODULE_PATHNAME','postgres_fdw_disconnect'
+LANGUAGE C STRICT PARALLEL RESTRICTED;
diff --git a/contrib/postgres_fdw/sql/postgres_fdw.sql b/contrib/postgres_fdw/sql/postgres_fdw.sql
index ebf6eb10a6..e9c3fd2d41 100644
--- a/contrib/postgres_fdw/sql/postgres_fdw.sql
+++ b/contrib/postgres_fdw/sql/postgres_fdw.sql
@@ -2738,3 +2738,38 @@ COMMIT;
 -- should not be output because they should be closed at the end of
 -- the above transaction.
 SELECT * FROM postgres_fdw_get_connections() ORDER BY 1;
+
+-- ===================================================================
+-- test postgres_fdw_disconnect function
+-- ===================================================================
+-- Returns true as it closes loopback2 connection.
+SELECT * FROM postgres_fdw_disconnect('loopback2');
+-- Returns false as there is no cached connection.
+SELECT * FROM postgres_fdw_disconnect();
+-- Ensure to cache loopback connection.
+SELECT 1 FROM ft1 LIMIT 1;
+BEGIN;
+-- Ensure to cache loopback2 connection.
+SELECT 1 FROM ft6 LIMIT 1;
+-- List all the existing cached connections. loopback and loopback2 should be
+-- output.
+SELECT * FROM postgres_fdw_get_connections() ORDER BY 1;
+-- Issues a warning, returns false as loopback2 connection is still in use and
+-- can not be closed.
+SELECT * FROM postgres_fdw_disconnect('loopback2');
+-- Closes loopback connection, returns true and issues a warning as loopback2
+-- connection is still in use and can not be closed.
+SELECT * FROM postgres_fdw_disconnect();
+-- Issues a warning and returns false as it can not close any connection in the
+-- cache.
+SELECT * FROM postgres_fdw_disconnect();
+COMMIT;
+-- List all the existing cached connections. loopback2 should be output.
+SELECT * FROM postgres_fdw_get_connections() ORDER BY 1;
+-- Returns an error as there is no foreign server with given name.
+SELECT * FROM postgres_fdw_disconnect('unknownserver');
+-- Closes loopback2 connection and returns true.
+SELECT * FROM postgres_fdw_disconnect();
+-- List all the existing cached connections. No connection exists, so NULL
+-- should be output.
+SELECT * FROM postgres_fdw_get_connections() ORDER BY 1;
diff --git a/doc/src/sgml/postgres-fdw.sgml b/doc/src/sgml/postgres-fdw.sgml
index 6a91926da8..4c5264aaa3 100644
--- a/doc/src/sgml/postgres-fdw.sgml
+++ b/doc/src/sgml/postgres-fdw.sgml
@@ -507,6 +507,50 @@ postgres=# SELECT * FROM postgres_fdw_get_connections() ORDER BY 1;
      </para>
     </listitem>
    </varlistentry>
+
+   <varlistentry>
+    <term><function>postgres_fdw_disconnect(IN servername text) returns boolean</function></term>
+    <listitem>
+     <para>
+      When called in the local session with the foreign server name as input,
+      it discards the unused open connection previously made to the foreign
+      server and returns <literal>true</literal>. If the open connection is
+      still being used in the current transaction, it is not discarded, instead
+      a warning is issued and <literal>false</literal> is returned. <literal>false</literal>
+      is returned when there are no open connections. When there are some open
+      connections, but there is no connection for the given foreign server,
+      then <literal>false</literal> is returned. When no foreign server exists
+      with the given name, an error is emitted. Example usage of the function:
+    <screen>
+postgres=# SELECT * FROM postgres_fdw_disconnect('loopback1');
+ postgres_fdw_disconnect 
+-------------------------
+ t
+</screen>
+     </para>
+    </listitem>
+   </varlistentry>
+
+   <varlistentry>
+    <term><function>postgres_fdw_disconnect() returns boolean</function></term>
+    <listitem>
+     <para>
+      When called in the local session with no input argument, it discards all
+      the unused open connections that are previously made to the foreign
+      servers and returns <literal>true</literal>. If there is any open
+      connection that is still being used in the current transaction, then a
+      warning is issued. <literal>false</literal> is returned when no open
+      connection is discarded or there are no open connections at all. Example
+      usage of the function:
+    <screen>
+postgres=# SELECT * FROM postgres_fdw_disconnect();
+ postgres_fdw_disconnect 
+-------------------------
+ t
+</screen>
+     </para>
+    </listitem>
+   </varlistentry>
    </variablelist>
 
 </sect2>
@@ -522,6 +566,20 @@ postgres=# SELECT * FROM postgres_fdw_get_connections() ORDER BY 1;
    multiple user identities (user mappings) are used to access the foreign
    server, a connection is established for each user mapping.
   </para>
+
+  <para>
+   Since the <filename>postgres_fdw</filename> keeps the connections to remote
+   servers in the local session, the corresponding sessions that are opened on
+   the remote servers are kept idle until they are re-used by the local session.
+   This may waste resources if those connections are not frequently used by the
+   local session. To address this, the <filename>postgres_fdw</filename>
+   provides following way to remove the connections to the remote servers and
+   so the remote sessions:
+    
+   <function>postgres_fdw_disconnect()</function> to discard all the
+   connections or <function>postgres_fdw_disconnect(text)</function>
+   to discard the connection associated with the given foreign server.
+  </para>
  </sect2>
 
  <sect2>
-- 
2.25.1

