From 246ee4a63daafaba1bcf462bd1b72fd4406653ad Mon Sep 17 00:00:00 2001
From: Hayato Kuroda <kuroda.hayato@fujitsu.com>
Date: Fri, 27 Jan 2023 03:17:30 +0000
Subject: [PATCH v40 1/3] postgres_fdw: add postgres_fdw_verify_connection
 variants

This function can verify the status of connections that are establieshed by
postgres_fdw. This check wil be done by polling the socket. This feature is
currently available only on systems that support the non-standard POLLRDHUP
extension to the poll system call, including Linux.

They return true if existing connection is not closed by the remote peer.
---
 contrib/postgres_fdw/Makefile                 |   2 +-
 contrib/postgres_fdw/connection.c             | 271 ++++++++++++++++++
 contrib/postgres_fdw/meson.build              |   1 +
 .../postgres_fdw/postgres_fdw--1.1--1.2.sql   |  25 ++
 contrib/postgres_fdw/postgres_fdw.control     |   2 +-
 doc/src/sgml/postgres-fdw.sgml                |  77 +++++
 src/backend/foreign/foreign.c                 |  28 ++
 src/include/foreign/foreign.h                 |   1 +
 8 files changed, 405 insertions(+), 2 deletions(-)
 create mode 100644 contrib/postgres_fdw/postgres_fdw--1.1--1.2.sql

diff --git a/contrib/postgres_fdw/Makefile b/contrib/postgres_fdw/Makefile
index c1b0cad453..6d23768389 100644
--- a/contrib/postgres_fdw/Makefile
+++ b/contrib/postgres_fdw/Makefile
@@ -14,7 +14,7 @@ PG_CPPFLAGS = -I$(libpq_srcdir)
 SHLIB_LINK_INTERNAL = $(libpq)
 
 EXTENSION = postgres_fdw
-DATA = postgres_fdw--1.0.sql postgres_fdw--1.0--1.1.sql
+DATA = postgres_fdw--1.0.sql postgres_fdw--1.0--1.1.sql postgres_fdw--1.1--1.2.sql
 
 REGRESS = postgres_fdw
 
diff --git a/contrib/postgres_fdw/connection.c b/contrib/postgres_fdw/connection.c
index 5800c6a9fb..ad1fd00289 100644
--- a/contrib/postgres_fdw/connection.c
+++ b/contrib/postgres_fdw/connection.c
@@ -12,6 +12,10 @@
  */
 #include "postgres.h"
 
+#if HAVE_POLL_H
+#include <poll.h>
+#endif
+
 #include "access/htup_details.h"
 #include "access/xact.h"
 #include "catalog/pg_user_mapping.h"
@@ -25,6 +29,7 @@
 #include "postgres_fdw.h"
 #include "storage/fd.h"
 #include "storage/latch.h"
+#include "utils/acl.h"
 #include "utils/builtins.h"
 #include "utils/datetime.h"
 #include "utils/hsearch.h"
@@ -113,6 +118,10 @@ static uint32 pgfdw_we_get_result = 0;
 PG_FUNCTION_INFO_V1(postgres_fdw_get_connections);
 PG_FUNCTION_INFO_V1(postgres_fdw_disconnect);
 PG_FUNCTION_INFO_V1(postgres_fdw_disconnect_all);
+PG_FUNCTION_INFO_V1(postgres_fdw_verify_connection_server_user);
+PG_FUNCTION_INFO_V1(postgres_fdw_verify_connection_server);
+PG_FUNCTION_INFO_V1(postgres_fdw_verify_connection_all);
+PG_FUNCTION_INFO_V1(postgres_fdw_can_verify_connection);
 
 /* prototypes of private functions */
 static void make_new_connection(ConnCacheEntry *entry, UserMapping *user);
@@ -159,6 +168,13 @@ static void pgfdw_security_check(const char **keywords, const char **values,
 								 UserMapping *user, PGconn *conn);
 static bool UserMappingPasswordRequired(UserMapping *user);
 static bool disconnect_cached_connections(Oid serverid);
