From c28f59a43386a928907016b54650cf09ad2dd137 Mon Sep 17 00:00:00 2001
From: Bharath Rupireddy <bharath.rupireddy@enterprisedb.com>
Date: Mon, 18 Jan 2021 10:11:40 +0530
Subject: [PATCH v2] postgres_fdw_get_connections

---
 contrib/postgres_fdw/Makefile                 |   2 +-
 contrib/postgres_fdw/connection.c             | 135 ++++++++++++++++++
 .../postgres_fdw/expected/postgres_fdw.out    |  61 +++++++-
 .../postgres_fdw/postgres_fdw--1.0--1.1.sql   |  10 ++
 contrib/postgres_fdw/postgres_fdw.control     |   2 +-
 contrib/postgres_fdw/sql/postgres_fdw.sql     |  27 ++++
 doc/src/sgml/postgres-fdw.sgml                |  32 +++++
 7 files changed, 266 insertions(+), 3 deletions(-)
 create mode 100644 contrib/postgres_fdw/postgres_fdw--1.0--1.1.sql

diff --git a/contrib/postgres_fdw/Makefile b/contrib/postgres_fdw/Makefile
index ee8a80a392..c1b0cad453 100644
--- a/contrib/postgres_fdw/Makefile
+++ b/contrib/postgres_fdw/Makefile
@@ -14,7 +14,7 @@ PG_CPPFLAGS = -I$(libpq_srcdir)
 SHLIB_LINK_INTERNAL = $(libpq)
 
 EXTENSION = postgres_fdw
-DATA = postgres_fdw--1.0.sql
+DATA = postgres_fdw--1.0.sql postgres_fdw--1.0--1.1.sql
 
 REGRESS = postgres_fdw
 
diff --git a/contrib/postgres_fdw/connection.c b/contrib/postgres_fdw/connection.c
index eaedfea9f2..a1404cb6bb 100644
--- a/contrib/postgres_fdw/connection.c
+++ b/contrib/postgres_fdw/connection.c
@@ -16,12 +16,14 @@
 #include "access/xact.h"
 #include "catalog/pg_user_mapping.h"
 #include "commands/defrem.h"
+#include "funcapi.h"
 #include "mb/pg_wchar.h"
 #include "miscadmin.h"
 #include "pgstat.h"
 #include "postgres_fdw.h"
 #include "storage/fd.h"
 #include "storage/latch.h"
+#include "utils/builtins.h"
 #include "utils/datetime.h"
 #include "utils/hsearch.h"
 #include "utils/inval.h"
@@ -74,6 +76,11 @@ static unsigned int prep_stmt_number = 0;
 /* tracks whether any work is needed in callback functions */
 static bool xact_got_connection = false;
 
+/*
+ * SQL functions
+ */
+PG_FUNCTION_INFO_V1(postgres_fdw_get_connections);
+
 /* prototypes of private functions */
 static void make_new_connection(ConnCacheEntry *entry, UserMapping *user);
 static PGconn *connect_pg_server(ForeignServer *server, UserMapping *user);
@@ -1335,3 +1342,131 @@ exit:	;
 		*result = last_res;
 	return timed_out;
 }
