From e3669efdf973a2c57c9eaba4b499d15fbb3116b8 Mon Sep 17 00:00:00 2001
From: Shveta Malik <shveta.malik@gmail.com>
Date: Fri, 5 Apr 2024 10:16:03 +0530
Subject: [PATCH v3] Handle stopSignaled during sync function call.

Currently, promotion-related handling is missing in the slot sync SQL
function pg_sync_replication_slots(). Here is the background on how
it is done in the slot sync worker:
During promotion, the startup process, in order to shut down the
slot sync worker, sets the 'stopSignaled' flag, sends the shutdown
signal, and waits for the slot sync worker to exit. Meanwhile, if the
postmaster has not noticed the promotion yet, it may end up restarting
the slot sync worker. In such a case, the worker exits if 'stopSignaled'
is set.

Since there is a chance that the user (or any of their scripts/tools)
may execute the SQL function pg_sync_replication_slots() in parallel
with promotion, such handling is needed in this SQL function as well.
The attached patch attempts to implement the same. Changes are:

1) If pg_sync_replication_slots() is already running when the
promotion is triggered, ShutDownSlotSync() also checks the
'SlotSyncCtx->syncing' flag and waits for it to become false,
i.e. waits until the concurrently running SQL function has finished.

2) If pg_sync_replication_slots() is invoked when promotion is
already in progress, pg_sync_replication_slots() respects the
'stopSignaled' flag set by the startup process and raises an error
instead of synchronizing.
---
 src/backend/replication/logical/slotsync.c | 120 +++++++++++++++------
 1 file changed, 90 insertions(+), 30 deletions(-)

diff --git a/src/backend/replication/logical/slotsync.c b/src/backend/replication/logical/slotsync.c
index bda0de52db..aade737b73 100644
--- a/src/backend/replication/logical/slotsync.c
+++ b/src/backend/replication/logical/slotsync.c
@@ -807,20 +807,6 @@ synchronize_slots(WalReceiverConn *wrconn)
 		" FROM pg_catalog.pg_replication_slots"
 		" WHERE failover and NOT temporary";
 
-	SpinLockAcquire(&SlotSyncCtx->mutex);
-	if (SlotSyncCtx->syncing)
-	{
-		SpinLockRelease(&SlotSyncCtx->mutex);
-		ereport(ERROR,
-				errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
-				errmsg("cannot synchronize replication slots concurrently"));
-	}
-
-	SlotSyncCtx->syncing = true;
-	SpinLockRelease(&SlotSyncCtx->mutex);
-
-	syncing_slots = true;
-
 	/* The syscache access in walrcv_exec() needs a transaction env. */
 	if (!IsTransactionState())
 	{
@@ -937,12 +923,6 @@ synchronize_slots(WalReceiverConn *wrconn)
 	if (started_tx)
 		CommitTransactionCommand();
 
-	SpinLockAcquire(&SlotSyncCtx->mutex);
-	SlotSyncCtx->syncing = false;
-	SpinLockRelease(&SlotSyncCtx->mutex);
-
-	syncing_slots = false;
-
 	return some_slot_updated;
 }
 
@@ -1242,6 +1222,47 @@ wait_for_slot_activity(bool some_slot_updated)
 		ResetLatch(MyLatch);
 }
 
+/*
+ * Error out if a slot synchronization is already in progress; otherwise
+ * set SlotSyncCtx->syncing and syncing_slots to mark this one as running.
+ *
+ * If acquire_lock is false, the caller must already hold SlotSyncCtx->mutex.
+ */
+static void
+check_and_set_syncing_flag(bool acquire_lock)
+{
+	if (acquire_lock)
+		SpinLockAcquire(&SlotSyncCtx->mutex);
+
+	if (SlotSyncCtx->syncing)
+	{
+		/* Mutex is held by us or by the caller; release before ERROR. */
+		SpinLockRelease(&SlotSyncCtx->mutex);
+		ereport(ERROR,
+				errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+				errmsg("cannot synchronize replication slots concurrently"));
+	}
+
+	SlotSyncCtx->syncing = true;
+	if (acquire_lock)
+		SpinLockRelease(&SlotSyncCtx->mutex);
+
+	syncing_slots = true;
+}
+
+/*
+ * Reset SlotSyncCtx->syncing (under the mutex) and syncing_slots.
+ */
+static void
+reset_syncing_flag(void)
+{
+	SpinLockAcquire(&SlotSyncCtx->mutex);
+	SlotSyncCtx->syncing = false;
+	SpinLockRelease(&SlotSyncCtx->mutex);
+
+	syncing_slots = false;
+}
+
 /*
  * The main loop of our worker process.
  *
@@ -1424,8 +1445,12 @@ ReplSlotSyncWorkerMain(char *startup_data, size_t startup_data_len)
 
 		ProcessSlotSyncInterrupts(wrconn);
 
+		check_and_set_syncing_flag(true);
+
 		some_slot_updated = synchronize_slots(wrconn);
 
+		reset_syncing_flag();
+
 		wait_for_slot_activity(some_slot_updated);
 	}
 
@@ -1471,6 +1496,9 @@ update_synced_slots_inactive_since(void)
 		{
 			Assert(SlotIsLogical(s));
 
+			/* The slot must not be acquired by any process */
+			Assert(s->active_pid == 0);
+
 			/* Use the same inactive_since time for all the slots. */
 			if (now == 0)
 				now = GetCurrentTimestamp();