+static Datum postgres_fdw_verify_connection(FunctionCallInfo fcinfo);
+static bool verify_cached_connections(Oid serverid, Oid userid,
+									  bool *checked);
+
+/* Low layer-like functions. They are used for verifying connections. */
+static int pgfdw_conn_check(PGconn *conn);
+static bool pgfdw_conn_checkable(void);
 
 /*
  * Get a PGconn which can be used to execute queries on the remote PostgreSQL
@@ -2241,3 +2257,258 @@ disconnect_cached_connections(Oid serverid)
 
 	return result;
 }
+
+/*
+ * Workhorse to verify cached connections.
+ *
+ * This function scans all the connection cache entries and verifies the
+ * connections whose foreign server OID and user mapping OID matche with the
+ * specified one. If userid is specified as InvalidOid, it verifies cached
+ * connections which have arbitrary user mapping OID. If serverid is specified
+ * as InvalidOid, it verifies all the cached connections.
+ *
+ * This function emits warnings if a disconnection is found. This returns false
+ * if disconnections are found, otherwise returns true.
+ *
+ * checked will be set to true if pgfdw_conn_check() is called at least once.
+ */
+static bool
+verify_cached_connections(Oid serverid, Oid userid, bool *checked)
+{
+	HASH_SEQ_STATUS		scan;
+	ConnCacheEntry	   *entry;
+	bool				all = !OidIsValid(serverid);
+	bool 				check_user_mapping = OidIsValid(userid);
+	bool				result = true;
+	StringInfoData 		str;
+
+	Assert(ConnectionHash);
+
+	*checked = false;
+
+	hash_seq_init(&scan, ConnectionHash);
+	while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
+	{
+		/* Ignore cache entry if no open connection right now */
+		if (!entry->conn)
+			continue;
+
+		/* Skip if the entry is invalidated */
+		if (entry->invalidated)
+			continue;
+
+		if (all || entry->serverid == serverid)
+		{
+			/* Skip if the given userid is different from the key */
+			if (!all && check_user_mapping && (entry->key != userid))
+				continue;
+
+			if (pgfdw_conn_check(entry->conn))
+			{
+				/* A foreign server might be down, so construct a message */
+				ForeignServer *server = GetForeignServer(entry->serverid);
+				UserMapping	  *user = GetUserMappingFromOid(entry->key);
+
+				if (result)
+				{
+					/*
+					 * Initialize and add a prefix if this is the first
+					 * disconnection we found.
+					 */
+					initStringInfo(&str);
+					appendStringInfo(&str, "could not connect to server ");
+
+					result = false;
+				}
+				else
+					appendStringInfo(&str, ", ");
+
+				appendStringInfo(&str, "\"%s\" for user \"%s\"",
+								 server->servername,
+								 MappingUserName(user->userid));
+			}
+
+			/* Set a flag to notify the caller */
+			*checked = true;
+		}
+	}
+
+	/* Raise a warning if disconnections are found */
+	if (!result)
+	{
+		Assert(str.len);
+		ereport(WARNING,
+				errcode(ERRCODE_CONNECTION_FAILURE),
+				errmsg("%s", str.data),
+				errdetail("Connection close is detected."),
+				errhint("Plsease check the health of server."));
+		pfree(str.data);
+	}
+
+	return result;
+}
+
+/*
+ * Internal function for postgres_fdw_verify_connection variants
+ *
+ * This function verifies the connections that are established by postgres_fdw
+ * from the local session to the foreign server with the given name. If
+ * username is given, verifications are done only for foreign servers which is
+ * mapped by the user.
+ *
+ * This function emits a warning if a disconnection is found. This returns true
+ * if existing connection is not closed by the remote peer. false is returned
+ * if the local session seems to be disconnected from other servers. NULL is
+ * returned if a valid connection to the specified foreign server is not
+ * established or this function is not available on this platform.
+ */
+static Datum
+postgres_fdw_verify_connection(FunctionCallInfo fcinfo)
+{
+	ForeignServer  *server = NULL;
+	UserMapping	   *user = NULL;
+	bool			result,
+					checked;
+
+	/* Quick exit if the checking does not work well on this platfrom */
+	if (!pgfdw_conn_checkable())
+		PG_RETURN_NULL();
+
+	/* Quick exit if connection cache has not been initialized yet */
+	if (!ConnectionHash)
+		PG_RETURN_NULL();
+
+	/* If server name is specified, find a foreign server */
+	if (PG_NARGS() >= 1)
+	{
+		char *servername = text_to_cstring(PG_GETARG_TEXT_PP(0));
+
+		server = GetForeignServerByName(servername, false);
+	}
+
+	/* If user name is specified, find a user mapping */
+	if (PG_NARGS() >= 2)
+	{
+		char *username = text_to_cstring(PG_GETARG_TEXT_PP(1));
+		Oid userid = get_role_oid(username, false);
+
+		user = GetUserMapping(userid, server->serverid);
+	}
+
+	result = verify_cached_connections(server ? server->serverid : InvalidOid,
+									   user ? user->umid : InvalidOid,
+									   &checked);
+
+	if (checked)
+		PG_RETURN_BOOL(result);
+	else
+		PG_RETURN_NULL();
+}
+
+/*
+ * Verify all the cached connections.
+ *
+ * This function verifies all the connections that are established by postgres_fdw
+ * from the local session to the foreign servers.
+ */
+Datum
+postgres_fdw_verify_connection_all(PG_FUNCTION_ARGS)
+{
+	return postgres_fdw_verify_connection(fcinfo);
+}
+
+/*
+ * postgres_fdw_verify_connection variants
+ *
+ * They are all named 'postgres_fdw_verify_connection' at the SQL level.
+ * They take combinations of server name and user name.
+ */
+
+/*
+ * This function passes both server name and user name to
+ * postgres_fdw_verify_connection().
+ */
+Datum
+postgres_fdw_verify_connection_server_user(PG_FUNCTION_ARGS)
+{
+	return postgres_fdw_verify_connection(fcinfo);
+}
+
+/*
+ * This function passes only server name to postgres_fdw_verify_connection().
+ * This means that the internal function does not care about the difference of
+ * local user.
+ */
+Datum
+postgres_fdw_verify_connection_server(PG_FUNCTION_ARGS)
+{
+	return postgres_fdw_verify_connection(fcinfo);
+}
+
+/*
+ * Check whether functions for verifying cached connections work well or not
+ */
+Datum
+postgres_fdw_can_verify_connection(PG_FUNCTION_ARGS)
+{
+	PG_RETURN_BOOL(pgfdw_conn_checkable());
+}
+
+/*
+ * Check whether the socket peer closed the connection or not.
+ *
+ * Returns >0 if input connection is bad or remote peer seems to be closed,
+ * 0 if it is valid, and -1 if an error occurred.
+ */
+static int
+pgfdw_conn_check(PGconn *conn)
+{
+	int sock = PQsocket(conn);
+	if (!pgfdw_conn_checkable())
+		return 0;
+
+	if (!conn || PQstatus(conn) != CONNECTION_OK || sock == PGINVALID_SOCKET)
+		return -1;
+
+#if (defined(HAVE_POLL) && defined(POLLRDHUP))
+	{
+		/*
+		 * This platform seems to have poll(2), and can wait POLLRDHUP event.
+		 * So construct pollfd and directly call it.
+		 */
+		struct pollfd input_fd;
+		int result;
+
+		input_fd.fd = sock;
+		input_fd.events = POLLRDHUP;
+		input_fd.revents = 0;
+
+		do
+			result = poll(&input_fd, 1, 0);
+		while (result < 0 && errno == EINTR);
+
+		if (result < 0)
+			return -1;
+
+		return input_fd.revents;
+	}
+#else
+	/* Do not support socket checking on this platform, return 0 */
+	return 0;
+#endif
+}
+
+/*
+ * Check whether pgfdw_conn_check() can work on this platform.
+ *
+ * Returns true if this can use pgfdw_conn_check(), otherwise false.
+ */
+static bool
+pgfdw_conn_checkable(void)
+{
+#if (defined(HAVE_POLL) && defined(POLLRDHUP))
+	return true;
+#else
+	return false;
+#endif
+}
diff --git a/contrib/postgres_fdw/meson.build b/contrib/postgres_fdw/meson.build
index 2b451f165e..29118d47bb 100644
--- a/contrib/postgres_fdw/meson.build
+++ b/contrib/postgres_fdw/meson.build
@@ -26,6 +26,7 @@ install_data(
   'postgres_fdw.control',
   'postgres_fdw--1.0.sql',
   'postgres_fdw--1.0--1.1.sql',
+  'postgres_fdw--1.1--1.2.sql',
   kwargs: contrib_data_args,
 )
 