+
+/*
+ * List active foreign server connections.
+ *
+ * This function takes no input parameter and returns setof record made of
+ * following values:
+ * - server_name - server name of active connection. In case the foreign server
+ *   is dropped but still the connection is active, then the server name will
+ *   be NULL in output.
+ * - valid - true/false representing whether the connection is valid or not.
+ * 	 Note that the connections can get invalidated in pgfdw_inval_callback.
+ *
+ * No records are returned when there are no cached connections at all.
+ */
+Datum
+postgres_fdw_get_connections(PG_FUNCTION_ARGS)
+{
+#define POSTGRES_FDW_GET_CONNECTIONS_COLS	2
+	ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
+	TupleDesc	tupdesc;
+	Tuplestorestate *tupstore;
+	MemoryContext per_query_ctx;
+	MemoryContext oldcontext;
+	HASH_SEQ_STATUS scan;
+	ConnCacheEntry *entry;
+
+	/* check to see if caller supports us returning a tuplestore */
+	if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
+		ereport(ERROR,
+				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+				 errmsg("set-valued function called in context that cannot accept a set")));
+	if (!(rsinfo->allowedModes & SFRM_Materialize))
+		ereport(ERROR,
+				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+				 errmsg("materialize mode required, but it is not allowed in this context")));
+
+	/* Build a tuple descriptor for our result type */
+	if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
+		elog(ERROR, "return type must be a row type");
+
+	/* Build tuplestore to hold the result rows */
+	per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
+	oldcontext = MemoryContextSwitchTo(per_query_ctx);
+
+	tupstore = tuplestore_begin_heap(true, false, work_mem);
+	rsinfo->returnMode = SFRM_Materialize;
+	rsinfo->setResult = tupstore;
+	rsinfo->setDesc = tupdesc;
+
+	MemoryContextSwitchTo(oldcontext);
+
+	/* If cache doesn't exist, we return no records */
+	if (!ConnectionHash)
+	{
+		/* clean up and return the tuplestore */
+		tuplestore_donestoring(tupstore);
+
+		PG_RETURN_VOID();
+	}
+
+	hash_seq_init(&scan, ConnectionHash);
+	while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
+	{
+		ForeignServer *server;
+		Datum		values[POSTGRES_FDW_GET_CONNECTIONS_COLS];
+		bool		nulls[POSTGRES_FDW_GET_CONNECTIONS_COLS];
+
+		/* We only look for open remote connections */
+		if (!entry->conn)
+			continue;
+
+		server = GetForeignServerExtended(entry->serverid, FSV_MISSING_OK);
+
+		MemSet(values, 0, sizeof(values));
+		MemSet(nulls, 0, sizeof(nulls));
+
+		/*
+		 * The foreign server may have been dropped in current explicit
+		 * transaction. It is not possible to drop the server from another
+		 * session when the connection associated with it is in use in the
+		 * current transaction, if tried so, the drop query in another session
+		 * blocks until the current transaction finishes.
+		 *
+		 * Even though the server is dropped in the current transaction, the
+		 * cache can still have associated active connection entry, say we
+		 * call such connections dangling. Since we can not fetch the server
+		 * name from system catalogs for dangling connections, instead we
+		 * show NULL value for server name in output.
+		 *
+		 * We could have done better by storing the server name in the cache
+		 * entry instead of server oid so that it could be used in the output.
+		 * But the server name in each cache entry requires 64 bytes of
+		 * memory, which is huge, when there are many cached connections and
+		 * the use case i.e. dropping the foreign server within the explicit
+		 * current transaction seems rare. So, we chose to show NULL value for
+		 * server name in output.
+		 *
+		 * Such dangling connections get closed either in next use or at the
+		 * end of current explicit transaction in pgfdw_xact_callback.
+		 */
+		if (!server)
+		{
+			/*
+			 * If the server has been dropped in the current explicit
+			 * transaction, then this entry would have been invalidated in
+			 * pgfdw_inval_callback at the end of drop sever command. Note
+			 * that this connection would not have been closed in
+			 * pgfdw_inval_callback because it is still being used in the
+			 * current explicit transaction. So, assert that here.
+			 */
+			Assert(entry->conn && entry->xact_depth > 0 && entry->invalidated);
+
+			/* Show null, if no server name was found */
+			nulls[0] = true;
+		}
+		else
+			values[0] = CStringGetTextDatum(server->servername);
+
+		values[1] = BoolGetDatum(!entry->invalidated);
+
+		tuplestore_putvalues(tupstore, tupdesc, values, nulls);
+	}
+
+	/* clean up and return the tuplestore */
+	tuplestore_donestoring(tupstore);
+
+	PG_RETURN_VOID();
+}
diff --git a/contrib/postgres_fdw/expected/postgres_fdw.out b/contrib/postgres_fdw/expected/postgres_fdw.out
index c11092f8cc..1cad311436 100644
--- a/contrib/postgres_fdw/expected/postgres_fdw.out
+++ b/contrib/postgres_fdw/expected/postgres_fdw.out
@@ -13,12 +13,18 @@ DO $d$
             OPTIONS (dbname '$$||current_database()||$$',
                      port '$$||current_setting('port')||$$'
             )$$;
+        EXECUTE $$CREATE SERVER loopback3 FOREIGN DATA WRAPPER postgres_fdw
+            OPTIONS (dbname '$$||current_database()||$$',
+                     port '$$||current_setting('port')||$$'
+            )$$;
+
     END;
 $d$;
 CREATE USER MAPPING FOR public SERVER testserver1
 	OPTIONS (user 'value', password 'value');
 CREATE USER MAPPING FOR CURRENT_USER SERVER loopback;
 CREATE USER MAPPING FOR CURRENT_USER SERVER loopback2;
+CREATE USER MAPPING FOR public SERVER loopback3;
 -- ===================================================================
 -- create objects used through FDW loopback server
 -- ===================================================================
@@ -129,6 +135,11 @@ CREATE FOREIGN TABLE ft6 (
 	c2 int NOT NULL,
 	c3 text
 ) SERVER loopback2 OPTIONS (schema_name 'S 1', table_name 'T 4');
+CREATE FOREIGN TABLE ft7 (
+	c1 int NOT NULL,
+	c2 int NOT NULL,
+	c3 text
+) SERVER loopback3 OPTIONS (schema_name 'S 1', table_name 'T 4');
 -- ===================================================================
 -- tests for validator
 -- ===================================================================
