On Sat, May 6, 2017 at 12:55 PM, Robert Haas <robertmh...@gmail.com> wrote:
> Oh!  Good catch.  Given that the behavior in question is intentional
> there and intended to provide backward compatibility, changing it in
> back-branches doesn't seem like a good plan.  I'll adjust the patch so
> that it continues to ignore errors in that case.

Updated patch attached.

-- 
Robert Haas
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company
diff --git a/contrib/postgres_fdw/connection.c b/contrib/postgres_fdw/connection.c
index c6e3d44..2243ff8 100644
--- a/contrib/postgres_fdw/connection.c
+++ b/contrib/postgres_fdw/connection.c
@@ -14,6 +14,8 @@
 
 #include "postgres_fdw.h"
 
+#include "access/htup_details.h"
+#include "catalog/pg_user_mapping.h"
 #include "access/xact.h"
 #include "mb/pg_wchar.h"
 #include "miscadmin.h"
@@ -21,6 +23,7 @@
 #include "storage/latch.h"
 #include "utils/hsearch.h"
 #include "utils/memutils.h"
+#include "utils/syscache.h"
 
 
 /*
@@ -49,6 +52,7 @@ typedef struct ConnCacheEntry
 								 * one level of subxact open, etc */
 	bool		have_prep_stmt; /* have we prepared any stmts in this xact? */
 	bool		have_error;		/* have any subxacts aborted in this xact? */
+	bool		abort_cleanup_incomplete;		/* (sub)abort cleanup pending */
 } ConnCacheEntry;
 
 /*
@@ -74,6 +78,12 @@ static void pgfdw_subxact_callback(SubXactEvent event,
 					   SubTransactionId mySubid,
 					   SubTransactionId parentSubid,
 					   void *arg);
+static void pgfdw_reject_if_abort_cleanup_incomplete(ConnCacheEntry *entry);
+static bool pgfdw_cancel_query(PGconn *conn);
+static bool pgfdw_exec_cleanup_query(PGconn *conn, const char *query,
+						 bool ignore_errors);
+static bool pgfdw_get_cleanup_result(PGconn *conn, TimestampTz endtime,
+						 PGresult **result);
 
 
 /*
@@ -139,8 +149,12 @@ GetConnection(UserMapping *user, bool will_prep_stmt)
 		entry->xact_depth = 0;
 		entry->have_prep_stmt = false;
 		entry->have_error = false;
+		entry->abort_cleanup_incomplete = false;
 	}
 
+	/* Reject further use of connections which failed abort cleanup. */
+	pgfdw_reject_if_abort_cleanup_incomplete(entry);
+
 	/*
 	 * We don't check the health of cached connection here, because it would
 	 * require some overhead.  Broken connection will be detected when the
@@ -604,6 +618,8 @@ pgfdw_xact_callback(XactEvent event, void *arg)
 		/* If it has an open remote transaction, try to close it */
 		if (entry->xact_depth > 0)
 		{
+			bool		abort_cleanup_failure = false;
+
 			elog(DEBUG3, "closing remote transaction on connection %p",
 				 entry->conn);
 
@@ -611,6 +627,13 @@ pgfdw_xact_callback(XactEvent event, void *arg)
 			{
 				case XACT_EVENT_PARALLEL_PRE_COMMIT:
 				case XACT_EVENT_PRE_COMMIT:
+
+					/*
+					 * If abort cleanup previously failed for this connection,
+					 * we can't issue any more commands against it.
+					 */
+					pgfdw_reject_if_abort_cleanup_incomplete(entry);
+
 					/* Commit all remote transactions during pre-commit */
 					do_sql_command(entry->conn, "COMMIT TRANSACTION");
 
@@ -660,6 +683,24 @@ pgfdw_xact_callback(XactEvent event, void *arg)
 					break;
 				case XACT_EVENT_PARALLEL_ABORT:
 				case XACT_EVENT_ABORT:
+
+					/*
+					 * Don't try to clean up the connection if we're already
+					 * in error recursion trouble.
+					 */
+					if (in_error_recursion_trouble())
+						entry->abort_cleanup_incomplete = true;
+
+					/*
+					 * If connection is already unsalvageable, don't touch it
+					 * further.
+					 */
+					if (entry->abort_cleanup_incomplete)
+						break;
+
+					/* Remember that abort cleanup is in progress. */
+					entry->abort_cleanup_incomplete = true;
+
 					/* Assume we might have lost track of prepared statements */
 					entry->have_error = true;
 
@@ -670,40 +711,35 @@ pgfdw_xact_callback(XactEvent event, void *arg)
 					 * command is still being processed by the remote server,
 					 * and if so, request cancellation of the command.
 					 */
-					if (PQtransactionStatus(entry->conn) == PQTRANS_ACTIVE)
+					if (PQtransactionStatus(entry->conn) == PQTRANS_ACTIVE &&
+						!pgfdw_cancel_query(entry->conn))
 					{
-						PGcancel   *cancel;
-						char		errbuf[256];
-
-						if ((cancel = PQgetCancel(entry->conn)))
-						{
-							if (!PQcancel(cancel, errbuf, sizeof(errbuf)))
-								ereport(WARNING,
-										(errcode(ERRCODE_CONNECTION_FAILURE),
-								  errmsg("could not send cancel request: %s",
-										 errbuf)));
-							PQfreeCancel(cancel);
-						}
+						/* Unable to cancel running query. */
+						abort_cleanup_failure = true;
+					}
+					else if (!pgfdw_exec_cleanup_query(entry->conn,
+													   "ABORT TRANSACTION",
+													   false))
+					{
+						/* Unable to abort remote transaction. */
+						abort_cleanup_failure = true;
+					}
+					else if (entry->have_prep_stmt && entry->have_error &&
+							 !pgfdw_exec_cleanup_query(entry->conn,
+													   "DEALLOCATE ALL",
+													   true))
+					{
+						/* Trouble clearing prepared statements. */
+						abort_cleanup_failure = true;
 					}
-
-					/* If we're aborting, abort all remote transactions too */
-					res = PQexec(entry->conn, "ABORT TRANSACTION");
-					/* Note: can't throw ERROR, it would be infinite loop */
-					if (PQresultStatus(res) != PGRES_COMMAND_OK)
-						pgfdw_report_error(WARNING, res, entry->conn, true,
-										   "ABORT TRANSACTION");
 					else
 					{
-						PQclear(res);
-						/* As above, make sure to clear any prepared stmts */
-						if (entry->have_prep_stmt && entry->have_error)
-						{
-							res = PQexec(entry->conn, "DEALLOCATE ALL");
-							PQclear(res);
-						}
 						entry->have_prep_stmt = false;
 						entry->have_error = false;
 					}
+
+					/* Disarm abort_cleanup_incomplete if it all worked. */
+					entry->abort_cleanup_incomplete = abort_cleanup_failure;
 					break;
 			}
 		}