diff --git a/contrib/postgres_fdw/postgres_fdw--1.1--1.2.sql b/contrib/postgres_fdw/postgres_fdw--1.1--1.2.sql
new file mode 100644
index 0000000000..78ee82cc74
--- /dev/null
+++ b/contrib/postgres_fdw/postgres_fdw--1.1--1.2.sql
@@ -0,0 +1,25 @@
+/* contrib/postgres_fdw/postgres_fdw--1.1--1.2.sql */
+
+-- complain if script is sourced in psql, rather than via ALTER EXTENSION
+\echo Use "ALTER EXTENSION postgres_fdw UPDATE TO '1.2'" to load this file. \quit
+
+CREATE FUNCTION postgres_fdw_verify_connection (IN server_name text,
+        IN user_name text)
+RETURNS bool
+AS 'MODULE_PATHNAME', 'postgres_fdw_verify_connection_server_user'
+LANGUAGE C STRICT PARALLEL RESTRICTED;
+
+CREATE FUNCTION postgres_fdw_verify_connection (IN server_name text)
+RETURNS bool
+AS 'MODULE_PATHNAME', 'postgres_fdw_verify_connection_server'
+LANGUAGE C STRICT PARALLEL RESTRICTED;
+
+CREATE FUNCTION postgres_fdw_verify_connection_all ()
+RETURNS bool
+AS 'MODULE_PATHNAME'
+LANGUAGE C STRICT PARALLEL RESTRICTED;
+
+CREATE FUNCTION postgres_fdw_can_verify_connection ()
+RETURNS bool
+AS 'MODULE_PATHNAME'
+LANGUAGE C STRICT PARALLEL SAFE;
diff --git a/contrib/postgres_fdw/postgres_fdw.control b/contrib/postgres_fdw/postgres_fdw.control
index d489382064..a4b800be4f 100644
--- a/contrib/postgres_fdw/postgres_fdw.control
+++ b/contrib/postgres_fdw/postgres_fdw.control
@@ -1,5 +1,5 @@
 # postgres_fdw extension
 comment = 'foreign-data wrapper for remote PostgreSQL servers'
