From 60db4932687bffd16e291638256d844a6ddc12d0 Mon Sep 17 00:00:00 2001
From: Melih Mutlu <m.melihmutlu@gmail.com>
Date: Tue, 4 Jul 2023 22:04:46 +0300
Subject: [PATCH v21 2/5] Reuse Tablesync Workers

Before this patch, tablesync workers were capable of syncing only one
table. For each table, a new sync worker was launched and that worker would
exit when done processing the table.

Now, tablesync workers are not limited to processing only one
table. When done, they can move to processing another table in
the same subscription.

If there is a table that needs to be synced, an available tablesync
worker picks up that table and syncs it. Each tablesync worker
continues to pick new tables to sync until there are no tables left
requiring synchronization. If there was no available worker to
process the table, then a new tablesync worker will be launched,
provided the number of tablesync workers for the subscription does not
exceed max_sync_workers_per_subscription.

Discussion: http://postgr.es/m/CAGPVpCTq=rUDd4JUdaRc1XUWf4BrH2gdSNf3rtOMUGj9rPpfzQ@mail.gmail.com
---
 src/backend/replication/logical/launcher.c  |   1 +
 src/backend/replication/logical/tablesync.c | 124 ++++++++++++++++++--
 src/backend/replication/logical/worker.c    |  38 +++++-
 src/include/replication/worker_internal.h   |   6 +
 4 files changed, 153 insertions(+), 16 deletions(-)

diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c
index e231fa7f95..25dd06b8af 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -440,6 +440,7 @@ retry:
 	worker->stream_fileset = NULL;
 	worker->leader_pid = is_parallel_apply_worker ? MyProcPid : InvalidPid;
 	worker->parallel_apply = is_parallel_apply_worker;
+	worker->relsync_completed = false;
 	worker->last_lsn = InvalidXLogRecPtr;
 	TIMESTAMP_NOBEGIN(worker->last_send_time);
 	TIMESTAMP_NOBEGIN(worker->last_recv_time);
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index 729f48a3b5..3eaa47119a 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -134,10 +134,12 @@ static StringInfo copybuf = NULL;
 
 /*
  * Exit routine for synchronization worker.
+ *
+ * If reuse_worker is false, at the conclusion of this function the worker
+ * process will exit.
  */
 static void
-pg_attribute_noreturn()
-finish_sync_worker(void)
+finish_sync_worker(bool reuse_worker)
 {
 	/*
 	 * Commit any outstanding transaction. This is the usual case, unless
@@ -149,21 +151,42 @@ finish_sync_worker(void)
 		pgstat_report_stat(true);
 	}
 
+	/*
+	 * Disconnect from the publisher otherwise reusing the sync worker can
+	 * error due to exceeding max_wal_senders.
+	 */
+	if (LogRepWorkerWalRcvConn != NULL)
+	{
+		walrcv_disconnect(LogRepWorkerWalRcvConn);
+		LogRepWorkerWalRcvConn = NULL;
+	}
+
 	/* And flush all writes. */
 	XLogFlush(GetXLogWriteRecPtr());
 
 	StartTransactionCommand();
-	ereport(LOG,
-			(errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has finished",
-					MySubscription->name,
-					get_rel_name(MyLogicalRepWorker->relid))));
+	if (!reuse_worker)
+	{
+		ereport(LOG,
+				(errmsg("logical replication table synchronization worker for subscription \"%s\" has finished",
+						MySubscription->name)));
+	}
+	else
+	{
+		ereport(LOG,
+				(errmsg("logical replication worker for subscription \"%s\" will be reused to sync table \"%s\" with relid %u.",
+						MySubscription->name,
+						get_rel_name(MyLogicalRepWorker->relid),
+						MyLogicalRepWorker->relid)));
+	}
 	CommitTransactionCommand();
 
 	/* Find the leader apply worker and signal it. */
 	logicalrep_worker_wakeup(MyLogicalRepWorker->subid, InvalidOid);
 
 	/* Stop gracefully */
-	proc_exit(0);
+	if (!reuse_worker)
+		proc_exit(0);
 }
 
 /*
@@ -383,7 +406,15 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn)
 		 */
 		replorigin_drop_by_name(originname, true, false);
 
-		finish_sync_worker();
+		/* Sync worker has completed synchronization of the current table. */
+		MyLogicalRepWorker->relsync_completed = true;
+
+		ereport(LOG,
+				(errmsg("logical replication table synchronization for subscription \"%s\", relation \"%s\" with relid %u has finished",
+						MySubscription->name,
+						get_rel_name(MyLogicalRepWorker->relid),
+						MyLogicalRepWorker->relid)));
+		CommitTransactionCommand();
 	}
 	else
 		SpinLockRelease(&MyLogicalRepWorker->relmutex);
@@ -1288,7 +1319,7 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
 		case SUBREL_STATE_SYNCDONE:
 		case SUBREL_STATE_READY:
 		case SUBREL_STATE_UNKNOWN:
-			finish_sync_worker();	/* doesn't return */
+			finish_sync_worker(false);	/* doesn't return */
 	}
 
 	/* Calculate the name of the tablesync slot. */
