From 356e6ff2f38775a14e7ec8c60507c520f5c32bae Mon Sep 17 00:00:00 2001
From: Melih Mutlu <m.melihmutlu@gmail.com>
Date: Tue, 4 Jul 2023 22:13:52 +0300
Subject: [PATCH v21 3/5] Reuse connection when tablesync workers change the
 target

Previously, a tablesync worker established a new connection to the publisher
each time it moved on to a different table, which can add overhead. This patch
allows the existing connection to be reused.

On the publisher node, this patch allows a logical walsender process to be
reused after it has finished streaming once.
---
 src/backend/replication/logical/launcher.c  |  1 +
 src/backend/replication/logical/tablesync.c | 60 ++++++++++++++-------
 src/backend/replication/logical/worker.c    | 18 ++++---
 src/backend/replication/walsender.c         |  7 +++
 src/include/replication/worker_internal.h   |  3 ++
 5 files changed, 62 insertions(+), 27 deletions(-)

diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c
index 25dd06b8af..657e446eaf 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -441,6 +441,7 @@ retry:
 	worker->leader_pid = is_parallel_apply_worker ? MyProcPid : InvalidPid;
 	worker->parallel_apply = is_parallel_apply_worker;
 	worker->relsync_completed = false;
+	worker->slotnum = slot;
 	worker->last_lsn = InvalidXLogRecPtr;
 	TIMESTAMP_NOBEGIN(worker->last_send_time);
 	TIMESTAMP_NOBEGIN(worker->last_recv_time);
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index 3eaa47119a..cdddff1923 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -151,16 +151,6 @@ finish_sync_worker(bool reuse_worker)
 		pgstat_report_stat(true);
 	}
 
-	/*
-	 * Disconnect from the publisher otherwise reusing the sync worker can
-	 * error due to exceeding max_wal_senders.
-	 */
-	if (LogRepWorkerWalRcvConn != NULL)
-	{
-		walrcv_disconnect(LogRepWorkerWalRcvConn);
-		LogRepWorkerWalRcvConn = NULL;
-	}
-
 	/* And flush all writes. */
 	XLogFlush(GetXLogWriteRecPtr());
 
@@ -1268,6 +1258,27 @@ ReplicationSlotNameForTablesync(Oid suboid, Oid relid,
 			 relid, GetSystemIdentifier());
 }
 
+/*
+ * Build the application_name a tablesync worker uses when connecting to the
+ * publisher, writing at most szapp bytes into application_name.
+ *
+ * The name has the form "pg_<suboid>_sync_<worker_slot>_<system identifier>".
+ * The replication slot name used to serve as application_name, but a reused
+ * tablesync worker can handle several replication slots during its lifetime,
+ * so the name is derived from the worker's slot number instead.
+ *
+ * FIXME: if the tablesync worker starts to reuse the replication slot during
+ * synchronization, we should again use the replication slot name as
+ * application_name.
+ */
+static void
+ApplicationNameForTablesync(Oid suboid, int worker_slot,
+							char *application_name, Size szapp)
+{
+	snprintf(application_name, szapp, "pg_%u_sync_%i_" UINT64_FORMAT, suboid,
+			 worker_slot, GetSystemIdentifier());
+}
+
 /*
  * Start syncing the table in the sync worker.
  *
@@ -1329,15 +1340,26 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
 									slotname,
 									NAMEDATALEN);
 
-	/*
-	 * Here we use the slot name instead of the subscription name as the
-	 * application_name, so that it is different from the leader apply worker,
-	 * so that synchronous replication can distinguish them.
-	 */
-	LogRepWorkerWalRcvConn =
-		walrcv_connect(MySubscription->conninfo, true,
-					   must_use_password,
-					   slotname, &err);
+	/* Connect to the publisher if we haven't done so already. */
+	if (LogRepWorkerWalRcvConn == NULL)
+	{
+		char application_name[NAMEDATALEN];
+
+		/*
+		 * The application_name must differ from the subscription name (used by
+		 * the leader apply worker) because synchronous replication has to be
+		 * able to distinguish this worker from the leader apply worker.
+		 */
+		ApplicationNameForTablesync(MySubscription->oid,
+									MyLogicalRepWorker->slotnum,
+									application_name,
+									NAMEDATALEN);
+		LogRepWorkerWalRcvConn =
+			walrcv_connect(MySubscription->conninfo, true,
+						   must_use_password,
+						   application_name, &err);
+	}
+
 	if (LogRepWorkerWalRcvConn == NULL)
 		ereport(ERROR,
 				(errcode(ERRCODE_CONNECTION_FAILURE),
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 7442fd308d..9338c51248 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -3480,20 +3480,22 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
 	ErrorContextCallback errcallback;
 
 	/*
-	 * Init the ApplyMessageContext which we clean up after each replication
-	 * protocol message.
+	 * Init the ApplyMessageContext if needed. This context is cleaned up
+	 * after each replication protocol message.
 	 */
-	ApplyMessageContext = AllocSetContextCreate(ApplyContext,
-												"ApplyMessageContext",
-												ALLOCSET_DEFAULT_SIZES);
+	if (!ApplyMessageContext)
+		ApplyMessageContext = AllocSetContextCreate(ApplyContext,
+													"ApplyMessageContext",
+													ALLOCSET_DEFAULT_SIZES);
 
 	/*
 	 * This memory context is used for per-stream data when the streaming mode
 	 * is enabled. This context is reset on each stream stop.
 	 */
-	LogicalStreamingContext = AllocSetContextCreate(ApplyContext,
-													"LogicalStreamingContext",
-													ALLOCSET_DEFAULT_SIZES);
+	if (!LogicalStreamingContext)
+		LogicalStreamingContext = AllocSetContextCreate(ApplyContext,
+														"LogicalStreamingContext",
+														ALLOCSET_DEFAULT_SIZES);
 
 	/* mark as idle, before starting to loop */
 	pgstat_report_activity(STATE_IDLE, NULL);
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index d27ef2985d..2f3e93cc40 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -1830,7 +1830,14 @@ exec_replication_command(const char *cmd_string)
 				if (cmd->kind == REPLICATION_KIND_PHYSICAL)
 					StartReplication(cmd);
 				else
+				{
+					/*
+					 * Reset the flags: a reused tablesync worker can start
+					 * logical replication again on the same connection.
+					 */
+					streamingDoneSending = streamingDoneReceiving = false;
 					StartLogicalReplication(cmd);
+				}
 
 				/* dupe, but necessary per libpqrcv_endstreaming */
 				EndReplicationCommand(cmdtag);
diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h
index f552ecbc09..4a94b13423 100644
--- a/src/include/replication/worker_internal.h
+++ b/src/include/replication/worker_internal.h
@@ -39,6 +39,9 @@ typedef struct LogicalRepWorker
 	/* Increased every time the slot is taken by new worker. */
 	uint16		generation;
 
+	/* Slot number of this worker. */
+	int			slotnum;
+
 	/* Pointer to proc array. NULL if not running. */
 	PGPROC	   *proc;
 
-- 
2.25.1