@@ -716,11 +752,13 @@ pgfdw_xact_callback(XactEvent event, void *arg)
 		 * recover. Next GetConnection will open a new connection.
 		 */
 		if (PQstatus(entry->conn) != CONNECTION_OK ||
-			PQtransactionStatus(entry->conn) != PQTRANS_IDLE)
+			PQtransactionStatus(entry->conn) != PQTRANS_IDLE ||
+			entry->abort_cleanup_incomplete)
 		{
 			elog(DEBUG3, "discarding connection %p", entry->conn);
 			PQfinish(entry->conn);
 			entry->conn = NULL;
+			entry->abort_cleanup_incomplete = false;
 		}
 	}
 
@@ -763,7 +801,6 @@ pgfdw_subxact_callback(SubXactEvent event, SubTransactionId mySubid,
 	hash_seq_init(&scan, ConnectionHash);
 	while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
 	{
-		PGresult   *res;
 		char		sql[100];
 
 		/*
@@ -779,12 +816,31 @@ pgfdw_subxact_callback(SubXactEvent event, SubTransactionId mySubid,
 
 		if (event == SUBXACT_EVENT_PRE_COMMIT_SUB)
 		{
+			/*
+			 * If abort cleanup previously failed for this connection, we
+			 * can't issue any more commands against it.
+			 */
+			pgfdw_reject_if_abort_cleanup_incomplete(entry);
+
 			/* Commit all remote subtransactions during pre-commit */
 			snprintf(sql, sizeof(sql), "RELEASE SAVEPOINT s%d", curlevel);
 			do_sql_command(entry->conn, sql);
 		}
-		else
+		else if (in_error_recursion_trouble())
+		{
+			/*
+			 * Don't try to clean up the connection if we're already in error
+			 * recursion trouble.
+			 */
+			entry->abort_cleanup_incomplete = true;
+		}
+		else if (!entry->abort_cleanup_incomplete)
 		{
+			bool		abort_cleanup_failure = false;
+
+			/* Remember that abort cleanup is in progress. */
+			entry->abort_cleanup_incomplete = true;
+
 			/* Assume we might have lost track of prepared statements */
 			entry->have_error = true;
 
@@ -795,34 +851,220 @@ pgfdw_subxact_callback(SubXactEvent event, SubTransactionId mySubid,
 			 * processed by the remote server, and if so, request cancellation
 			 * of the command.
 			 */
-			if (PQtransactionStatus(entry->conn) == PQTRANS_ACTIVE)
+			if (PQtransactionStatus(entry->conn) == PQTRANS_ACTIVE &&
+				!pgfdw_cancel_query(entry->conn))
+				abort_cleanup_failure = true;
+			else
 			{
-				PGcancel   *cancel;
-				char		errbuf[256];
-
-				if ((cancel = PQgetCancel(entry->conn)))
-				{
-					if (!PQcancel(cancel, errbuf, sizeof(errbuf)))
-						ereport(WARNING,
-								(errcode(ERRCODE_CONNECTION_FAILURE),
-								 errmsg("could not send cancel request: %s",
-										errbuf)));
-					PQfreeCancel(cancel);
-				}
+				/* Rollback all remote subtransactions during abort */
+				snprintf(sql, sizeof(sql),
+						 "ROLLBACK TO SAVEPOINT s%d; RELEASE SAVEPOINT s%d",
+						 curlevel, curlevel);
+				if (!pgfdw_exec_cleanup_query(entry->conn, sql, false))
+					abort_cleanup_failure = true;
 			}
 
-			/* Rollback all remote subtransactions during abort */
-			snprintf(sql, sizeof(sql),
-					 "ROLLBACK TO SAVEPOINT s%d; RELEASE SAVEPOINT s%d",
-					 curlevel, curlevel);
-			res = PQexec(entry->conn, sql);
-			if (PQresultStatus(res) != PGRES_COMMAND_OK)
-				pgfdw_report_error(WARNING, res, entry->conn, true, sql);
-			else
-				PQclear(res);
+			/* Disarm abort_cleanup_incomplete if it all worked. */
+			entry->abort_cleanup_incomplete = abort_cleanup_failure;
 		}
 
 		/* OK, we're outta that level of subtransaction */
 		entry->xact_depth--;
 	}
 }