@@ -1645,6 +1676,9 @@ run_tablesync_worker(WalRcvStreamOptions *options,
 					 int originname_size,
 					 XLogRecPtr *origin_startpos)
 {
+	MyLogicalRepWorker->relsync_completed = false;
+
+	/* Start table synchronization. */
 	start_table_sync(origin_startpos, &slotname);
 
 	ReplicationOriginNameForLogicalRep(MySubscription->oid,
@@ -1671,13 +1705,79 @@ TablesyncWorkerMain(Datum main_arg)
 	XLogRecPtr	origin_startpos = InvalidXLogRecPtr;
 	char	   *myslotname = NULL;
 	WalRcvStreamOptions options;
+	bool 		done = false;
 
 	StartLogRepWorker(worker_slot);
 
-	run_tablesync_worker(&options, myslotname, originname,
-						 sizeof(originname), &origin_startpos);
+	/*
+	 * The loop where worker does its job. It loops until there is no relation
+	 * left to sync.
+	 */
+	for (;!done;)
+	{
+		List	   *rstates;
+		ListCell   *lc;
+
+		run_tablesync_worker(&options, myslotname, originname,
+							sizeof(originname), &origin_startpos);
+
+		if (IsTransactionState())
+			CommitTransactionCommand();
+
+		if (MyLogicalRepWorker->relsync_completed)
+		{
+			/*
+			 * This tablesync worker is 'done' unless another table that needs
+			 * syncing is found.
+			 */
+			done = true;
+
+			/* This transaction will be committed by finish_sync_worker. */
+			StartTransactionCommand();
+
+			/*
+			 * Check if there is any table whose relation state is still INIT.
+			 * If a table in INIT state is found, the worker will not be
+			 * finished, it will be reused instead.
+			 */
+			rstates = GetSubscriptionRelations(MySubscription->oid, true);
+
+			foreach(lc, rstates)
+			{
+				SubscriptionRelState *rstate = (SubscriptionRelState *) lfirst(lc);
+
+				if (rstate->state == SUBREL_STATE_SYNCDONE)
+					continue;
+
+				/*
+				 * Take exclusive lock to prevent any other sync worker from
+				 * picking the same table.
+				 */
+				LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
+
+				/*
+				 * Pick the table for the next run if it is not already picked
+				 * up by another worker.
+				 */
+				if (!logicalrep_worker_find(MySubscription->oid, rstate->relid, false))
+				{
+					/* Update worker state for the next table */
+					MyLogicalRepWorker->relid = rstate->relid;
+					MyLogicalRepWorker->relstate = rstate->state;
+					MyLogicalRepWorker->relstate_lsn = rstate->lsn;
+					LWLockRelease(LogicalRepWorkerLock);
+
+					/* Found a table for next iteration */
+					finish_sync_worker(true);
+					done = false;
+					break;
+				}
+				LWLockRelease(LogicalRepWorkerLock);
+			}
+		}
+	}
 
-	finish_sync_worker();
+	finish_sync_worker(false);
 }
 
 /*
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 3b8976f717..7442fd308d 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -3607,6 +3607,21 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
 					MemoryContextReset(ApplyMessageContext);
 				}
 
+				/*
+				 * apply_dispatch() may have gone into apply_handle_commit()
+				 * which can call process_syncing_tables_for_sync.
+				 *
+				 * process_syncing_tables_for_sync decides whether the sync of
+				 * the current table is completed. If it is completed,
+				 * streaming must be already ended. So, we can break the loop.
+				 */
+				if (am_tablesync_worker() &&
+					MyLogicalRepWorker->relsync_completed)
+				{
+					endofstream = true;
+					break;
+				}
+
 				len = walrcv_receive(LogRepWorkerWalRcvConn, &buf, &fd);
 			}
 		}
@@ -3626,6 +3641,16 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
 
 			/* Process any table synchronization changes. */
 			process_syncing_tables(last_received);
+
+			/*
+			 * If relsync_completed is true, this means that the tablesync
+			 * worker is done with synchronization. Streaming has already been
+			 * ended by process_syncing_tables_for_sync. We should move to the
+			 * next table if needed, or exit.
+			 */
+			if (am_tablesync_worker() &&
+				MyLogicalRepWorker->relsync_completed)
+				endofstream = true;
 		}
 
 		/* Cleanup the memory. */
@@ -3728,8 +3753,12 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
 	error_context_stack = errcallback.previous;
 	apply_error_context_stack = error_context_stack;
 
-	/* All done */
-	walrcv_endstreaming(LogRepWorkerWalRcvConn, &tli);
+	/*
+	 * End streaming here for only apply workers. Ending streaming for
+	 * tablesync workers is deferred until the worker exits its main loop.
+	 */
+	if (!am_tablesync_worker())
+		walrcv_endstreaming(LogRepWorkerWalRcvConn, &tli);
 }
 
 /*
@@ -4602,9 +4631,10 @@ InitializeLogRepWorker(void)
 
 	if (am_tablesync_worker())
 		ereport(LOG,
-				(errmsg("logical replication worker for subscription \"%s\", table \"%s\" has started",
+				(errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" with relid %u has started",
 						MySubscription->name,
-						get_rel_name(MyLogicalRepWorker->relid))));
+						get_rel_name(MyLogicalRepWorker->relid),
+						MyLogicalRepWorker->relid)));
 	else
 		ereport(LOG,
 				(errmsg("logical replication apply worker for subscription \"%s\" has started",
diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h
index 9012af38cd..f552ecbc09 100644
--- a/src/include/replication/worker_internal.h
+++ b/src/include/replication/worker_internal.h
@@ -57,6 +57,12 @@ typedef struct LogicalRepWorker
 	XLogRecPtr	relstate_lsn;
 	slock_t		relmutex;
 
+	/*
+	 * Indicates whether tablesync worker has completed syncing its assigned
+	 * table.
+	 */
+	bool		relsync_completed;
+
 	/*
 	 * Used to create the changes and subxact files for the streaming
 	 * transactions.  Upon the arrival of the first streaming transaction or
-- 
2.25.1

