From f39bf612fea8405b8b1a65cf99a16d65a2369109 Mon Sep 17 00:00:00 2001
From: Bharath Rupireddy <bharath.rupireddy@enterprisedb.com>
Date: Thu, 24 Dec 2020 19:56:13 +0530
Subject: [PATCH v3] postgres_fdw - cached connection leaks if the associated
 user mapping is dropped

In postgres_fdw the cached connections to remote servers can stay
until the lifetime of the local session, if the underlying user
mapping is dropped in another session.

To solve the above connection leak problem, it looks like the right
place to close the invalid connections is either in pgfdw_inval_callback()
if they are not in midst any xact, or in pgfdw_xact_callback(), which gets
called at the end of every act once registered, in the
current session(by then all the sub xacts also would have been finished).
Note that if there are too many invalidated entries, then the
following xact has to bear running this extra disconnect code, but
that's okay than having leaked connections.
---
 contrib/postgres_fdw/connection.c             | 33 +++++++++++++++----
 .../postgres_fdw/expected/postgres_fdw.out    | 18 ++++++++++
 contrib/postgres_fdw/sql/postgres_fdw.sql     | 14 ++++++++
 3 files changed, 58 insertions(+), 7 deletions(-)

diff --git a/contrib/postgres_fdw/connection.c b/contrib/postgres_fdw/connection.c
index 66581e5414..d841cec39b 100644
--- a/contrib/postgres_fdw/connection.c
+++ b/contrib/postgres_fdw/connection.c
@@ -940,12 +940,14 @@ pgfdw_xact_callback(XactEvent event, void *arg)
 		entry->xact_depth = 0;
 
 		/*
-		 * If the connection isn't in a good idle state, discard it to
-		 * recover. Next GetConnection will open a new connection.
+		 * If the connection isn't in a good idle state or it is marked as
+		 * invalid, then discard it to recover. Next GetConnection will open a
+		 * new connection.
 		 */
 		if (PQstatus(entry->conn) != CONNECTION_OK ||
 			PQtransactionStatus(entry->conn) != PQTRANS_IDLE ||
-			entry->changing_xact_state)
+			entry->changing_xact_state ||
+			entry->invalidated)
 		{
 			elog(DEBUG3, "discarding connection %p", entry->conn);
 			disconnect_pg_server(entry);
@@ -1069,9 +1071,12 @@ pgfdw_subxact_callback(SubXactEvent event, SubTransactionId mySubid,
  * Connection invalidation callback function
  *
  * After a change to a pg_foreign_server or pg_user_mapping catalog entry,
- * mark connections depending on that entry as needing to be remade.
- * We can't immediately destroy them, since they might be in the midst of
- * a transaction, but we'll remake them at the next opportunity.
+ * close connections depending on that entry immediately if current transaction
+ * has not used those connections yet. Otherwise, mark those connections as
+ * invalid and then make pgfdw_xact_callback() close them at the end of current
+ * transaction, since they cannot be closed in the midst of the transaction
+ * using them. Closed connections will be remade at the next opportunity if
+ * necessary.
  *
  * Although most cache invalidation callbacks blow away all the related stuff
  * regardless of the given hashvalue, connections are expensive enough that
@@ -1102,7 +1107,21 @@ pgfdw_inval_callback(Datum arg, int cacheid, uint32 hashvalue)
 			 entry->server_hashvalue == hashvalue) ||
 			(cacheid == USERMAPPINGOID &&
 			 entry->mapping_hashvalue == hashvalue))
-			entry->invalidated = true;
+		{
+			/*
+			 * Close the connection immediately if it's not used yet in this
+			 * transaction. Otherwise mark it as invalid so that
+			 * pgfdw_xact_callback() can close it at the end of this
+			 * transaction.
+			 */
+			if (entry->xact_depth == 0)
+			{
+				elog(DEBUG3, "discarding connection %p", entry->conn);
+				disconnect_pg_server(entry);
+			}
+			else
+				entry->invalidated = true;
+		}
 	}
 }
 
diff --git a/contrib/postgres_fdw/expected/postgres_fdw.out b/contrib/postgres_fdw/expected/postgres_fdw.out
index 2d88d06358..c11092f8cc 100644
--- a/contrib/postgres_fdw/expected/postgres_fdw.out
+++ b/contrib/postgres_fdw/expected/postgres_fdw.out
@@ -9035,3 +9035,21 @@ ERROR:  08006
 COMMIT;
 -- Clean up
 DROP PROCEDURE terminate_backend_and_wait(text);
+-- ===================================================================
+-- test connection invalidation cases
+-- ===================================================================
+-- This test case is for closing the connection in pgfdw_xact_callback
+BEGIN;
+-- Connection xact depth becomes 1 i.e. the connection is in midst of the xact.
+SELECT 1 FROM ft1 LIMIT 1;
+ ?column? 
+----------
+        1
+(1 row)
+
+-- Connection is not closed at the end of the alter statement in
+-- pgfdw_inval_callback. That's because the connection is in midst of this
+-- xact, it is just marked as invalid.
+ALTER SERVER loopback OPTIONS (ADD use_remote_estimate 'off');
+-- The invalid connection gets closed in pgfdw_xact_callback during commit.
+COMMIT;
diff --git a/contrib/postgres_fdw/sql/postgres_fdw.sql b/contrib/postgres_fdw/sql/postgres_fdw.sql
index 7581c5417b..25dbc08b98 100644
--- a/contrib/postgres_fdw/sql/postgres_fdw.sql
+++ b/contrib/postgres_fdw/sql/postgres_fdw.sql
@@ -2697,3 +2697,17 @@ COMMIT;
 
 -- Clean up
 DROP PROCEDURE terminate_backend_and_wait(text);
+
+-- ===================================================================
+-- test connection invalidation cases
+-- ===================================================================
+-- This test case is for closing the connection in pgfdw_xact_callback
+BEGIN;
+-- Connection xact depth becomes 1 i.e. the connection is in midst of the xact.
+SELECT 1 FROM ft1 LIMIT 1;
+-- Connection is not closed at the end of the alter statement in
+-- pgfdw_inval_callback. That's because the connection is in midst of this
+-- xact, it is just marked as invalid.
+ALTER SERVER loopback OPTIONS (ADD use_remote_estimate 'off');
+-- The invalid connection gets closed in pgfdw_xact_callback during commit.
+COMMIT;
-- 
2.25.1

