From 9e8b4ddd829f73a5520a8796c7e74f2589a8d969 Mon Sep 17 00:00:00 2001
From: Peter Smith <peter.b.smith@fujitsu.com>
Date: Thu, 6 May 2021 18:57:38 +1000
Subject: [PATCH v3] Fix wrconn. Use stack variable.

This patch replaces the global "wrconn" in AlterSubscription_refresh with a local variable of the same name, making it consistent with other functions in subscriptioncmds.c (e.g. DropSubscription).

The global wrconn is only meant to be used for logical apply/tablesync worker. To reduce future confusion it has renamed from "wrconn" to "lrep_worker_wrconn".

Using the global/incorrect wrconn in AlterSubscription_refresh doesn't normally cause any problems, but harm is still posslble if the apply worker ever manages to do a subscription refresh. e.g. see [1].

[1] https://www.postgresql.org/message-id/20201111215820.qihhrz7fayu6myfi%40alap3.anarazel.de
---
 src/backend/commands/subscriptioncmds.c     | 16 ++++++++--------
 src/backend/replication/logical/launcher.c  |  4 ++--
 src/backend/replication/logical/tablesync.c | 29 +++++++++++++++--------------
 src/backend/replication/logical/worker.c    | 20 ++++++++++----------
 src/include/replication/worker_internal.h   |  2 +-
 5 files changed, 36 insertions(+), 35 deletions(-)

diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index 517c8ed..1096aa8 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -556,18 +556,19 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data)
 		char		state;
 	} SubRemoveRels;
 	SubRemoveRels *sub_remove_rels;
+	WalReceiverConn *wrconn;
 
 	/* Load the library providing us libpq calls. */
 	load_file("libpqwalreceiver", false);
 
+	/* Try to connect to the publisher. */
+	wrconn = walrcv_connect(sub->conninfo, true, sub->name, &err);
+	if (!wrconn)
+		ereport(ERROR,
+				(errmsg("could not connect to the publisher: %s", err)));
+
 	PG_TRY();
 	{
-		/* Try to connect to the publisher. */
-		wrconn = walrcv_connect(sub->conninfo, true, sub->name, &err);
-		if (!wrconn)
-			ereport(ERROR,
-					(errmsg("could not connect to the publisher: %s", err)));
-
 		/* Get the table list from publisher. */
 		pubrel_names = fetch_table_list(wrconn, sub->publications);
 
@@ -737,8 +738,7 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data)
 	}
 	PG_FINALLY();
 	{
-		if (wrconn)
-			walrcv_disconnect(wrconn);
+		walrcv_disconnect(wrconn);
 	}
 	PG_END_TRY();
 
diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c
index cb462a0..a39ae17 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -649,8 +649,8 @@ static void
 logicalrep_worker_onexit(int code, Datum arg)
 {
 	/* Disconnect gracefully from the remote side. */
-	if (wrconn)
-		walrcv_disconnect(wrconn);
+	if (lrep_worker_wrconn)
+		walrcv_disconnect(lrep_worker_wrconn);
 
 	logicalrep_worker_detach();
 
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index 0638f5c..eda9f23 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -302,8 +302,8 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn)
 								   MyLogicalRepWorker->relstate,
 								   MyLogicalRepWorker->relstate_lsn);
 
-		/* End wal streaming so wrconn can be re-used to drop the slot. */
-		walrcv_endstreaming(wrconn, &tli);
+		/* End wal streaming so lrep_worker_wrconn can be re-used to drop the slot. */
+		walrcv_endstreaming(lrep_worker_wrconn, &tli);
 
 		/*
 		 * Cleanup the tablesync slot.
@@ -322,7 +322,7 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn)
 		 * otherwise, it won't be dropped till the corresponding subscription
 		 * is dropped. So passing missing_ok = false.
 		 */
-		ReplicationSlotDropAtPubNode(wrconn, syncslotname, false);
+		ReplicationSlotDropAtPubNode(lrep_worker_wrconn, syncslotname, false);
 
 		finish_sync_worker();
 	}