@@ -199,7 +210,8 @@ ALTER FOREIGN TABLE ft2 ALTER COLUMN c1 OPTIONS (column_name 'C 1');
  public | ft4   | loopback  | (schema_name 'S 1', table_name 'T 3') | 
  public | ft5   | loopback  | (schema_name 'S 1', table_name 'T 4') | 
  public | ft6   | loopback2 | (schema_name 'S 1', table_name 'T 4') | 
-(5 rows)
+ public | ft7   | loopback3 | (schema_name 'S 1', table_name 'T 4') | 
+(6 rows)
 
 -- Test that alteration of server options causes reconnection
 -- Remote's errors might be non-English, so hide them to ensure stable results
@@ -9040,6 +9052,13 @@ DROP PROCEDURE terminate_backend_and_wait(text);
 -- ===================================================================
 -- This test case is for closing the connection in pgfdw_xact_callback
 BEGIN;
+-- List all the existing cached connections. Only loopback2 should be output.
+SELECT * FROM postgres_fdw_get_connections() ORDER BY 1;
+ server_name | valid 
+-------------+-------
+ loopback2   | t
+(1 row)
+
 -- Connection xact depth becomes 1 i.e. the connection is in midst of the xact.
 SELECT 1 FROM ft1 LIMIT 1;
  ?column? 
@@ -9047,9 +9066,49 @@ SELECT 1 FROM ft1 LIMIT 1;
         1
 (1 row)
 
+SELECT 1 FROM ft7 LIMIT 1;
+ ?column? 
+----------
+        1
+(1 row)
+
+-- List all the existing cached connections. loopback and loopback3
+-- also should be output as valid connections.
+SELECT * FROM postgres_fdw_get_connections() ORDER BY 1;
+ server_name | valid 
+-------------+-------
+ loopback    | t
+ loopback2   | t
+ loopback3   | t
+(3 rows)
+
 -- Connection is not closed at the end of the alter statement in
 -- pgfdw_inval_callback. That's because the connection is in midst of this
 -- xact, it is just marked as invalid.
 ALTER SERVER loopback OPTIONS (ADD use_remote_estimate 'off');
+DROP SERVER loopback3 CASCADE;
+NOTICE:  drop cascades to 2 other objects
+DETAIL:  drop cascades to user mapping for public on server loopback3
+drop cascades to foreign table ft7
+-- List all the existing cached connections. loopback and loopback3
+-- should be output as invalid connections. Also the server name for
+-- loopback3 should be NULL because the server was dropped.
+SELECT * FROM postgres_fdw_get_connections() ORDER BY 1;
+ server_name | valid 
+-------------+-------
+ loopback    | f
+ loopback2   | t
+             | f
+(3 rows)
+
 -- The invalid connection gets closed in pgfdw_xact_callback during commit.
 COMMIT;
+-- List all the existing cached connections. loopback and loopback3
+-- should not be output because they should be closed at the end of
+-- the above transaction.
+SELECT * FROM postgres_fdw_get_connections() ORDER BY 1;
+ server_name | valid 
+-------------+-------
+ loopback2   | t
+(1 row)
+
diff --git a/contrib/postgres_fdw/postgres_fdw--1.0--1.1.sql b/contrib/postgres_fdw/postgres_fdw--1.0--1.1.sql
new file mode 100644
index 0000000000..7f85784466
--- /dev/null
+++ b/contrib/postgres_fdw/postgres_fdw--1.0--1.1.sql
@@ -0,0 +1,10 @@
+/* contrib/postgres_fdw/postgres_fdw--1.0--1.1.sql */
+
+-- complain if script is sourced in psql, rather than via ALTER EXTENSION
+\echo Use "ALTER EXTENSION postgres_fdw UPDATE TO '1.1'" to load this file. \quit
+
+CREATE FUNCTION postgres_fdw_get_connections (OUT server_name text,
+    OUT valid boolean)
+RETURNS SETOF record
+AS 'MODULE_PATHNAME'
+LANGUAGE C STRICT PARALLEL RESTRICTED;
diff --git a/contrib/postgres_fdw/postgres_fdw.control b/contrib/postgres_fdw/postgres_fdw.control
index f9ed490752..d489382064 100644
--- a/contrib/postgres_fdw/postgres_fdw.control
+++ b/contrib/postgres_fdw/postgres_fdw.control
@@ -1,5 +1,5 @@
 # postgres_fdw extension
 comment = 'foreign-data wrapper for remote PostgreSQL servers'
-default_version = '1.0'
+default_version = '1.1'
 module_pathname = '$libdir/postgres_fdw'
 relocatable = true
