From 2c0433119820216708a5076f2ba90c1436f782df Mon Sep 17 00:00:00 2001
From: Shveta Malik <shveta.malik@gmail.com>
Date: Fri, 5 Apr 2024 10:16:03 +0530
Subject: [PATCH v4] Handle stopSignaled during sync function call.

Currently, promotion related handling is missing in the slot sync SQL
function pg_sync_replication_slots(). Here is the background on how
it is done in slot sync worker:
During promotion, the startup process in order to shut down the
slot-sync worker, sets the 'stopSignaled' flag, sends the shut-down
signal, and waits for slot sync worker to exit. Meanwhile if the
postmaster has not noticed the promotion yet, it may end up restarting
slot sync worker. In such a case, the worker exits if 'stopSignaled'
is set.

Since there is a chance that the user (or any of his scripts/tools)
may execute SQL function pg_sync_replication_slots() in parallel to
promotion, such handling is needed in this SQL function as well, The
attached patch attempts to implement the same. Changes are:

1) If pg_sync_replication_slots() is already running when the
promotion is triggered, ShutDownSlotSync() checks the
'SlotSyncCtx->syncing' flag as well and waits for it to become false
i.e. waits till parallel running SQL function is finished.

2) If pg_sync_replication_slots() is invoked when promotion is
already in progress, pg_sync_replication_slots() respects the
'stopSignaled' flag set by the startup process and becomes a no-op.
---
 src/backend/replication/logical/slotsync.c | 196 +++++++++++++--------
 1 file changed, 122 insertions(+), 74 deletions(-)

diff --git a/src/backend/replication/logical/slotsync.c b/src/backend/replication/logical/slotsync.c
index bda0de52db..a0e0c05d20 100644
--- a/src/backend/replication/logical/slotsync.c
+++ b/src/backend/replication/logical/slotsync.c
@@ -92,9 +92,6 @@
  * is expected (e.g., slot sync GUCs change), slot sync worker will reset
  * last_start_time before exiting, so that postmaster can start the worker
  * without waiting for SLOTSYNC_RESTART_INTERVAL_SEC.
- *
- * All the fields except 'syncing' are used only by slotsync worker.
- * 'syncing' is used both by worker and SQL function pg_sync_replication_slots.
  */
 typedef struct SlotSyncCtxStruct
 {
@@ -807,20 +804,6 @@ synchronize_slots(WalReceiverConn *wrconn)
 		" FROM pg_catalog.pg_replication_slots"
 		" WHERE failover and NOT temporary";
 
-	SpinLockAcquire(&SlotSyncCtx->mutex);
-	if (SlotSyncCtx->syncing)
-	{
-		SpinLockRelease(&SlotSyncCtx->mutex);
-		ereport(ERROR,
-				errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
-				errmsg("cannot synchronize replication slots concurrently"));
-	}
-
-	SlotSyncCtx->syncing = true;
-	SpinLockRelease(&SlotSyncCtx->mutex);
-
-	syncing_slots = true;
-
 	/* The syscache access in walrcv_exec() needs a transaction env. */
 	if (!IsTransactionState())
 	{
@@ -937,12 +920,6 @@ synchronize_slots(WalReceiverConn *wrconn)
 	if (started_tx)
 		CommitTransactionCommand();
 
-	SpinLockAcquire(&SlotSyncCtx->mutex);
-	SlotSyncCtx->syncing = false;
-	SpinLockRelease(&SlotSyncCtx->mutex);
-
-	syncing_slots = false;
-
 	return some_slot_updated;
 }
 
@@ -1199,7 +1176,20 @@ static void
 slotsync_worker_onexit(int code, Datum arg)
 {
 	SpinLockAcquire(&SlotSyncCtx->mutex);
+
 	SlotSyncCtx->pid = InvalidPid;
+
+	/*
+	 * If syncing_slots is true, it indicates that the process errored out
+	 * without resetting the flag. So, we need to clean up shared memory and
+	 * reset the flag here.
+	 */
+	if (syncing_slots)
+	{
+		SlotSyncCtx->syncing = false;
+		syncing_slots = false;
+	}
+
 	SpinLockRelease(&SlotSyncCtx->mutex);
 }
 
@@ -1242,6 +1232,63 @@ wait_for_slot_activity(bool some_slot_updated)
 		ResetLatch(MyLatch);
 }
 
