From e99aaebd567db17813fe06b2c4384bde18c45ef7 Mon Sep 17 00:00:00 2001
From: sherlockcpp <sherlockcpp@foxmail.com>
Date: Sat, 17 Dec 2022 20:43:21 +0800
Subject: [PATCH v79 2/4] Stop extra worker if GUC was changed

If the max_parallel_apply_workers_per_subscription is changed to a
lower value, try to stop free workers in the pool to keep the number of
workers lower than half of the max_parallel_apply_workers_per_subscription
---
 .../replication/logical/applyparallelworker.c      | 60 ++++++++++++++++++----
 src/backend/replication/logical/worker.c           |  7 +++
 src/include/replication/worker_internal.h          |  1 +
 3 files changed, 57 insertions(+), 11 deletions(-)

diff --git a/src/backend/replication/logical/applyparallelworker.c b/src/backend/replication/logical/applyparallelworker.c
index a11b27e..4fa2b62 100644
--- a/src/backend/replication/logical/applyparallelworker.c
+++ b/src/backend/replication/logical/applyparallelworker.c
@@ -543,6 +543,25 @@ pa_find_worker(TransactionId xid)
 }
 
 /*
+ * Stop the given parallel apply worker and free the corresponding info.
+ */
+static void
+pa_stop_worker(ParallelApplyWorkerInfo *winfo)
+{
+	int			slot_no;
+	uint16		generation;
+
+	SpinLockAcquire(&winfo->shared->mutex);
+	generation = winfo->shared->logicalrep_worker_generation;
+	slot_no = winfo->shared->logicalrep_worker_slot_no;
+	SpinLockRelease(&winfo->shared->mutex);
+
+	logicalrep_pa_worker_stop(slot_no, generation);
+
+	pa_free_worker_info(winfo);
+}
+
+/*
  * Makes the worker available for reuse.
  *
  * This removes the parallel apply worker entry from the hash table so that it
@@ -577,23 +596,42 @@ pa_free_worker(ParallelApplyWorkerInfo *winfo)
 		list_length(ParallelApplyWorkerPool) >
 		(max_parallel_apply_workers_per_subscription / 2))
 	{
-		int			slot_no;
-		uint16		generation;
-
-		SpinLockAcquire(&winfo->shared->mutex);
-		generation = winfo->shared->logicalrep_worker_generation;
-		slot_no = winfo->shared->logicalrep_worker_slot_no;
-		SpinLockRelease(&winfo->shared->mutex);
+		pa_stop_worker(winfo);
+		return;
+	}
 
-		logicalrep_pa_worker_stop(slot_no, generation);
+	winfo->in_use = false;
+	winfo->serialize_changes = false;
+}
 
-		pa_free_worker_info(winfo);
+/*
+ * Try to stop parallel apply workers that are not in use to keep the number of
+ * workers lower than half of the max_parallel_apply_workers_per_subscription.
+ */
+void
+pa_stop_idle_workers(void)
+{
+	List	   *active_workers;
+	ListCell   *lc;
+	int			max_applyworkers = max_parallel_apply_workers_per_subscription / 2;
 
+	if (list_length(ParallelApplyWorkerPool) <= max_applyworkers)
 		return;
+
+	active_workers = list_copy(ParallelApplyWorkerPool);
+
+	foreach(lc, active_workers)
+	{
+		ParallelApplyWorkerInfo *winfo = (ParallelApplyWorkerInfo *) lfirst(lc);
+
+		pa_stop_worker(winfo);
+
+		/* Recheck the number of workers. */
+		if (list_length(ParallelApplyWorkerPool) <= max_applyworkers)
+			break;
 	}
 
-	winfo->in_use = false;
-	winfo->serialize_changes = false;
+	list_free(active_workers);
 }
 
 /*
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 79cda39..c7be76d 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -3630,6 +3630,13 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
 		{
 			ConfigReloadPending = false;
 			ProcessConfigFile(PGC_SIGHUP);
+
+			/*
+			 * Try to stop free workers in the pool in case the
+			 * max_parallel_apply_workers_per_subscription is changed to a
+			 * lower value.
+			 */
+			pa_stop_idle_workers();
 		}
 
 		if (rc & WL_TIMEOUT)
diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h
index dc87a4e..34e5006 100644
--- a/src/include/replication/worker_internal.h
+++ b/src/include/replication/worker_internal.h
@@ -274,6 +274,7 @@ extern void set_apply_error_context_origin(char *originname);
 /* Parallel apply worker setup and interactions */
 extern void pa_allocate_worker(TransactionId xid);
 extern ParallelApplyWorkerInfo *pa_find_worker(TransactionId xid);
+extern void pa_stop_idle_workers(void);
 extern void pa_detach_all_error_mq(void);
 
 extern bool pa_send_data(ParallelApplyWorkerInfo *winfo, Size nbytes,
-- 
2.7.2.windows.1

