From 57402e74fac06e7c875e5f9b941cf893f3f8c6d2 Mon Sep 17 00:00:00 2001
From: Peter Smith <peter.b.smith@fujitsu.com>
Date: Fri, 11 Aug 2023 18:47:29 +1000
Subject: [PATCH v1] logicalrep_worker_launch limit checks

Check the per-subscription worker limits in logicalrep_worker_launch()
once, up front, recording the results in booleans instead of recounting
the workers later.  Only the limit for the worker type actually being
launched is checked.  Reaching the parallel apply worker limit now also
triggers garbage collection of stale worker slots, which previously
happened only for the tablesync limit.

---
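
Reviewer note: below is a condensed, standalone sketch of the control
flow this patch gives logicalrep_worker_launch().  It is illustrative
only: the GUC limits and the logicalrep_*_worker_count() results are
stubbed with made-up values, and the garbage-collection/retry machinery
of the real function is reduced to a comment.

    #include <stdbool.h>
    #include <stdio.h>

    /* Stub GUCs (hypothetical values, not the real defaults) */
    static int max_sync_workers_per_subscription = 2;
    static int max_parallel_apply_workers_per_subscription = 2;

    /* Stubs standing in for the launcher.c counters of the same name */
    static int
    logicalrep_sync_worker_count(int subid)
    {
        (void) subid;
        return 2;               /* pretend this limit is already reached */
    }

    static int
    logicalrep_pa_worker_count(int subid)
    {
        (void) subid;
        return 1;               /* pretend one slot is still free */
    }

    static bool
    launch_allowed(bool is_tablesync_worker, bool is_parallel_apply_worker,
                   int subid)
    {
        bool at_limit_of_tablesync_workers = false;
        bool at_limit_of_parallel_apply_workers = false;

        /* Check the limit once, only for the worker type being launched */
        if (is_tablesync_worker)
            at_limit_of_tablesync_workers =
                logicalrep_sync_worker_count(subid) >=
                max_sync_workers_per_subscription;
        else if (is_parallel_apply_worker)
            at_limit_of_parallel_apply_workers =
                logicalrep_pa_worker_count(subid) >=
                max_parallel_apply_workers_per_subscription;

        /*
         * The real code first garbage-collects stale worker slots and
         * retries when a limit is hit; here we simply refuse the launch.
         */
        if (at_limit_of_tablesync_workers || at_limit_of_parallel_apply_workers)
            return false;
        return true;
    }

    int
    main(void)
    {
        printf("tablesync allowed: %d\n", launch_allowed(true, false, 1));
        printf("parallel apply allowed: %d\n", launch_allowed(false, true, 1));
        return 0;
    }

Compiled with "cc sketch.c", this prints 0 for the tablesync case (its
stubbed count is at the limit) and 1 for the parallel apply case (one
slot still free).
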
 src/backend/replication/logical/launcher.c | 39 ++++++++++++++++++++++--------
 1 file changed, 29 insertions(+), 10 deletions(-)

diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c
index e231fa7..0fd15f6 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -312,10 +312,11 @@ logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname, Oid userid,
 	int			i;
 	int			slot = 0;
 	LogicalRepWorker *worker = NULL;
-	int			nsyncworkers;
-	int			nparallelapplyworkers;
 	TimestampTz now;
+	bool		is_tablesync_worker = OidIsValid(relid);
 	bool		is_parallel_apply_worker = (subworker_dsm != DSM_HANDLE_INVALID);
+	bool		at_limit_of_tablesync_workers = false;
+	bool		at_limit_of_parallel_apply_workers = false;
 
 	/* Sanity check - tablesync worker cannot be a subworker */
 	Assert(!(is_parallel_apply_worker && OidIsValid(relid)));
@@ -350,7 +351,21 @@ retry:
 		}
 	}
 
-	nsyncworkers = logicalrep_sync_worker_count(subid);
+	/* Check whether this worker type is already at its configured limit */
+	if (is_tablesync_worker)
+	{
+		int			n = logicalrep_sync_worker_count(subid);
+
+		if (n >= max_sync_workers_per_subscription)
+			at_limit_of_tablesync_workers = true;
+	}
+	else if (is_parallel_apply_worker)
+	{
+		int			n = logicalrep_pa_worker_count(subid);
+
+		if (n >= max_parallel_apply_workers_per_subscription)
+			at_limit_of_parallel_apply_workers = true;
+	}
 
 	now = GetCurrentTimestamp();
 
@@ -358,8 +373,12 @@ retry:
 	 * If we didn't find a free slot, try to do garbage collection.  The
 	 * reason we do this is because if some worker failed to start up and its
 	 * parent has crashed while waiting, the in_use state was never cleared.
+	 *
+	 * We also trigger garbage collection if the worker type has reached
+	 * its limit, in case one of the counted workers previously exited on
+	 * error and its slot can now be reused.
 	 */
-	if (worker == NULL || nsyncworkers >= max_sync_workers_per_subscription)
+	if (worker == NULL || at_limit_of_tablesync_workers || at_limit_of_parallel_apply_workers)
 	{
 		bool		did_cleanup = false;
 
@@ -393,20 +412,17 @@ retry:
 	 * sync worker limit per subscription. So, just return silently as we
 	 * might get here because of an otherwise harmless race condition.
 	 */
-	if (OidIsValid(relid) && nsyncworkers >= max_sync_workers_per_subscription)
+	if (is_tablesync_worker && at_limit_of_tablesync_workers)
 	{
 		LWLockRelease(LogicalRepWorkerLock);
 		return false;
 	}
 
-	nparallelapplyworkers = logicalrep_pa_worker_count(subid);
-
 	/*
 	 * Return false if the number of parallel apply workers reached the limit
 	 * per subscription.
 	 */
-	if (is_parallel_apply_worker &&
-		nparallelapplyworkers >= max_parallel_apply_workers_per_subscription)
+	if (is_parallel_apply_worker && at_limit_of_parallel_apply_workers)
 	{
 		LWLockRelease(LogicalRepWorkerLock);
 		return false;
@@ -842,7 +858,10 @@ logicalrep_sync_worker_count(Oid subid)
 
 	Assert(LWLockHeldByMe(LogicalRepWorkerLock));
 
-	/* Search for attached worker for a given subscription id. */
+	/*
+	 * Scan all attached tablesync workers, counting only those that
+	 * belong to the given subscription.
+	 */
 	for (i = 0; i < max_logical_replication_workers; i++)
 	{
 		LogicalRepWorker *w = &LogicalRepCtx->workers[i];
-- 
1.8.3.1