+/*
+ * Check stopSignaled and syncing flags. Emit error if promotion has
+ * already set stopSignaled or if it is concurrent sync call.
+ * Otherwise, set 'syncing' flag and pid info.
+ */
+static void
+check_flags_and_set_sync_info(pid_t worker_pid)
+{
+	SpinLockAcquire(&SlotSyncCtx->mutex);
+
+	/* If it is worker, check that we do not have pid set already */
+	if (worker_pid != InvalidPid)
+		Assert(SlotSyncCtx->pid == InvalidPid);
+
+	/*
+	 * Startup process signaled the slot sync machinery to stop, so if meanwhile
+	 * postmaster ended up starting the worker again or user has invoked
+	 * pg_sync_replication_slots(), error out.
+	 */
+	if (SlotSyncCtx->stopSignaled)
+	{
+		SpinLockRelease(&SlotSyncCtx->mutex);
+		ereport(ERROR,
+				errmsg("promotion in progress, can not synchronize replication slots"));
+	}
+
+	if (SlotSyncCtx->syncing)
+	{
+		SpinLockRelease(&SlotSyncCtx->mutex);
+		ereport(ERROR,
+				errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+				errmsg("cannot synchronize replication slots concurrently"));
+	}
+
+	/* Advertise our PID so that the startup process can kill us on promotion */
+	SlotSyncCtx->pid = worker_pid;
+
+	SlotSyncCtx->syncing = true;
+
+	SpinLockRelease(&SlotSyncCtx->mutex);
+
+	syncing_slots = true;
+}
+
+/*
+ * Reset syncing flag.
+ */
+static void
+reset_syncing_flag()
+{
+	SpinLockAcquire(&SlotSyncCtx->mutex);
+	SlotSyncCtx->syncing = false;
+	SpinLockRelease(&SlotSyncCtx->mutex);
+
+	syncing_slots = false;
+};
+
 /*
  * The main loop of our worker process.
  *
@@ -1278,47 +1325,6 @@ ReplSlotSyncWorkerMain(char *startup_data, size_t startup_data_len)
 
 	Assert(SlotSyncCtx != NULL);
 
-	SpinLockAcquire(&SlotSyncCtx->mutex);
-	Assert(SlotSyncCtx->pid == InvalidPid);
-
-	/*
-	 * Startup process signaled the slot sync worker to stop, so if meanwhile
-	 * postmaster ended up starting the worker again, exit.
-	 */
-	if (SlotSyncCtx->stopSignaled)
-	{
-		SpinLockRelease(&SlotSyncCtx->mutex);
-		proc_exit(0);
-	}
-
-	/* Advertise our PID so that the startup process can kill us on promotion */
-	SlotSyncCtx->pid = MyProcPid;
-	SpinLockRelease(&SlotSyncCtx->mutex);
-
-	ereport(LOG, errmsg("slot sync worker started"));
-
-	/* Register it as soon as SlotSyncCtx->pid is initialized. */
-	before_shmem_exit(slotsync_worker_onexit, (Datum) 0);
-
-	/* Setup signal handling */
-	pqsignal(SIGHUP, SignalHandlerForConfigReload);
-	pqsignal(SIGINT, SignalHandlerForShutdownRequest);
-	pqsignal(SIGTERM, die);
-	pqsignal(SIGFPE, FloatExceptionHandler);
-	pqsignal(SIGUSR1, procsignal_sigusr1_handler);
-	pqsignal(SIGUSR2, SIG_IGN);
-	pqsignal(SIGPIPE, SIG_IGN);
-	pqsignal(SIGCHLD, SIG_DFL);
-
-	/*
-	 * Establishes SIGALRM handler and initialize timeout module. It is needed
-	 * by InitPostgres to register different timeouts.
-	 */
-	InitializeTimeouts();
-
-	/* Load the libpq-specific functions */
-	load_file("libpqwalreceiver", false);
-
 	/*
 	 * If an exception is encountered, processing resumes here.
 	 *
@@ -1350,6 +1356,32 @@ ReplSlotSyncWorkerMain(char *startup_data, size_t startup_data_len)
 	/* We can now handle ereport(ERROR) */
 	PG_exception_stack = &local_sigjmp_buf;
 