-default_version = '1.1'
+default_version = '1.2'
 module_pathname = '$libdir/postgres_fdw'
 relocatable = true
diff --git a/doc/src/sgml/postgres-fdw.sgml b/doc/src/sgml/postgres-fdw.sgml
index 2ba96aa295..a01ad80c53 100644
--- a/doc/src/sgml/postgres-fdw.sgml
+++ b/doc/src/sgml/postgres-fdw.sgml
@@ -847,6 +847,83 @@ postgres=# SELECT postgres_fdw_disconnect_all();
      </para>
     </listitem>
    </varlistentry>
+
+   <varlistentry>
+    <term><function>postgres_fdw_verify_connection(server_name text <optional>, user_name text </optional>) returns boolean</function></term>
+    <listitem>
+     <para>
+      This function checks the status of remote connections established by
+      <filename>postgres_fdw</filename> from the local session to the foreign
+      server with the given name. If the user name is given, This function
+      checks only remote connections which is associated with the given user by
+      user mapping. Otherwise all the connections with given server name are
+      checked. This check is performed by polling the socket and allows
+      long-running transactions to be aborted sooner if the kernel reports that
+      the connection is closed. This function is currently available only on
+      systems that support the non-standard <symbol>POLLRDHUP</symbol>
+      extension to the <symbol>poll</symbol> system call, including Linux. This
+      returns <literal>true</literal> if existing connection is not closed by
+      the remote peer. <literal>false</literal> is returned if either of
+      checked connections has been closed. <literal>NULL</literal> is returned
+      if a valid connection to the specified foreign server is not established
+      or this function is not available on this platform. If no foreign server
+      with the given name is found, an error is reported.
+      Example usage of the function:
+<screen>
+postgres=# SELECT postgres_fdw_verify_connection('loopback1');
+ postgres_fdw_verify_connection
+--------------------------------
+ t
+</screen>
+     </para>
+    </listitem>
+   </varlistentry>
+   <varlistentry>
+    <term><function>postgres_fdw_verify_connection_all() returns boolean</function></term>
+    <listitem>
+     <para>
+      This function checks the status of all the remote connections established
+      by <filename>postgres_fdw</filename> from the local session to the
+      foreign servers. This check is performed by polling the socket and allows
+      long-running transactions to be aborted sooner if the kernel reports
+      that the connection is closed. This function is currently available only
+      on systems that support the non-standard <symbol>POLLRDHUP</symbol>
+      extension to the <symbol>poll</symbol> system call, including Linux. This
+      returns <literal>true</literal> if all connections are not closed by the
+      remote peer. <literal>false</literal> is returned if the local session
+      seems to be disconnected from at least one remote server. <literal>NULL</literal>
+      is returned if no valid connections are established or this function is
+      not available on this platform. Example usage of the function:
+<screen>
+postgres=# SELECT postgres_fdw_verify_connection_all();
+ postgres_fdw_verify_connection_all
+------------------------------------
+ t
+</screen>
+     </para>
+    </listitem>
+   </varlistentry>
+
+   <varlistentry>
+    <term><function>postgres_fdw_can_verify_connection_states() returns boolean</function></term>
+    <listitem>
+     <para>
+      This function checks whether <function>postgres_fdw_verify_connection</function>
+      and <function>postgres_fdw_verify_connection_all</function> work
+      well or not. This returns <literal>true</literal> if it can be used,
+      otherwise returns <literal>false</literal>. Example usage of the
+      function:
+
+<screen>
+postgres=# SELECT postgres_fdw_can_verify_connection_states();
+ postgres_fdw_can_verify_connection_states
+-------------------------------------------
+ t
+</screen>
+     </para>
+    </listitem>
+   </varlistentry>
+
    </variablelist>
 
 </sect2>