diff --git a/contrib/postgres_fdw/sql/postgres_fdw.sql b/contrib/postgres_fdw/sql/postgres_fdw.sql
index 25dbc08b98..ebf6eb10a6 100644
--- a/contrib/postgres_fdw/sql/postgres_fdw.sql
+++ b/contrib/postgres_fdw/sql/postgres_fdw.sql
@@ -15,6 +15,11 @@ DO $d$
             OPTIONS (dbname '$$||current_database()||$$',
                      port '$$||current_setting('port')||$$'
             )$$;
+        EXECUTE $$CREATE SERVER loopback3 FOREIGN DATA WRAPPER postgres_fdw
+            OPTIONS (dbname '$$||current_database()||$$',
+                     port '$$||current_setting('port')||$$'
+            )$$;
+
     END;
 $d$;
 
@@ -22,6 +27,7 @@ CREATE USER MAPPING FOR public SERVER testserver1
 	OPTIONS (user 'value', password 'value');
 CREATE USER MAPPING FOR CURRENT_USER SERVER loopback;
 CREATE USER MAPPING FOR CURRENT_USER SERVER loopback2;
+CREATE USER MAPPING FOR public SERVER loopback3;
 
 -- ===================================================================
 -- create objects used through FDW loopback server
@@ -142,6 +148,12 @@ CREATE FOREIGN TABLE ft6 (
 	c3 text
 ) SERVER loopback2 OPTIONS (schema_name 'S 1', table_name 'T 4');
 
+CREATE FOREIGN TABLE ft7 (
+	c1 int NOT NULL,
+	c2 int NOT NULL,
+	c3 text
+) SERVER loopback3 OPTIONS (schema_name 'S 1', table_name 'T 4');
+
 -- ===================================================================
 -- tests for validator
 -- ===================================================================
@@ -2703,11 +2715,26 @@ DROP PROCEDURE terminate_backend_and_wait(text);
 -- ===================================================================
 -- This test case is for closing the connection in pgfdw_xact_callback
 BEGIN;
+-- List all the existing cached connections. Only loopback2 should be output.
+SELECT * FROM postgres_fdw_get_connections() ORDER BY 1;
 -- Connection xact depth becomes 1 i.e. the connection is in midst of the xact.
 SELECT 1 FROM ft1 LIMIT 1;
+SELECT 1 FROM ft7 LIMIT 1;
+-- List all the existing cached connections. loopback and loopback3
+-- also should be output as valid connections.
+SELECT * FROM postgres_fdw_get_connections() ORDER BY 1;
 -- Connection is not closed at the end of the alter statement in
 -- pgfdw_inval_callback. That's because the connection is in midst of this
 -- xact, it is just marked as invalid.
 ALTER SERVER loopback OPTIONS (ADD use_remote_estimate 'off');
+DROP SERVER loopback3 CASCADE;
+-- List all the existing cached connections. loopback and loopback3
+-- should be output as invalid connections. Also the server name for
+-- loopback3 should be NULL because the server was dropped.
+SELECT * FROM postgres_fdw_get_connections() ORDER BY 1;
 -- The invalid connection gets closed in pgfdw_xact_callback during commit.
 COMMIT;
+-- List all the existing cached connections. loopback and loopback3
+-- should not be output because they should be closed at the end of
+-- the above transaction.
+SELECT * FROM postgres_fdw_get_connections() ORDER BY 1;
diff --git a/doc/src/sgml/postgres-fdw.sgml b/doc/src/sgml/postgres-fdw.sgml
index e6fd2143c1..6a91926da8 100644
--- a/doc/src/sgml/postgres-fdw.sgml
+++ b/doc/src/sgml/postgres-fdw.sgml
@@ -479,6 +479,38 @@ OPTIONS (ADD password_required 'false');
   </sect3>
  </sect2>
 
+<sect2>
+  <title>Functions</title>
+
+  <variablelist>
+   <varlistentry>
+    <term><function>postgres_fdw_get_connections(OUT server_name text, OUT valid boolean) returns setof record</function></term>
+    <listitem>
+     <para>
+      This function returns the foreign server names of all the open
+      connections that <filename>postgres_fdw</filename> established from
+      the local session to the foreign servers. It also returns whether
+      each connection is valid or not. <literal>false</literal> is returned
+      if the foreign server connection is used in the current local
+      transaction but its foreign server or user mapping is changed or
+      dropped, and then such invalid connection will be closed at
+      the end of that transaction. <literal>true</literal> is returned
+      otherwise. If there are no open connections, no record is returned.
+      Example usage of the function:
+    <screen>
+postgres=# SELECT * FROM postgres_fdw_get_connections() ORDER BY 1;
+ server_name | valid 
+-------------+-------
+ loopback1   | t
+ loopback2   | f
+</screen>
+     </para>
+    </listitem>
+   </varlistentry>
+   </variablelist>
+
+</sect2>
+
  <sect2>
   <title>Connection Management</title>
 
-- 
2.25.1