+	check_flags_and_set_sync_info(MyProcPid);
+
+	ereport(LOG, errmsg("slot sync worker started"));
+
+	/* Register it as soon as SlotSyncCtx->pid is initialized. */
+	before_shmem_exit(slotsync_worker_onexit, (Datum) 0);
+
+	/* Setup signal handling */
+	pqsignal(SIGHUP, SignalHandlerForConfigReload);
+	pqsignal(SIGINT, SignalHandlerForShutdownRequest);
+	pqsignal(SIGTERM, die);
+	pqsignal(SIGFPE, FloatExceptionHandler);
+	pqsignal(SIGUSR1, procsignal_sigusr1_handler);
+	pqsignal(SIGUSR2, SIG_IGN);
+	pqsignal(SIGPIPE, SIG_IGN);
+	pqsignal(SIGCHLD, SIG_DFL);
+
+	/*
+	 * Establishes SIGALRM handler and initialize timeout module. It is needed
+	 * by InitPostgres to register different timeouts.
+	 */
+	InitializeTimeouts();
+
+	/* Load the libpq-specific functions */
+	load_file("libpqwalreceiver", false);
+
 	/*
 	 * Unblock signals (they were blocked when the postmaster forked us)
 	 */
@@ -1471,6 +1503,9 @@ update_synced_slots_inactive_since(void)
 		{
 			Assert(SlotIsLogical(s));
 
+			/* The slot must not be acquired by any process */
+			Assert(s->active_pid == 0);
+
 			/* Use the same inactive_since time for all the slots. */
 			if (now == 0)
 				now = GetCurrentTimestamp();
@@ -1486,6 +1521,10 @@ update_synced_slots_inactive_since(void)
 
 /*
  * Shut down the slot sync worker.
+ *
+ * It sends signal to shutdown slot sync worker. It also waits till
+ * the slot sync worker has exited and pg_sync_replication_slots()
+ * has finished.
  */
 void
 ShutDownSlotSync(void)
@@ -1494,7 +1533,11 @@ ShutDownSlotSync(void)
 
 	SlotSyncCtx->stopSignaled = true;
 
-	if (SlotSyncCtx->pid == InvalidPid)
+	/*
+	 * Return if neither the slot sync worker is running nor the function
+	 * pg_sync_replication_slots() is executing.
+	 */
+	if (!SlotSyncCtx->syncing)
 	{
 		SpinLockRelease(&SlotSyncCtx->mutex);
 		update_synced_slots_inactive_since();
@@ -1502,9 +1545,10 @@ ShutDownSlotSync(void)
 	}
 	SpinLockRelease(&SlotSyncCtx->mutex);
 
-	kill(SlotSyncCtx->pid, SIGINT);
+	if (SlotSyncCtx->pid != InvalidPid)
+		kill(SlotSyncCtx->pid, SIGINT);
 
-	/* Wait for it to die */
+	/* Wait for slot sync to end */
 	for (;;)
 	{
 		int			rc;
@@ -1522,8 +1566,11 @@ ShutDownSlotSync(void)
 
 		SpinLockAcquire(&SlotSyncCtx->mutex);
 
-		/* Is it gone? */
-		if (SlotSyncCtx->pid == InvalidPid)
+		/*
+		 * Confirm that both the worker and the function
+		 * pg_sync_replication_slots() are done.
+		 */
+		if (!SlotSyncCtx->syncing)
 			break;
 
 		SpinLockRelease(&SlotSyncCtx->mutex);
@@ -1615,11 +1662,7 @@ slotsync_failure_callback(int code, Datum arg)
 		 * without resetting the flag. So, we need to clean up shared memory
 		 * and reset the flag here.
 		 */
-		SpinLockAcquire(&SlotSyncCtx->mutex);
-		SlotSyncCtx->syncing = false;
-		SpinLockRelease(&SlotSyncCtx->mutex);
-
-		syncing_slots = false;
+		reset_syncing_flag();
 	}
 
 	walrcv_disconnect(wrconn);
@@ -1634,9 +1677,14 @@ SyncReplicationSlots(WalReceiverConn *wrconn)
 {
 	PG_ENSURE_ERROR_CLEANUP(slotsync_failure_callback, PointerGetDatum(wrconn));
 	{
+		check_flags_and_set_sync_info(InvalidPid);
+
 		validate_remote_info(wrconn);
 
 		synchronize_slots(wrconn);
+
+		/* We are done with sync, so reset sync flag */
+		reset_syncing_flag();
 	}
 	PG_END_ENSURE_ERROR_CLEANUP(slotsync_failure_callback, PointerGetDatum(wrconn));
 }
-- 
2.34.1

