On Thu, Jul 28, 2022 at 11:56 AM Fujii Masao
<masao.fu...@oss.nttdata.com> wrote:
>
>
>
> On 2022/07/27 10:36, Kyotaro Horiguchi wrote:
> > At Tue, 26 Jul 2022 18:33:04 +0900, Fujii Masao 
> > <masao.fu...@oss.nttdata.com> wrote in
> >>> I'm not sure the two are similar to each other.  The new function
> >>> pgfdw_exec_pre_commit() looks like a merger of two isolated code paths
> >>> intended to share a seven-line codelet.  I feel the code gets a bit
> >>> harder to understand after the change.  I mildly oppose this part.
> >>
> >> If so, we can split pgfdw_exec_pre_commit() into two: one is the common
> >> function that sends or executes the command (i.e., calls
> >> do_sql_command_begin() or do_sql_command()), and the other is
> >> a function only for toplevel. The latter function calls
> >> the common function and then does the DEALLOCATE ALL handling.
> >>
> >> But this is not the way that other functions like
> >> pgfdw_abort_cleanup()
> >> are implemented. Those functions have code for both toplevel and
> >> !toplevel (i.e., subxact), and do the processing depending
> >> on the argument "toplevel". So I'm thinking that implementing
> >> pgfdw_exec_pre_commit() in the same way is better.
> >
> > I didn't see it from that viewpoint, but I don't think that
> > unconditionally justifies other refactoring.  If we merge
> > pgfdw_finish_pre_(sub)?commit_cleanup()s this way, then in turn
> > pgfdw_subxact_callback() and pgfdw_xact_callback() are going to be
> > almost identical except for the event IDs they handle. But I don't
> > think we would want to merge them.
>
> I don't think they are so identical because (as you say) they have to handle 
> different event IDs. So I agree we don't want to merge them.
>
>
> > A concern about 0002 is that it hides the subxact-specific steps from
> > the subxact callback.  It would look reasonable if it were called from
> > two or more places for each of toplevel and !toplevel, but actually it
> > has only one caller for each.  So I think that pgfdw_exec_pre_commit
> > should not do that and should be renamed to pgfdw_commit_remote() or
> > something.  On the other hand, pgfdw_finish_pre_commit() hides
> > toplevel-specific steps from the caller, so the same argument holds.
>
> So your conclusion is to rename pgfdw_exec_pre_commit() to
> pgfdw_commit_remote() or something?
>
>
> > Another point about the patch that concerns me is that the new
> > function takes an SQL statement along with the toplevel flag. I guess
> > the reason is that the command for subxact (RELEASE SAVEPOINT %d)
> > requires the current transaction level.  However, that value is
> > obtainable very cheaply within the cleanup functions. So I propose to
> > get rid of the parameter "sql" from the two functions.
>
> Yes, that's possible. That is, pgfdw_exec_pre_commit() can construct the
> query string by executing the following code, instead of accepting the query
> as an argument. But one downside of this approach is that the following code
> is executed for every remote subtransaction entry. Maybe it's cheap to
> construct the query string as follows, but I'd like to avoid any unnecessary
> overhead if possible. So the patch makes the caller,
> pgfdw_subxact_callback(), construct the query string only once and pass it to
> pgfdw_exec_pre_commit().
>
>         curlevel = GetCurrentTransactionNestLevel();
>         snprintf(sql, sizeof(sql), "RELEASE SAVEPOINT s%d", curlevel);
>
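To make the split discussed above concrete, here is a minimal, self-contained sketch: a common function that only sends or executes one command, a toplevel-only wrapper that additionally handles DEALLOCATE ALL, and a subxact caller that builds the RELEASE SAVEPOINT string once and reuses it for every entry. All names here (DemoEntry, demo_commit_remote(), demo_commit_remote_toplevel(), demo_release_all()) are hypothetical stand-ins, not postgres_fdw code; "sending" a command is only simulated by recording it in the entry.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdio.h>
#include <string.h>

/* Hypothetical, simplified stand-in for ConnCacheEntry. */
typedef struct DemoEntry
{
	bool		open;			/* stands in for "conn != NULL" */
	bool		have_prep_stmt;
	bool		have_error;
	char		last_sql[100];	/* last command "sent" on this connection */
	int			deallocs;		/* how many DEALLOCATE ALLs were "sent" */
} DemoEntry;

/* Common part: send or execute one command (simulated by recording it). */
static void
demo_commit_remote(DemoEntry *entry, const char *sql)
{
	snprintf(entry->last_sql, sizeof(entry->last_sql), "%s", sql);
}

/* Toplevel only: commit, then get rid of prepared statements if needed. */
static void
demo_commit_remote_toplevel(DemoEntry *entry)
{
	demo_commit_remote(entry, "COMMIT TRANSACTION");
	if (entry->have_prep_stmt && entry->have_error)
		entry->deallocs++;		/* stands in for PQexec(conn, "DEALLOCATE ALL") */
	entry->have_prep_stmt = false;
	entry->have_error = false;
}

/* Subxact caller: build the RELEASE command once and reuse it per entry. */
static void
demo_release_all(DemoEntry *entries, int nentries, int curlevel)
{
	char		sql[100];

	assert(entries != NULL && nentries >= 0);
	snprintf(sql, sizeof(sql), "RELEASE SAVEPOINT s%d", curlevel);
	for (int i = 0; i < nentries; i++)
	{
		if (entries[i].open)
			demo_commit_remote(&entries[i], sql);
	}
}
```

With this shape the subxact path never sees the DEALLOCATE ALL handling, and the query string is still formatted only once per callback, at the cost of one more small function.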

Another possibility I can see is that, instead of calling
pgfdw_exec_pre_commit() (and similarly pgfdw_abort_cleanup()) for every
connection entry, we call it once from the callback function; for that,
we need to move the hash table loop inside that function.

The structure of the callback functions looks a little fuzzy to me,
since the same event is checked for every entry of the connection hash
table. Instead, that loop should simply move inside those functions
(e.g. pgfdw_exec_pre_commit() and pgfdw_abort_cleanup()), so that the
callback calls the appropriate function once per event and that
function takes care of every entry of the connection hash table. The
benefit is that we would save the few processing cycles needed to
match the event and call the same function for each connection entry.
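A minimal, self-contained sketch of that structure, roughly the shape 0004 aims for: the callback switches on the event once, and each helper owns the loop over all connection entries. Everything here is hypothetical (DemoConn, demo_conns, demo_xact_callback() and the helpers); the static array stands in for ConnectionHash, and counters stand in for the remote COMMIT/ABORT commands.

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical stand-ins for XactEvent and ConnCacheEntry. */
typedef enum DemoXactEvent
{
	DEMO_XACT_EVENT_PRE_COMMIT,
	DEMO_XACT_EVENT_ABORT
} DemoXactEvent;

typedef struct DemoConn
{
	bool		open;			/* stands in for "conn != NULL" */
	int			xact_depth;		/* > 0 means a remote xact is open */
	int			commits;		/* COMMITs "sent" on this connection */
	int			aborts;			/* ABORTs "sent" on this connection */
} DemoConn;

/* Hypothetical stand-in for ConnectionHash. */
#define DEMO_NCONNS 3
static DemoConn demo_conns[DEMO_NCONNS];

/* The loop lives inside the helper, so one call handles every entry. */
static void
demo_exec_pre_commit(void)
{
	for (int i = 0; i < DEMO_NCONNS; i++)
	{
		DemoConn   *entry = &demo_conns[i];

		if (!entry->open)
			continue;			/* no open connection right now */
		assert(entry->xact_depth >= 0);
		if (entry->xact_depth > 0)
			entry->commits++;	/* stands in for "COMMIT TRANSACTION" */
		entry->xact_depth = 0;	/* stands in for pgfdw_reset_xact_state() */
	}
}

static void
demo_abort_cleanup(void)
{
	for (int i = 0; i < DEMO_NCONNS; i++)
	{
		DemoConn   *entry = &demo_conns[i];

		if (!entry->open)
			continue;
		assert(entry->xact_depth >= 0);
		if (entry->xact_depth > 0)
			entry->aborts++;	/* stands in for "ABORT TRANSACTION" */
		entry->xact_depth = 0;
	}
}

/* The callback examines the event once, not once per connection entry. */
static void
demo_xact_callback(DemoXactEvent event)
{
	switch (event)
	{
		case DEMO_XACT_EVENT_PRE_COMMIT:
			demo_exec_pre_commit();
			break;
		case DEMO_XACT_EVENT_ABORT:
			demo_abort_cleanup();
			break;
	}
}
```

One cost of this shape is that the per-entry checks (for example, the xact_depth vs. curlevel sanity check for subtransactions) end up duplicated in each helper, as can be seen in 0004.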

I tried this refactoring in the 0004 patch, which is not complete yet, and
I am reattaching the other patches too to keep the CFbot happy.

Thoughts? Suggestions?

Regards,
Amul
From d6e241cb946afe3b74c2893bda6dab8d3288716b Mon Sep 17 00:00:00 2001
From: Fujii Masao <fujii@postgresql.org>
Date: Mon, 25 Jul 2022 23:27:14 +0900
Subject: [PATCH v2 3/4] Merge pgfdw_finish_pre_commit_cleanup and
 pgfdw_finish_pre_subcommit_cleanup into one.

---
 contrib/postgres_fdw/connection.c | 78 ++++++++++---------------------
 1 file changed, 25 insertions(+), 53 deletions(-)

diff --git a/contrib/postgres_fdw/connection.c b/contrib/postgres_fdw/connection.c
index ec290459be3..6e23046ad69 100644
--- a/contrib/postgres_fdw/connection.c
+++ b/contrib/postgres_fdw/connection.c
@@ -113,9 +113,8 @@ static bool pgfdw_get_result_timed(PGconn *conn, TimestampTz endtime,
 static void pgfdw_abort_cleanup(ConnCacheEntry *entry, bool toplevel);
 static bool pgfdw_exec_pre_commit(ConnCacheEntry *entry, const char *sql,
 								  List **pending_entries, bool toplevel);
-static void pgfdw_finish_pre_commit_cleanup(List *pending_entries);
-static void pgfdw_finish_pre_subcommit_cleanup(List *pending_entries,
-											   int curlevel);
+static void pgfdw_finish_pre_commit(List *pending_entries, const char *sql,
+									bool toplevel);
 static bool UserMappingPasswordRequired(UserMapping *user);
 static bool disconnect_cached_connections(Oid serverid);
 
@@ -954,7 +953,7 @@ pgfdw_xact_callback(XactEvent event, void *arg)
 	{
 		Assert(event == XACT_EVENT_PARALLEL_PRE_COMMIT ||
 			   event == XACT_EVENT_PRE_COMMIT);
-		pgfdw_finish_pre_commit_cleanup(pending_entries);
+		pgfdw_finish_pre_commit(pending_entries, "COMMIT TRANSACTION", true);
 	}
 
 	/*
@@ -1031,7 +1030,7 @@ pgfdw_subxact_callback(SubXactEvent event, SubTransactionId mySubid,
 	if (pending_entries)
 	{
 		Assert(event == SUBXACT_EVENT_PRE_COMMIT_SUB);
-		pgfdw_finish_pre_subcommit_cleanup(pending_entries, curlevel);
+		pgfdw_finish_pre_commit(pending_entries, sql, false);
 	}
 }
 
@@ -1523,11 +1522,14 @@ pgfdw_exec_pre_commit(ConnCacheEntry *entry, const char *sql,
 }
 
 /*
- * Finish pre-commit cleanup of connections on each of which we've sent a
- * COMMIT command to the remote server.
+ * Wait for all remote transactions or subtransactions to be committed
+ * and finish pre-commit.
+ *
+ * "toplevel" should be set to true if toplevel (main) transaction is
+ * committed, false otherwise.
  */
 static void
-pgfdw_finish_pre_commit_cleanup(List *pending_entries)
+pgfdw_finish_pre_commit(List *pending_entries, const char *sql, bool toplevel)
 {
 	ConnCacheEntry *entry;
 	List	   *pending_deallocs = NIL;
@@ -1536,7 +1538,8 @@ pgfdw_finish_pre_commit_cleanup(List *pending_entries)
 	Assert(pending_entries);
 
 	/*
-	 * Get the result of the COMMIT command for each of the pending entries
+	 * Get the result of COMMIT or RELEASE command for each of the pending
+	 * entries.
 	 */
 	foreach(lc, pending_entries)
 	{
@@ -1548,23 +1551,26 @@ pgfdw_finish_pre_commit_cleanup(List *pending_entries)
 		 * We might already have received the result on the socket, so pass
 		 * consume_input=true to try to consume it first
 		 */
-		do_sql_command_end(entry->conn, "COMMIT TRANSACTION", true);
+		do_sql_command_end(entry->conn, sql, true);
 		entry->changing_xact_state = false;
 
 		/* Do a DEALLOCATE ALL in parallel if needed */
-		if (entry->have_prep_stmt && entry->have_error)
+		if (toplevel)
 		{
-			/* Ignore errors (see notes in pgfdw_xact_callback) */
-			if (PQsendQuery(entry->conn, "DEALLOCATE ALL"))
+			if (entry->have_prep_stmt && entry->have_error)
 			{
-				pending_deallocs = lappend(pending_deallocs, entry);
-				continue;
+				/* Ignore errors (see notes in pgfdw_xact_callback) */
+				if (PQsendQuery(entry->conn, "DEALLOCATE ALL"))
+				{
+					pending_deallocs = lappend(pending_deallocs, entry);
+					continue;
+				}
 			}
+			entry->have_prep_stmt = false;
+			entry->have_error = false;
 		}
-		entry->have_prep_stmt = false;
-		entry->have_error = false;
 
-		pgfdw_reset_xact_state(entry, true);
+		pgfdw_reset_xact_state(entry, toplevel);
 	}
 
 	/* No further work if no pending entries */
@@ -1588,41 +1594,7 @@ pgfdw_finish_pre_commit_cleanup(List *pending_entries)
 		entry->have_prep_stmt = false;
 		entry->have_error = false;
 
-		pgfdw_reset_xact_state(entry, true);
-	}
-}
-
-/*
- * Finish pre-subcommit cleanup of connections on each of which we've sent a
- * RELEASE command to the remote server.
- */
-static void
-pgfdw_finish_pre_subcommit_cleanup(List *pending_entries, int curlevel)
-{
-	ConnCacheEntry *entry;
-	char		sql[100];
-	ListCell   *lc;
-
-	Assert(pending_entries);
-
-	/*
-	 * Get the result of the RELEASE command for each of the pending entries
-	 */
-	snprintf(sql, sizeof(sql), "RELEASE SAVEPOINT s%d", curlevel);
-	foreach(lc, pending_entries)
-	{
-		entry = (ConnCacheEntry *) lfirst(lc);
-
-		Assert(entry->changing_xact_state);
-
-		/*
-		 * We might already have received the result on the socket, so pass
-		 * consume_input=true to try to consume it first
-		 */
-		do_sql_command_end(entry->conn, sql, true);
-		entry->changing_xact_state = false;
-
-		pgfdw_reset_xact_state(entry, false);
+		pgfdw_reset_xact_state(entry, toplevel);
 	}
 }
 
-- 
2.18.0

From 087d1aa83850577ee75792e78f310fe45e3240dd Mon Sep 17 00:00:00 2001
From: Fujii Masao <fujii@postgresql.org>
Date: Mon, 25 Jul 2022 22:45:06 +0900
Subject: [PATCH v2 2/4] Add common function to commit xact or subxact during
 pre-commit.

---
 contrib/postgres_fdw/connection.c | 122 ++++++++++++++++--------------
 1 file changed, 66 insertions(+), 56 deletions(-)

diff --git a/contrib/postgres_fdw/connection.c b/contrib/postgres_fdw/connection.c
index cbee2854803..ec290459be3 100644
--- a/contrib/postgres_fdw/connection.c
+++ b/contrib/postgres_fdw/connection.c
@@ -111,6 +111,8 @@ static bool pgfdw_exec_cleanup_query(PGconn *conn, const char *query,
 static bool pgfdw_get_result_timed(PGconn *conn, TimestampTz endtime,
 								   PGresult **result, bool *timed_out);
 static void pgfdw_abort_cleanup(ConnCacheEntry *entry, bool toplevel);
+static bool pgfdw_exec_pre_commit(ConnCacheEntry *entry, const char *sql,
+								  List **pending_entries, bool toplevel);
 static void pgfdw_finish_pre_commit_cleanup(List *pending_entries);
 static void pgfdw_finish_pre_subcommit_cleanup(List *pending_entries,
 											   int curlevel);
@@ -894,8 +896,6 @@ pgfdw_xact_callback(XactEvent event, void *arg)
 	hash_seq_init(&scan, ConnectionHash);
 	while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
 	{
-		PGresult   *res;
-
 		/* Ignore cache entry if no open connection right now */
 		if (entry->conn == NULL)
 			continue;
@@ -911,45 +911,10 @@ pgfdw_xact_callback(XactEvent event, void *arg)
 				case XACT_EVENT_PARALLEL_PRE_COMMIT:
 				case XACT_EVENT_PRE_COMMIT:
 
-					/*
-					 * If abort cleanup previously failed for this connection,
-					 * we can't issue any more commands against it.
-					 */
-					pgfdw_reject_incomplete_xact_state_change(entry);
-
 					/* Commit all remote transactions during pre-commit */
-					entry->changing_xact_state = true;
-					if (entry->parallel_commit)
-					{
-						do_sql_command_begin(entry->conn, "COMMIT TRANSACTION");
-						pending_entries = lappend(pending_entries, entry);
+					if (pgfdw_exec_pre_commit(entry, "COMMIT TRANSACTION",
+											  &pending_entries, true))
 						continue;
-					}
-					do_sql_command(entry->conn, "COMMIT TRANSACTION");
-					entry->changing_xact_state = false;
-
-					/*
-					 * If there were any errors in subtransactions, and we
-					 * made prepared statements, do a DEALLOCATE ALL to make
-					 * sure we get rid of all prepared statements. This is
-					 * annoying and not terribly bulletproof, but it's
-					 * probably not worth trying harder.
-					 *
-					 * DEALLOCATE ALL only exists in 8.3 and later, so this
-					 * constrains how old a server postgres_fdw can
-					 * communicate with.  We intentionally ignore errors in
-					 * the DEALLOCATE, so that we can hobble along to some
-					 * extent with older servers (leaking prepared statements
-					 * as we go; but we don't really support update operations
-					 * pre-8.3 anyway).
-					 */
-					if (entry->have_prep_stmt && entry->have_error)
-					{
-						res = PQexec(entry->conn, "DEALLOCATE ALL");
-						PQclear(res);
-					}
-					entry->have_prep_stmt = false;
-					entry->have_error = false;
 					break;
 				case XACT_EVENT_PRE_PREPARE:
 
@@ -1014,6 +979,7 @@ pgfdw_subxact_callback(SubXactEvent event, SubTransactionId mySubid,
 	ConnCacheEntry *entry;
 	int			curlevel;
 	List	   *pending_entries = NIL;
+	char		sql[100];
 
 	/* Nothing to do at subxact start, nor after commit. */
 	if (!(event == SUBXACT_EVENT_PRE_COMMIT_SUB ||
@@ -1029,11 +995,11 @@ pgfdw_subxact_callback(SubXactEvent event, SubTransactionId mySubid,
 	 * of the current level, and close them.
 	 */
 	curlevel = GetCurrentTransactionNestLevel();
+	snprintf(sql, sizeof(sql), "RELEASE SAVEPOINT s%d", curlevel);
+
 	hash_seq_init(&scan, ConnectionHash);
 	while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
 	{
-		char		sql[100];
-
 		/*
 		 * We only care about connections with open remote subtransactions of
 		 * the current level.
@@ -1047,23 +1013,9 @@ pgfdw_subxact_callback(SubXactEvent event, SubTransactionId mySubid,
 
 		if (event == SUBXACT_EVENT_PRE_COMMIT_SUB)
 		{
-			/*
-			 * If abort cleanup previously failed for this connection, we
-			 * can't issue any more commands against it.
-			 */
-			pgfdw_reject_incomplete_xact_state_change(entry);
-
 			/* Commit all remote subtransactions during pre-commit */
-			snprintf(sql, sizeof(sql), "RELEASE SAVEPOINT s%d", curlevel);
-			entry->changing_xact_state = true;
-			if (entry->parallel_commit)
-			{
-				do_sql_command_begin(entry->conn, sql);
-				pending_entries = lappend(pending_entries, entry);
+			if (pgfdw_exec_pre_commit(entry, sql, &pending_entries, false))
 				continue;
-			}
-			do_sql_command(entry->conn, sql);
-			entry->changing_xact_state = false;
 		}
 		else
 		{
@@ -1512,6 +1464,64 @@ pgfdw_abort_cleanup(ConnCacheEntry *entry, bool toplevel)
 	entry->changing_xact_state = false;
 }
 
+/*
+ * Commit all remote transactions or subtransactions during pre-commit.
+ *
+ * If parallel_commit is enabled at this connection cache entry and
+ * the result of "sql" needs to be gotten later, return true and append
+ * this entry to "pending_entries".
+ *
+ * "toplevel" should be set to true if toplevel (main) transaction is
+ * committed, false otherwise.
+ */
+static bool
+pgfdw_exec_pre_commit(ConnCacheEntry *entry, const char *sql,
+					  List **pending_entries, bool toplevel)
+{
+	PGresult   *res;
+
+	/*
+	 * If abort cleanup previously failed for this connection, we can't issue
+	 * any more commands against it.
+	 */
+	pgfdw_reject_incomplete_xact_state_change(entry);
+
+	entry->changing_xact_state = true;
+	if (entry->parallel_commit)
+	{
+		do_sql_command_begin(entry->conn, sql);
+		*pending_entries = lappend(*pending_entries, entry);
+		return true;
+	}
+	do_sql_command(entry->conn, sql);
+	entry->changing_xact_state = false;
+
+	if (!toplevel)
+		return false;
+
+	/*
+	 * If there were any errors in subtransactions, and we made prepared
+	 * statements, do a DEALLOCATE ALL to make sure we get rid of all prepared
+	 * statements. This is annoying and not terribly bulletproof, but it's
+	 * probably not worth trying harder.
+	 *
+	 * DEALLOCATE ALL only exists in 8.3 and later, so this constrains how old
+	 * a server postgres_fdw can communicate with.  We intentionally ignore
+	 * errors in the DEALLOCATE, so that we can hobble along to some extent
+	 * with older servers (leaking prepared statements as we go; but we don't
+	 * really support update operations pre-8.3 anyway).
+	 */
+	if (entry->have_prep_stmt && entry->have_error)
+	{
+		res = PQexec(entry->conn, "DEALLOCATE ALL");
+		PQclear(res);
+	}
+	entry->have_prep_stmt = false;
+	entry->have_error = false;
+
+	return false;
+}
+
 /*
  * Finish pre-commit cleanup of connections on each of which we've sent a
  * COMMIT command to the remote server.
-- 
2.18.0

From f8aaf91d0fdb88c6726c45f924d8dea19bf98cff Mon Sep 17 00:00:00 2001
From: Amul Sul <amul.sul@enterprisedb.com>
Date: Wed, 3 Aug 2022 10:17:09 -0400
Subject: [PATCH v2 4/4] TRIAL: cleanup xact callback - WIP

---
 contrib/postgres_fdw/connection.c | 420 +++++++++++++++---------------
 1 file changed, 213 insertions(+), 207 deletions(-)

diff --git a/contrib/postgres_fdw/connection.c b/contrib/postgres_fdw/connection.c
index 6e23046ad69..181d2f83e32 100644
--- a/contrib/postgres_fdw/connection.c
+++ b/contrib/postgres_fdw/connection.c
@@ -110,9 +110,8 @@ static bool pgfdw_exec_cleanup_query(PGconn *conn, const char *query,
 									 bool ignore_errors);
 static bool pgfdw_get_result_timed(PGconn *conn, TimestampTz endtime,
 								   PGresult **result, bool *timed_out);
-static void pgfdw_abort_cleanup(ConnCacheEntry *entry, bool toplevel);
-static bool pgfdw_exec_pre_commit(ConnCacheEntry *entry, const char *sql,
-								  List **pending_entries, bool toplevel);
+static void pgfdw_abort_cleanup(bool toplevel);
+static void pgfdw_exec_pre_commit(bool toplevel);
 static void pgfdw_finish_pre_commit(List *pending_entries, const char *sql,
 									bool toplevel);
 static bool UserMappingPasswordRequired(UserMapping *user);
@@ -880,80 +879,47 @@ pgfdw_report_error(int elevel, PGresult *res, PGconn *conn,
 static void
 pgfdw_xact_callback(XactEvent event, void *arg)
 {
-	HASH_SEQ_STATUS scan;
-	ConnCacheEntry *entry;
-	List	   *pending_entries = NIL;
-
 	/* Quick exit if no connections were touched in this transaction. */
 	if (!xact_got_connection)
 		return;
 
-	/*
-	 * Scan all connection cache entries to find open remote transactions, and
-	 * close them.
-	 */
-	hash_seq_init(&scan, ConnectionHash);
-	while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
+	switch (event)
 	{
-		/* Ignore cache entry if no open connection right now */
-		if (entry->conn == NULL)
-			continue;
-
-		/* If it has an open remote transaction, try to close it */
-		if (entry->xact_depth > 0)
-		{
-			elog(DEBUG3, "closing remote transaction on connection %p",
-				 entry->conn);
+		case XACT_EVENT_PARALLEL_PRE_COMMIT:
+		case XACT_EVENT_PRE_COMMIT:
 
-			switch (event)
-			{
-				case XACT_EVENT_PARALLEL_PRE_COMMIT:
-				case XACT_EVENT_PRE_COMMIT:
-
-					/* Commit all remote transactions during pre-commit */
-					if (pgfdw_exec_pre_commit(entry, "COMMIT TRANSACTION",
-											  &pending_entries, true))
-						continue;
-					break;
-				case XACT_EVENT_PRE_PREPARE:
-
-					/*
-					 * We disallow any remote transactions, since it's not
-					 * very reasonable to hold them open until the prepared
-					 * transaction is committed.  For the moment, throw error
-					 * unconditionally; later we might allow read-only cases.
-					 * Note that the error will cause us to come right back
-					 * here with event == XACT_EVENT_ABORT, so we'll clean up
-					 * the connection state at that point.
-					 */
-					ereport(ERROR,
-							(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
-							 errmsg("cannot PREPARE a transaction that has operated on postgres_fdw foreign tables")));
-					break;
-				case XACT_EVENT_PARALLEL_COMMIT:
-				case XACT_EVENT_COMMIT:
-				case XACT_EVENT_PREPARE:
-					/* Pre-commit should have closed the open transaction */
-					elog(ERROR, "missed cleaning up connection during pre-commit");
-					break;
-				case XACT_EVENT_PARALLEL_ABORT:
-				case XACT_EVENT_ABORT:
-					/* Rollback all remote transactions during abort */
-					pgfdw_abort_cleanup(entry, true);
-					break;
-			}
-		}
+			/* Commit all remote transactions during pre-commit */
+			pgfdw_exec_pre_commit(true);
+			break;
 
-		/* Reset state to show we're out of a transaction */
-		pgfdw_reset_xact_state(entry, true);
-	}
+		case XACT_EVENT_PRE_PREPARE:
 
-	/* If there are any pending connections, finish cleaning them up */
-	if (pending_entries)
-	{
-		Assert(event == XACT_EVENT_PARALLEL_PRE_COMMIT ||
-			   event == XACT_EVENT_PRE_COMMIT);
-		pgfdw_finish_pre_commit(pending_entries, "COMMIT TRANSACTION", true);
+			/*
+			 * We disallow any remote transactions, since it's not
+			 * very reasonable to hold them open until the prepared
+			 * transaction is committed.  For the moment, throw error
+			 * unconditionally; later we might allow read-only cases.
+			 * Note that the error will cause us to come right back
+			 * here with event == XACT_EVENT_ABORT, so we'll clean up
+			 * the connection state at that point.
+			 */
+			ereport(ERROR,
+					(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+					 errmsg("cannot PREPARE a transaction that has operated on postgres_fdw foreign tables")));
+			break;
+
+		case XACT_EVENT_PARALLEL_COMMIT:
+		case XACT_EVENT_COMMIT:
+		case XACT_EVENT_PREPARE:
+			/* Pre-commit should have closed the open transaction */
+			elog(ERROR, "missed cleaning up connection during pre-commit");
+			break;
+
+		case XACT_EVENT_PARALLEL_ABORT:
+		case XACT_EVENT_ABORT:
+			/* Rollback all remote transactions during abort */
+			pgfdw_abort_cleanup(true);
+			break;
 	}
 
 	/*
@@ -974,12 +940,6 @@ static void
 pgfdw_subxact_callback(SubXactEvent event, SubTransactionId mySubid,
 					   SubTransactionId parentSubid, void *arg)
 {
-	HASH_SEQ_STATUS scan;
-	ConnCacheEntry *entry;
-	int			curlevel;
-	List	   *pending_entries = NIL;
-	char		sql[100];
-
 	/* Nothing to do at subxact start, nor after commit. */
 	if (!(event == SUBXACT_EVENT_PRE_COMMIT_SUB ||
 		  event == SUBXACT_EVENT_ABORT_SUB))
@@ -989,48 +949,15 @@ pgfdw_subxact_callback(SubXactEvent event, SubTransactionId mySubid,
 	if (!xact_got_connection)
 		return;
 
-	/*
-	 * Scan all connection cache entries to find open remote subtransactions
-	 * of the current level, and close them.
-	 */
-	curlevel = GetCurrentTransactionNestLevel();
-	snprintf(sql, sizeof(sql), "RELEASE SAVEPOINT s%d", curlevel);
-
-	hash_seq_init(&scan, ConnectionHash);
-	while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
+	if (event == SUBXACT_EVENT_PRE_COMMIT_SUB)
 	{
-		/*
-		 * We only care about connections with open remote subtransactions of
-		 * the current level.
-		 */
-		if (entry->conn == NULL || entry->xact_depth < curlevel)
-			continue;
-
-		if (entry->xact_depth > curlevel)
-			elog(ERROR, "missed cleaning up remote subtransaction at level %d",
-				 entry->xact_depth);
-
-		if (event == SUBXACT_EVENT_PRE_COMMIT_SUB)
-		{
-			/* Commit all remote subtransactions during pre-commit */
-			if (pgfdw_exec_pre_commit(entry, sql, &pending_entries, false))
-				continue;
-		}
-		else
-		{
-			/* Rollback all remote subtransactions during abort */
-			pgfdw_abort_cleanup(entry, false);
-		}
-
-		/* OK, we're outta that level of subtransaction */
-		pgfdw_reset_xact_state(entry, false);
+		/* Commit all remote subtransactions during pre-commit */
+		pgfdw_exec_pre_commit(false);
 	}
-
-	/* If there are any pending connections, finish cleaning them up */
-	if (pending_entries)
+	else
 	{
-		Assert(event == SUBXACT_EVENT_PRE_COMMIT_SUB);
-		pgfdw_finish_pre_commit(pending_entries, sql, false);
+		/* Rollback all remote subtransactions during abort */
+		pgfdw_abort_cleanup(false);
 	}
 }
 
@@ -1394,131 +1321,210 @@ exit:	;
  * Set entry->changing_xact_state to false on success, true on failure.
  */
 static void
-pgfdw_abort_cleanup(ConnCacheEntry *entry, bool toplevel)
+pgfdw_abort_cleanup(bool toplevel)
 {
+	HASH_SEQ_STATUS scan;
+	ConnCacheEntry *entry;
 	char		sql[100];
 
 	/*
-	 * Don't try to clean up the connection if we're already in error
-	 * recursion trouble.
+	 * Scan all connection cache entries to find open remote transactions, and
+	 * close them.
 	 */
-	if (in_error_recursion_trouble())
-		entry->changing_xact_state = true;
+	hash_seq_init(&scan, ConnectionHash);
+	while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
+	{
+		/* Ignore cache entry if no open connection right now */
+		if (entry->conn == NULL)
+			continue;
 
-	/*
-	 * If connection is already unsalvageable, don't touch it further.
-	 */
-	if (entry->changing_xact_state)
-		return;
+		/* Sanity check for subtransaction */
+		if (!toplevel)
+		{
+			int		curlevel = GetCurrentTransactionNestLevel();
 
-	/*
-	 * Mark this connection as in the process of changing transaction state.
-	 */
-	entry->changing_xact_state = true;
+			if (entry->xact_depth < curlevel)
+				continue;
 
-	/* Assume we might have lost track of prepared statements */
-	entry->have_error = true;
+			if (entry->xact_depth > curlevel)
+				elog(ERROR, "missed cleaning up remote subtransaction at level %d",
+					 entry->xact_depth);
+		}
 
-	/*
-	 * If a command has been submitted to the remote server by using an
-	 * asynchronous execution function, the command might not have yet
-	 * completed.  Check to see if a command is still being processed by the
-	 * remote server, and if so, request cancellation of the command.
-	 */
-	if (PQtransactionStatus(entry->conn) == PQTRANS_ACTIVE &&
-		!pgfdw_cancel_query(entry->conn))
-		return;					/* Unable to cancel running query */
+		/* If it has an open remote transaction, try to close it */
+		if (entry->xact_depth > 0)
+		{
+			/*
+			 * Don't try to clean up the connection if we're already in error
+			 * recursion trouble.
+			 */
+			if (in_error_recursion_trouble())
+				entry->changing_xact_state = true;
 
-	if (toplevel)
-		snprintf(sql, sizeof(sql), "ABORT TRANSACTION");
-	else
-		snprintf(sql, sizeof(sql),
-				 "ROLLBACK TO SAVEPOINT s%d; RELEASE SAVEPOINT s%d",
-				 entry->xact_depth, entry->xact_depth);
-	if (!pgfdw_exec_cleanup_query(entry->conn, sql, false))
-		return;					/* Unable to abort remote (sub)transaction */
+			/*
+			 * If connection is already unsalvageable, don't touch it further.
+			 */
+			if (entry->changing_xact_state)
+				goto xact_abort_end;
 
-	if (toplevel)
-	{
-		if (entry->have_prep_stmt && entry->have_error &&
-			!pgfdw_exec_cleanup_query(entry->conn,
-									  "DEALLOCATE ALL",
-									  true))
-			return;				/* Trouble clearing prepared statements */
+			/*
+			 * Mark this connection as in the process of changing transaction state.
+			 */
+			entry->changing_xact_state = true;
 
-		entry->have_prep_stmt = false;
-		entry->have_error = false;
-	}
+			/* Assume we might have lost track of prepared statements */
+			entry->have_error = true;
 
-	/*
-	 * If pendingAreq of the per-connection state is not NULL, it means that
-	 * an asynchronous fetch begun by fetch_more_data_begin() was not done
-	 * successfully and thus the per-connection state was not reset in
-	 * fetch_more_data(); in that case reset the per-connection state here.
-	 */
-	if (entry->state.pendingAreq)
-		memset(&entry->state, 0, sizeof(entry->state));
+			/*
+			 * If a command has been submitted to the remote server by using an
+			 * asynchronous execution function, the command might not have yet
+			 * completed.  Check to see if a command is still being processed by the
+			 * remote server, and if so, request cancellation of the command.
+			 */
+			if (PQtransactionStatus(entry->conn) == PQTRANS_ACTIVE &&
+				!pgfdw_cancel_query(entry->conn))
+				goto xact_abort_end;					/* Unable to cancel running query */
 
-	/* Disarm changing_xact_state if it all worked */
-	entry->changing_xact_state = false;
+			if (toplevel)
+				snprintf(sql, sizeof(sql), "ABORT TRANSACTION");
+			else
+				snprintf(sql, sizeof(sql),
+						 "ROLLBACK TO SAVEPOINT s%d; RELEASE SAVEPOINT s%d",
+						 entry->xact_depth, entry->xact_depth);
+			if (!pgfdw_exec_cleanup_query(entry->conn, sql, false))
+				goto xact_abort_end;		/* Unable to abort remote (sub)transaction */
+
+			if (toplevel)
+			{
+				if (entry->have_prep_stmt && entry->have_error &&
+					!pgfdw_exec_cleanup_query(entry->conn,
+											  "DEALLOCATE ALL",
+											  true))
+					goto xact_abort_end;	/* Trouble clearing prepared statements */
+
+				entry->have_prep_stmt = false;
+				entry->have_error = false;
+			}
+
+			/*
+			 * If pendingAreq of the per-connection state is not NULL, it means that
+			 * an asynchronous fetch begun by fetch_more_data_begin() was not done
+			 * successfully and thus the per-connection state was not reset in
+			 * fetch_more_data(); in that case reset the per-connection state here.
+			 */
+			if (entry->state.pendingAreq)
+				memset(&entry->state, 0, sizeof(entry->state));
+
+			/* Disarm changing_xact_state if it all worked */
+			entry->changing_xact_state = false;
+		}
+
+xact_abort_end:
+		/* Reset state to show we're out of a transaction */
+		pgfdw_reset_xact_state(entry, toplevel);
+	}
 }
 
 /*
  * Commit all remote transactions or subtransactions during pre-commit.
  *
- * If parallel_commit is enabled at this connection cache entry and
- * the result of "sql" needs to be gotten later, return true and append
- * this entry to "pending_entries".
- *
  * "toplevel" should be set to true if toplevel (main) transaction is
  * committed, false otherwise.
  */
-static bool
-pgfdw_exec_pre_commit(ConnCacheEntry *entry, const char *sql,
-					  List **pending_entries, bool toplevel)
+static void
+pgfdw_exec_pre_commit(bool toplevel)
 {
-	PGresult   *res;
-
-	/*
-	 * If abort cleanup previously failed for this connection, we can't issue
-	 * any more commands against it.
-	 */
-	pgfdw_reject_incomplete_xact_state_change(entry);
+	HASH_SEQ_STATUS scan;
+	ConnCacheEntry *entry;
+	List	   *pending_entries = NIL;
+	int			curlevel = 0;
+	char	  	sql[100];
 
-	entry->changing_xact_state = true;
-	if (entry->parallel_commit)
+	/* Form SQL query */
+	if (toplevel)
 	{
-		do_sql_command_begin(entry->conn, sql);
-		*pending_entries = lappend(*pending_entries, entry);
-		return true;
+		snprintf(sql, sizeof(sql), "COMMIT TRANSACTION");
+	}
+	else
+	{
+		curlevel = GetCurrentTransactionNestLevel();
+		snprintf(sql, sizeof(sql), "RELEASE SAVEPOINT s%d", curlevel);
 	}
-	do_sql_command(entry->conn, sql);
-	entry->changing_xact_state = false;
-
-	if (!toplevel)
-		return false;
 
 	/*
-	 * If there were any errors in subtransactions, and we made prepared
-	 * statements, do a DEALLOCATE ALL to make sure we get rid of all prepared
-	 * statements. This is annoying and not terribly bulletproof, but it's
-	 * probably not worth trying harder.
-	 *
-	 * DEALLOCATE ALL only exists in 8.3 and later, so this constrains how old
-	 * a server postgres_fdw can communicate with.  We intentionally ignore
-	 * errors in the DEALLOCATE, so that we can hobble along to some extent
-	 * with older servers (leaking prepared statements as we go; but we don't
-	 * really support update operations pre-8.3 anyway).
+	 * Scan all connection cache entries to find open remote transactions, and
+	 * close them.
 	 */
-	if (entry->have_prep_stmt && entry->have_error)
+	hash_seq_init(&scan, ConnectionHash);
+	while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
 	{
-		res = PQexec(entry->conn, "DEALLOCATE ALL");
-		PQclear(res);
+		/* Ignore cache entry if no open connection right now */
+		if (entry->conn == NULL)
+			continue;
+
+		/* Sanity check for subtransaction */
+		if (!toplevel)
+		{
+			if (entry->xact_depth < curlevel)
+				continue;
+
+			if (entry->xact_depth > curlevel)
+				elog(ERROR, "missed cleaning up remote subtransaction at level %d",
+					 entry->xact_depth);
+		}
+
+		/* If it has an open remote transaction, try to close it */
+		if (entry->xact_depth > 0)
+		{
+			/*
+			 * If abort cleanup previously failed for this connection, we can't issue
+			 * any more commands against it.
+			 */
+			pgfdw_reject_incomplete_xact_state_change(entry);
+
+			entry->changing_xact_state = true;
+			if (entry->parallel_commit)
+			{
+				do_sql_command_begin(entry->conn, sql);
+				pending_entries = lappend(pending_entries, entry);
+				continue;
+			}
+			do_sql_command(entry->conn, sql);
+			entry->changing_xact_state = false;
+
+			if (toplevel)
+			{
+				/*
+				 * If there were any errors in subtransactions, and we made prepared
+				 * statements, do a DEALLOCATE ALL to make sure we get rid of all prepared
+				 * statements. This is annoying and not terribly bulletproof, but it's
+				 * probably not worth trying harder.
+				 *
+				 * DEALLOCATE ALL only exists in 8.3 and later, so this constrains how old
+				 * a server postgres_fdw can communicate with.  We intentionally ignore
+				 * errors in the DEALLOCATE, so that we can hobble along to some extent
+				 * with older servers (leaking prepared statements as we go; but we don't
+				 * really support update operations pre-8.3 anyway).
+				 */
+				if (entry->have_prep_stmt && entry->have_error)
+				{
+					PGresult   *res;
+
+					res = PQexec(entry->conn, "DEALLOCATE ALL");
+					PQclear(res);
+				}
+				entry->have_prep_stmt = false;
+				entry->have_error = false;
+			}
+		}
+
+		/* Reset state to show we're out of a transaction */
+		pgfdw_reset_xact_state(entry, toplevel);
 	}
-	entry->have_prep_stmt = false;
-	entry->have_error = false;
 
-	return false;
+	/* If there are any pending connections, finish cleaning them up */
+	if (pending_entries)
+		pgfdw_finish_pre_commit(pending_entries, sql, toplevel);
 }
 
 /*
-- 
2.18.0

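The hunk above folds the toplevel and subtransaction paths into one scan over the connection cache. First it forms the SQL command once. Then it skips or rejects each entry according to its remote nesting depth. Finally it either sends the command asynchronously (parallel commit) or runs it synchronously.

The following is a minimal standalone sketch of only that control flow. It rests on some assumptions: `SketchEntry` is a hypothetical stand-in for `ConnCacheEntry`, a plain array replaces the `ConnectionHash` scan, and counters replace `do_sql_command_begin()`/`lappend()` and `do_sql_command()`. None of these sketch names exist in postgres_fdw.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdio.h>

/* Hypothetical stand-in for ConnCacheEntry: only the fields the scan reads. */
typedef struct SketchEntry
{
	bool		has_conn;		/* models entry->conn != NULL */
	int			xact_depth;		/* remote transaction nesting depth */
	bool		parallel_commit;	/* queue the command instead of waiting */
} SketchEntry;

/* Form the command once, before the scan, as the combined function does. */
static void
sketch_form_sql(char *sql, size_t len, bool toplevel, int curlevel)
{
	/* A subtransaction always has nesting level 2 or higher. */
	assert(toplevel || curlevel >= 2);

	if (toplevel)
		snprintf(sql, len, "COMMIT TRANSACTION");
	else
		snprintf(sql, len, "RELEASE SAVEPOINT s%d", curlevel);
}

/*
 * Walk the entries with the same skip/sanity rules as the scan loop.
 * Returns -1 where the real code would elog(ERROR); otherwise returns the
 * number of entries that would be queued for parallel commit, and stores
 * in *nsync the number that would be committed synchronously.
 */
static int
sketch_scan(const SketchEntry *entries, int n, bool toplevel, int curlevel,
			int *nsync)
{
	int			npending = 0;

	*nsync = 0;
	for (int i = 0; i < n; i++)
	{
		const SketchEntry *e = &entries[i];

		if (!e->has_conn)
			continue;			/* no open connection right now */

		if (!toplevel)
		{
			if (e->xact_depth < curlevel)
				continue;		/* not involved in this subtransaction */
			if (e->xact_depth > curlevel)
				return -1;		/* "missed cleaning up remote subtransaction" */
		}

		if (e->xact_depth > 0)
		{
			if (e->parallel_commit)
				npending++;		/* do_sql_command_begin() + lappend() */
			else
				(*nsync)++;		/* do_sql_command() */
		}
	}
	return npending;
}
```

Fixing the command text before the scan lets the loop body decide only whether an entry participates. The toplevel-only DEALLOCATE ALL step then sits inside that same loop.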
From 79068f8f60c04612bd4a086e001a6b0aa11daa12 Mon Sep 17 00:00:00 2001
From: Fujii Masao <fujii@postgresql.org>
Date: Mon, 25 Jul 2022 17:25:24 +0900
Subject: [PATCH v2 1/4] Refactor pgfdw_get_result() and
 pgfdw_get_cleanup_result().

---
 contrib/postgres_fdw/connection.c | 125 +++++++++++-------------------
 1 file changed, 47 insertions(+), 78 deletions(-)

diff --git a/contrib/postgres_fdw/connection.c b/contrib/postgres_fdw/connection.c
index 939d114f02e..cbee2854803 100644
--- a/contrib/postgres_fdw/connection.c
+++ b/contrib/postgres_fdw/connection.c
@@ -108,8 +108,8 @@ static void pgfdw_reset_xact_state(ConnCacheEntry *entry, bool toplevel);
 static bool pgfdw_cancel_query(PGconn *conn);
 static bool pgfdw_exec_cleanup_query(PGconn *conn, const char *query,
 									 bool ignore_errors);
-static bool pgfdw_get_cleanup_result(PGconn *conn, TimestampTz endtime,
-									 PGresult **result, bool *timed_out);
+static bool pgfdw_get_result_timed(PGconn *conn, TimestampTz endtime,
+								   PGresult **result, bool *timed_out);
 static void pgfdw_abort_cleanup(ConnCacheEntry *entry, bool toplevel);
 static void pgfdw_finish_pre_commit_cleanup(List *pending_entries);
 static void pgfdw_finish_pre_subcommit_cleanup(List *pending_entries,
@@ -799,53 +799,12 @@ pgfdw_exec_query(PGconn *conn, const char *query, PgFdwConnState *state)
 PGresult *
 pgfdw_get_result(PGconn *conn, const char *query)
 {
-	PGresult   *volatile last_res = NULL;
-
-	/* In what follows, do not leak any PGresults on an error. */
-	PG_TRY();
-	{
-		for (;;)
-		{
-			PGresult   *res;
-
-			while (PQisBusy(conn))
-			{
-				int			wc;
-
-				/* Sleep until there's something to do */
-				wc = WaitLatchOrSocket(MyLatch,
-									   WL_LATCH_SET | WL_SOCKET_READABLE |
-									   WL_EXIT_ON_PM_DEATH,
-									   PQsocket(conn),
-									   -1L, PG_WAIT_EXTENSION);
-				ResetLatch(MyLatch);
-
-				CHECK_FOR_INTERRUPTS();
-
-				/* Data available in socket? */
-				if (wc & WL_SOCKET_READABLE)
-				{
-					if (!PQconsumeInput(conn))
-						pgfdw_report_error(ERROR, NULL, conn, false, query);
-				}
-			}
-
-			res = PQgetResult(conn);
-			if (res == NULL)
-				break;			/* query is complete */
+	PGresult   *result = NULL;
 
-			PQclear(last_res);
-			last_res = res;
-		}
-	}
-	PG_CATCH();
-	{
-		PQclear(last_res);
-		PG_RE_THROW();
-	}
-	PG_END_TRY();
+	if (pgfdw_get_result_timed(conn, 0, &result, NULL))
+		pgfdw_report_error(ERROR, NULL, conn, false, query);
 
-	return last_res;
+	return result;
 }
 
 /*
@@ -1295,7 +1254,7 @@ pgfdw_cancel_query(PGconn *conn)
 	}
 
 	/* Get and discard the result of the query. */
-	if (pgfdw_get_cleanup_result(conn, endtime, &result, &timed_out))
+	if (pgfdw_get_result_timed(conn, endtime, &result, &timed_out))
 	{
 		if (timed_out)
 			ereport(WARNING,
@@ -1351,7 +1310,7 @@ pgfdw_exec_cleanup_query(PGconn *conn, const char *query, bool ignore_errors)
 	}
 
 	/* Get the result of the query. */
-	if (pgfdw_get_cleanup_result(conn, endtime, &result, &timed_out))
+	if (pgfdw_get_result_timed(conn, endtime, &result, &timed_out))
 	{
 		if (timed_out)
 			ereport(WARNING,
@@ -1375,24 +1334,33 @@ pgfdw_exec_cleanup_query(PGconn *conn, const char *query, bool ignore_errors)
 }
 
 /*
- * Get, during abort cleanup, the result of a query that is in progress.  This
- * might be a query that is being interrupted by transaction abort, or it might
- * be a query that was initiated as part of transaction abort to get the remote
- * side back to the appropriate state.
+ * Get the result of a query.
+ *
+ * This function checks for interrupts while waiting, so it stays responsive.
+ *
+ * If timed_out is NULL, no timeout is applied and endtime is ignored.
+ * Otherwise, endtime is the time at which this function should give up
+ * and assume the remote side is dead.
+ *
+ * Return true on timeout or connection trouble, in which case *result is not
+ * set.  Otherwise return false and set *result to the last result of the
+ * query.  Set *timed_out to true only when the timeout expired.
+ *
+ * This function emulates PQexec()'s behavior of returning the last result
+ * when there are many.
+ *
+ * The caller is responsible for handling any error reported in the result.
  *
- * endtime is the time at which we should give up and assume the remote
- * side is dead.  Returns true if the timeout expired or connection trouble
- * occurred, false otherwise.  Sets *result except in case of a timeout.
- * Sets timed_out to true only when the timeout expired.
  */
 static bool
-pgfdw_get_cleanup_result(PGconn *conn, TimestampTz endtime, PGresult **result,
-						 bool *timed_out)
+pgfdw_get_result_timed(PGconn *conn, TimestampTz endtime, PGresult **result,
+					   bool *timed_out)
 {
 	volatile bool failed = false;
 	PGresult   *volatile last_res = NULL;
 
-	*timed_out = false;
+	if (timed_out != NULL)
+		*timed_out = false;
 
 	/* In what follows, do not leak any PGresults on an error. */
 	PG_TRY();
@@ -1404,23 +1372,27 @@ pgfdw_get_cleanup_result(PGconn *conn, TimestampTz endtime, PGresult **result,
 			while (PQisBusy(conn))
 			{
 				int			wc;
-				TimestampTz now = GetCurrentTimestamp();
-				long		cur_timeout;
+				long		cur_timeout = -1;
+				int			wakeEvents = WL_LATCH_SET | WL_SOCKET_READABLE |
+				WL_EXIT_ON_PM_DEATH;
 
 				/* If timeout has expired, give up, else get sleep time. */
-				cur_timeout = TimestampDifferenceMilliseconds(now, endtime);
-				if (cur_timeout <= 0)
+				if (timed_out != NULL)
 				{
-					*timed_out = true;
-					failed = true;
-					goto exit;
+					TimestampTz now = GetCurrentTimestamp();
+
+					cur_timeout = TimestampDifferenceMilliseconds(now, endtime);
+					if (cur_timeout <= 0)
+					{
+						*timed_out = true;
+						failed = true;
+						goto exit;
+					}
+					wakeEvents |= WL_TIMEOUT;
 				}
 
 				/* Sleep until there's something to do */
-				wc = WaitLatchOrSocket(MyLatch,
-									   WL_LATCH_SET | WL_SOCKET_READABLE |
-									   WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
-									   PQsocket(conn),
+				wc = WaitLatchOrSocket(MyLatch, wakeEvents, PQsocket(conn),
 									   cur_timeout, PG_WAIT_EXTENSION);
 				ResetLatch(MyLatch);
 
@@ -1458,6 +1430,7 @@ exit:	;
 		PQclear(last_res);
 	else
 		*result = last_res;
+
 	return failed;
 }
 
@@ -1599,13 +1572,9 @@ pgfdw_finish_pre_commit_cleanup(List *pending_entries)
 		entry = (ConnCacheEntry *) lfirst(lc);
 
 		/* Ignore errors (see notes in pgfdw_xact_callback) */
-		while ((res = PQgetResult(entry->conn)) != NULL)
-		{
-			PQclear(res);
-			/* Stop if the connection is lost (else we'll loop infinitely) */
-			if (PQstatus(entry->conn) == CONNECTION_BAD)
-				break;
-		}
+		if (!pgfdw_get_result_timed(entry->conn, 0, &res, NULL))
+			PQclear(res);
+
 		entry->have_prep_stmt = false;
 		entry->have_error = false;
 
-- 
2.18.0

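The refactored pgfdw_get_result_timed() above treats a NULL timed_out as "no timeout". In that case it leaves WL_TIMEOUT out of the wake events and passes -1 as the sleep time. Otherwise it recomputes the remaining milliseconds on every iteration and gives up once the deadline has passed.

The following is a minimal standalone sketch of that pattern. It makes these assumptions: POSIX poll() stands in for WaitLatchOrSocket(), and milliseconds from CLOCK_MONOTONIC stand in for TimestampTz. Interrupt handling is reduced to retrying on EINTR. wait_readable_timed() and now_ms() are hypothetical names, not postgres_fdw functions.

```c
#define _POSIX_C_SOURCE 200809L

#include <assert.h>
#include <errno.h>
#include <limits.h>
#include <poll.h>
#include <stdbool.h>
#include <stddef.h>
#include <time.h>
#include <unistd.h>

/* Current monotonic time in milliseconds (stand-in for GetCurrentTimestamp). */
static long long
now_ms(void)
{
	struct timespec ts;

	clock_gettime(CLOCK_MONOTONIC, &ts);
	return (long long) ts.tv_sec * 1000LL + ts.tv_nsec / 1000000L;
}

/*
 * Wait until fd is readable (or reports hangup/error).  If timed_out is NULL,
 * block indefinitely and ignore endtime_ms; otherwise give up at endtime_ms.
 * Returns true on timeout or poll() failure, false once fd is ready.
 */
static bool
wait_readable_timed(int fd, long long endtime_ms, bool *timed_out)
{
	assert(fd >= 0);

	if (timed_out != NULL)
		*timed_out = false;

	for (;;)
	{
		struct pollfd pfd = {.fd = fd, .events = POLLIN};
		int			timeout = -1;	/* -1: no WL_TIMEOUT equivalent */
		int			rc;

		if (timed_out != NULL)
		{
			long long	remaining = endtime_ms - now_ms();

			if (remaining <= 0)
			{
				*timed_out = true;
				return true;
			}
			timeout = remaining > INT_MAX ? INT_MAX : (int) remaining;
		}

		rc = poll(&pfd, 1, timeout);
		if (rc > 0)
			return false;		/* fd is ready */
		if (rc < 0 && errno != EINTR)
			return true;		/* treat like connection trouble */
		/* rc == 0 or EINTR: loop and re-check the deadline */
	}
}
```

Passing NULL spares callers that never give up, such as pgfdw_get_result(), any deadline bookkeeping. The cleanup callers keep their give-up time, so one loop serves both.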