+
+/*
+ * Raise an error if the given connection cache entry has unfinished abort
+ * cleanup.  Such connections can't safely be further used, because we
+ * were not able to restore the proper transaction context on the remote
+ * side.  Re-establishing the connection would change the snapshot and
+ * roll back any writes already performed, so that's not an option, either.
+ *
+ * Typically, we end up in this state when network connectivity is
+ * interrupted.
+ */
+static void
+pgfdw_reject_if_abort_cleanup_incomplete(ConnCacheEntry *entry)
+{
+	HeapTuple	tup;
+	Form_pg_user_mapping umform;
+	ForeignServer *server;
+
+	if (!entry->abort_cleanup_incomplete)
+		return;					/* common case: no failed cleanup recorded */
+
+	tup = SearchSysCache1(USERMAPPINGOID,
+						  ObjectIdGetDatum(entry->key));	/* key = user mapping OID */
+	if (!HeapTupleIsValid(tup))
+		elog(ERROR, "cache lookup failed for user mapping %u", entry->key);
+	umform = (Form_pg_user_mapping) GETSTRUCT(tup);
+	server = GetForeignServer(umform->umserver);	/* only to name the server below */
+	ReleaseSysCache(tup);
+
+	ereport(ERROR,
+			(errcode(ERRCODE_CONNECTION_EXCEPTION),
+			 errmsg("connection to server \"%s\" was lost",
+					server->servername)));
+}
+
+/*
+ * Cancel the currently-in-progress query (whose query text we do not have)
+ * and ignore the result.  Returns true if we successfully cancel the query
+ * and discard any pending result within 30 seconds, and false if not.
+ */
+static bool
+pgfdw_cancel_query(PGconn *conn)
+{
+	PGcancel   *cancel;
+	char		errbuf[256];
+	PGresult   *result = NULL;
+	TimestampTz endtime;
+
+	/*
+	 * If it takes too long to cancel the query and discard the result, assume
+	 * the connection is dead.
+	 */
+	endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(), 30000);
+
+	/*
+	 * Issue cancel request.  Unfortunately, there's no good way to limit the
+	 * amount of time that we might block inside PQgetCancel().
+	 */
+	if ((cancel = PQgetCancel(conn)))
+	{
+		if (!PQcancel(cancel, errbuf, sizeof(errbuf)))
+		{
+			ereport(WARNING,
+					(errcode(ERRCODE_CONNECTION_FAILURE),
+					 errmsg("could not send cancel request: %s",
+							errbuf)));
+			PQfreeCancel(cancel);
+			return false;		/* couldn't even send the cancel request */
+		}
+		PQfreeCancel(cancel);
+	}
+
+	/* Get and discard the result of the query. */
+	if (pgfdw_get_cleanup_result(conn, endtime, &result))
+		return false;			/* timed out */
+	PQclear(result);
+
+	return true;
+}
+
+/*
+ * Submit a query during (sub)abort cleanup and wait up to 30 seconds for the
+ * result.  If the query is executed without error, the return value is true.
+ * If the query is executed successfully but returns an error, the return
+ * value is true if and only if ignore_errors is set.  If the query can't be
+ * sent or times out, the return value is false.
+ */
+static bool
+pgfdw_exec_cleanup_query(PGconn *conn, const char *query, bool ignore_errors)
+{
+	PGresult   *result = NULL;
+	TimestampTz endtime;
+
+	/*
+	 * If it takes too long to execute a cleanup query, assume the connection
+	 * is dead.  It's fairly likely that this is why we aborted in the first
+	 * place (e.g. statement timeout, user cancel), so the timeout shouldn't
+	 * be too long.
+	 */
+	endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(), 30000);
+
+	/*
+	 * Submit a query.  Since we don't use non-blocking mode, this also can
+	 * block.  But its risk is relatively small, so we ignore that for now.
+	 */
+	if (!PQsendQuery(conn, query))
+	{
+		pgfdw_report_error(WARNING, NULL, conn, false, query);
+		return false;
+	}
+
+	/* Get the result of the query. */
+	if (pgfdw_get_cleanup_result(conn, endtime, &result))
+		return false;			/* timed out */
+
+	/* Issue a warning if not successful. */
+	if (PQresultStatus(result) != PGRES_COMMAND_OK)
+	{
+		pgfdw_report_error(WARNING, result, conn, true, query);
+		return ignore_errors;
+	}
+	PQclear(result);			/* success: don't leak the PGresult */
+	return true;
+}
+
+/*
+ * Get, during abort cleanup, the result of a query that is in progress.  This
+ * might be a query that is being interrupted by transaction abort, or it might
+ * be a query that was initiated as part of transaction abort to get the remote
+ * side back to the appropriate state.
+ *
+ * It's not a huge problem if we throw an ERROR here, but if we get into error
+ * recursion trouble, we'll end up slamming the connection shut, which will
+ * necessitate failing the entire toplevel transaction even if subtransactions
+ * were used.  Try to use WARNING where we can.
+ *
+ * endtime is the time at which we should give up and assume the remote
+ * side is dead.  Returns true if the timeout expired, otherwise false.
+ * Sets *result except in case of a timeout.
+ */
+static bool
+pgfdw_get_cleanup_result(PGconn *conn, TimestampTz endtime, PGresult **result)
+{
+	PGresult   *last_res = NULL;
+
+	for (;;)
+	{
+		PGresult   *res;
+
+		while (PQisBusy(conn))
+		{
+			int			wc;
+			TimestampTz now = GetCurrentTimestamp();
+			long		secs;
+			int			microsecs;
+			long		cur_timeout;
+
+			/* If timeout has expired, give up, else get sleep time. */
+			if (now >= endtime)
+				return true;	/* NOTE(review): last_res, if set, leaks here */
+			TimestampDifference(now, endtime, &secs, &microsecs);
+
+			/* To protect against clock skew, limit sleep to one minute. */
+			cur_timeout = Min(60000, secs * 1000 + microsecs / 1000);
+
+			/* Sleep until there's something to do */
+			wc = WaitLatchOrSocket(MyLatch,
+							  WL_LATCH_SET | WL_SOCKET_READABLE | WL_TIMEOUT,
+								   PQsocket(conn),
+								   cur_timeout, PG_WAIT_EXTENSION);
+			ResetLatch(MyLatch);
+
+			CHECK_FOR_INTERRUPTS();
+
+			/* Data available in socket */
+			if (wc & WL_SOCKET_READABLE)
+			{
+				if (!PQconsumeInput(conn))
+				{
+					*result = NULL;		/* NOTE(review): last_res leaks here too */
+					return false;
+				}
+			}
+		}
+
+		res = PQgetResult(conn);
+		if (res == NULL)
+			break;				/* query is complete */
+
+		PQclear(last_res);
+		last_res = res;
+	}
+
+	*result = last_res;
+	return false;
+}
-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to