@@ -642,7 +642,7 @@ copy_read_data(void *outbuf, int minread, int maxread)
 		for (;;)
 		{
 			/* Try read the data. */
-			len = walrcv_receive(wrconn, &buf, &fd);
+			len = walrcv_receive(lrep_worker_wrconn, &buf, &fd);
 
 			CHECK_FOR_INTERRUPTS();
 
@@ -715,7 +715,7 @@ fetch_remote_table_info(char *nspname, char *relname,
 					 "   AND c.relname = %s",
 					 quote_literal_cstr(nspname),
 					 quote_literal_cstr(relname));
-	res = walrcv_exec(wrconn, cmd.data, lengthof(tableRow), tableRow);
+	res = walrcv_exec(lrep_worker_wrconn, cmd.data, lengthof(tableRow), tableRow);
 
 	if (res->status != WALRCV_OK_TUPLES)
 		ereport(ERROR,
@@ -752,9 +752,10 @@ fetch_remote_table_info(char *nspname, char *relname,
 					 "   AND a.attrelid = %u"
 					 " ORDER BY a.attnum",
 					 lrel->remoteid,
-					 (walrcv_server_version(wrconn) >= 120000 ? "AND a.attgenerated = ''" : ""),
+					 (walrcv_server_version(lrep_worker_wrconn) >= 120000 ?
+						"AND a.attgenerated = ''" : ""),
 					 lrel->remoteid);
-	res = walrcv_exec(wrconn, cmd.data, lengthof(attrRow), attrRow);
+	res = walrcv_exec(lrep_worker_wrconn, cmd.data, lengthof(attrRow), attrRow);
 
 	if (res->status != WALRCV_OK_TUPLES)
 		ereport(ERROR,
@@ -841,7 +842,7 @@ copy_table(Relation rel)
 		appendStringInfo(&cmd, " FROM %s) TO STDOUT",
 						 quote_qualified_identifier(lrel.nspname, lrel.relname));
 	}
-	res = walrcv_exec(wrconn, cmd.data, 0, NULL);
+	res = walrcv_exec(lrep_worker_wrconn, cmd.data, 0, NULL);
 	pfree(cmd.data);
 	if (res->status != WALRCV_OK_COPY_OUT)
 		ereport(ERROR,
@@ -957,8 +958,8 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
 	 * application_name, so that it is different from the main apply worker,
 	 * so that synchronous replication can distinguish them.
 	 */
-	wrconn = walrcv_connect(MySubscription->conninfo, true, slotname, &err);
-	if (wrconn == NULL)
+	lrep_worker_wrconn = walrcv_connect(MySubscription->conninfo, true, slotname, &err);
+	if (lrep_worker_wrconn == NULL)
 		ereport(ERROR,
 				(errmsg("could not connect to the publisher: %s", err)));
 
@@ -985,7 +986,7 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
 		 * breakdown then it wouldn't have succeeded so trying it next time
 		 * seems like a better bet.
 		 */
-		ReplicationSlotDropAtPubNode(wrconn, slotname, true);
+		ReplicationSlotDropAtPubNode(lrep_worker_wrconn, slotname, true);
 	}
 	else if (MyLogicalRepWorker->relstate == SUBREL_STATE_FINISHEDCOPY)
 	{
@@ -1038,7 +1039,7 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
 	 * ensures that both the replication slot we create (see below) and the
 	 * COPY are consistent with each other.
 	 */
-	res = walrcv_exec(wrconn,
+	res = walrcv_exec(lrep_worker_wrconn,
 					  "BEGIN READ ONLY ISOLATION LEVEL REPEATABLE READ",
 					  0, NULL);
 	if (res->status != WALRCV_OK_COMMAND)
@@ -1058,7 +1059,7 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
 	 * slot leading to a dangling slot on the server.
 	 */
 	HOLD_INTERRUPTS();
-	walrcv_create_slot(wrconn, slotname, false /* permanent */ ,
+	walrcv_create_slot(lrep_worker_wrconn, slotname, false /* permanent */ ,
 					   CRS_USE_SNAPSHOT, origin_startpos);
 	RESUME_INTERRUPTS();
 
@@ -1100,7 +1101,7 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
 	copy_table(rel);
 	PopActiveSnapshot();
 
-	res = walrcv_exec(wrconn, "COMMIT", 0, NULL);
+	res = walrcv_exec(lrep_worker_wrconn, "COMMIT", 0, NULL);
 	if (res->status != WALRCV_OK_COMMAND)
 		ereport(ERROR,
 				(errmsg("table copy could not finish transaction on publisher: %s",
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index d9f1571..181b716 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -156,7 +156,7 @@ MemoryContext ApplyContext = NULL;
 /* per stream context for streaming transactions */
 static MemoryContext LogicalStreamingContext = NULL;
 
-WalReceiverConn *wrconn = NULL;
+WalReceiverConn *lrep_worker_wrconn = NULL;
 
 Subscription *MySubscription = NULL;
 bool		MySubscriptionValid = false;
@@ -2126,7 +2126,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
 
 		MemoryContextSwitchTo(ApplyMessageContext);
 
-		len = walrcv_receive(wrconn, &buf, &fd);
+		len = walrcv_receive(lrep_worker_wrconn, &buf, &fd);
 
 		if (len != 0)
 		{
@@ -2206,7 +2206,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
 					MemoryContextReset(ApplyMessageContext);
 				}
 
-				len = walrcv_receive(wrconn, &buf, &fd);
+				len = walrcv_receive(lrep_worker_wrconn, &buf, &fd);
 			}
 		}
 
@@ -2312,7 +2312,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
 	}
 
 	/* All done */
-	walrcv_endstreaming(wrconn, &tli);
+	walrcv_endstreaming(lrep_worker_wrconn, &tli);
 }
 
 /*
@@ -2396,7 +2396,7 @@ send_feedback(XLogRecPtr recvpos, bool force, bool requestReply)
 		 LSN_FORMAT_ARGS(writepos),
 		 LSN_FORMAT_ARGS(flushpos));
 
-	walrcv_send(wrconn, reply_message->data, reply_message->len);
+	walrcv_send(lrep_worker_wrconn, reply_message->data, reply_message->len);
 
 	if (recvpos > last_recvpos)
 		last_recvpos = recvpos;
@@ -3090,9 +3090,9 @@ ApplyWorkerMain(Datum main_arg)
 		origin_startpos = replorigin_session_get_progress(false);
 		CommitTransactionCommand();
 
-		wrconn = walrcv_connect(MySubscription->conninfo, true, MySubscription->name,
+		lrep_worker_wrconn = walrcv_connect(MySubscription->conninfo, true, MySubscription->name,
 								&err);
-		if (wrconn == NULL)
+		if (lrep_worker_wrconn == NULL)
 			ereport(ERROR,
 					(errmsg("could not connect to the publisher: %s", err)));
 
@@ -3100,7 +3100,7 @@ ApplyWorkerMain(Datum main_arg)
 		 * We don't really use the output identify_system for anything but it
 		 * does some initializations on the upstream so let's still call it.
 		 */
-		(void) walrcv_identify_system(wrconn, &startpointTLI);
+		(void) walrcv_identify_system(lrep_worker_wrconn, &startpointTLI);
 	}
 
 	/*
@@ -3116,14 +3116,14 @@ ApplyWorkerMain(Datum main_arg)
 	options.startpoint = origin_startpos;
 	options.slotname = myslotname;
 	options.proto.logical.proto_version =
-		walrcv_server_version(wrconn) >= 140000 ?
+		walrcv_server_version(lrep_worker_wrconn) >= 140000 ?
 		LOGICALREP_PROTO_STREAM_VERSION_NUM : LOGICALREP_PROTO_VERSION_NUM;
 	options.proto.logical.publication_names = MySubscription->publications;
 	options.proto.logical.binary = MySubscription->binary;
 	options.proto.logical.streaming = MySubscription->stream;
 
 	/* Start normal logical streaming replication. */
-	walrcv_startstreaming(wrconn, &options);
+	walrcv_startstreaming(lrep_worker_wrconn, &options);
 
 	/* Run the main loop. */
 	LogicalRepApplyLoop(origin_startpos);
diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h
index 1cac75e..9209991 100644
--- a/src/include/replication/worker_internal.h
+++ b/src/include/replication/worker_internal.h
@@ -62,7 +62,7 @@ typedef struct LogicalRepWorker
 extern MemoryContext ApplyContext;
 
 /* libpqreceiver connection */
-extern struct WalReceiverConn *wrconn;
+extern struct WalReceiverConn *lrep_worker_wrconn;
 
 /* Worker and subscription objects. */
 extern Subscription *MySubscription;
-- 
1.8.3.1