diff --git a/src/backend/foreign/foreign.c b/src/backend/foreign/foreign.c
index fc3edef2a8..8a5cd80732 100644
--- a/src/backend/foreign/foreign.c
+++ b/src/backend/foreign/foreign.c
@@ -246,6 +246,34 @@ GetUserMapping(Oid userid, Oid serverid)
 	return um;
 }
 
+/*
+ * GetUserMappingFromOid - look up the user mapping by its oid.
+ */
+UserMapping *
+GetUserMappingFromOid(Oid usermappigid)
+{
+	HeapTuple				tp;
+	UserMapping			   *um;
+	Form_pg_user_mapping	umform;
+
+	tp = SearchSysCache1(USERMAPPINGOID,
+						 ObjectIdGetDatum(usermappigid));
+
+	if (!HeapTupleIsValid(tp))
+		elog(ERROR, "cache lookup failed for user mapping %u", usermappigid);
+
+	umform = ((Form_pg_user_mapping) GETSTRUCT(tp));
+
+	um = (UserMapping *) palloc(sizeof(UserMapping));
+	um->umid = umform->oid;
+	um->userid = umform->umuser;
+	um->serverid = umform->umserver;
+
+	ReleaseSysCache(tp);
+
+	return um;
+}
+
 
 /*
  * GetForeignTable - look up the foreign table definition by relation oid.
diff --git a/src/include/foreign/foreign.h b/src/include/foreign/foreign.h
index 5256d4d91f..25c9d40699 100644
--- a/src/include/foreign/foreign.h
+++ b/src/include/foreign/foreign.h
@@ -70,6 +70,7 @@ extern ForeignServer *GetForeignServerExtended(Oid serverid,
 extern ForeignServer *GetForeignServerByName(const char *srvname,
 											 bool missing_ok);
 extern UserMapping *GetUserMapping(Oid userid, Oid serverid);
+extern UserMapping *GetUserMappingFromOid(Oid usermappigid);
 extern ForeignDataWrapper *GetForeignDataWrapper(Oid fdwid);
 extern ForeignDataWrapper *GetForeignDataWrapperExtended(Oid fdwid,
 														 bits16 flags);
-- 
2.27.0