@@ -1486,6 +1514,10 @@ update_synced_slots_inactive_since(void)
 
 /*
  * Shut down the slot sync worker.
+ *
+ * It sends signal to shutdown slot sync worker. It also waits till
+ * the slot sync worker has exited and pg_sync_replication_slots()
+ * has finished.
  */
 void
 ShutDownSlotSync(void)
@@ -1494,7 +1526,11 @@ ShutDownSlotSync(void)
 
 	SlotSyncCtx->stopSignaled = true;
 
-	if (SlotSyncCtx->pid == InvalidPid)
+	/*
+	 * Return if neither the slot sync worker is running nor the function
+	 * pg_sync_replication_slots() is executing.
+	 */
+	if ((SlotSyncCtx->pid == InvalidPid) && !SlotSyncCtx->syncing)
 	{
 		SpinLockRelease(&SlotSyncCtx->mutex);
 		update_synced_slots_inactive_since();
@@ -1502,9 +1538,10 @@ ShutDownSlotSync(void)
 	}
 	SpinLockRelease(&SlotSyncCtx->mutex);
 
-	kill(SlotSyncCtx->pid, SIGINT);
+	if (SlotSyncCtx->pid != InvalidPid)
+		kill(SlotSyncCtx->pid, SIGINT);
 
-	/* Wait for it to die */
+	/* Wait for worker to exit and SQL function to finish */
 	for (;;)
 	{
 		int			rc;
@@ -1522,8 +1559,11 @@ ShutDownSlotSync(void)
 
 		SpinLockAcquire(&SlotSyncCtx->mutex);
 
-		/* Is it gone? */
-		if (SlotSyncCtx->pid == InvalidPid)
+		/*
+		 * Confirm that both the worker and the function
+		 * pg_sync_replication_slots() are done.
+		 */
+		if ((SlotSyncCtx->pid == InvalidPid) && !SlotSyncCtx->syncing)
 			break;
 
 		SpinLockRelease(&SlotSyncCtx->mutex);
@@ -1615,11 +1655,7 @@ slotsync_failure_callback(int code, Datum arg)
 		 * without resetting the flag. So, we need to clean up shared memory
 		 * and reset the flag here.
 		 */
-		SpinLockAcquire(&SlotSyncCtx->mutex);
-		SlotSyncCtx->syncing = false;
-		SpinLockRelease(&SlotSyncCtx->mutex);
-
-		syncing_slots = false;
+		reset_syncing_flag();
 	}
 
 	walrcv_disconnect(wrconn);
@@ -1634,9 +1670,33 @@ SyncReplicationSlots(WalReceiverConn *wrconn)
 {
 	PG_ENSURE_ERROR_CLEANUP(slotsync_failure_callback, PointerGetDatum(wrconn));
 	{
+		SpinLockAcquire(&SlotSyncCtx->mutex);
+
+		/*
+		 * Startup process signaled the slot sync to stop, so if meanwhile
+		 * user has invoked slot sync SQL function, error out.
+		 */
+		if (SlotSyncCtx->stopSignaled)
+		{
+			SpinLockRelease(&SlotSyncCtx->mutex);
+			ereport(ERROR,
+					errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+					errmsg("cannot synchronize replication slots when standby promotion is ongoing"));
+		}
+
+		/*
+		 * Advertise that we are syncing (mutex is already held here), so that
+		 * the startup process knows about this sync call during promotion.
+		 */
+		check_and_set_syncing_flag(false);
+		SpinLockRelease(&SlotSyncCtx->mutex);
+
 		validate_remote_info(wrconn);
 
 		synchronize_slots(wrconn);
+
+		/* We are done with sync, so reset sync flag */
+		reset_syncing_flag();
 	}
 	PG_END_ENSURE_ERROR_CLEANUP(slotsync_failure_callback, PointerGetDatum(wrconn));
 }
-- 
2.34.1

