From 805e5b7e35913f53b9caac996aaf0b62e937f274 Mon Sep 17 00:00:00 2001
From: John Hsu <johnhyvr@gmail.com>
Date: Tue, 4 Jun 2024 20:27:47 +0000
Subject: [PATCH] Wait on synchronous replication by default for logical
 failover slots

If synchronous replication is enabled, this patch
allows logical subscribers to wait for changes to
be replicated to synchronous replicas before
consuming the changes.

In the event that 'synchronized_standby_slots' is set, it
will take precedence.
---
 src/backend/replication/slot.c                | 271 ++++++++++--------
 src/backend/replication/syncrep.c             |  27 +-
 src/include/replication/syncrep.h             |   3 +
 .../t/040_standby_failover_slots_sync.pl      | 135 +++++++++
 4 files changed, 321 insertions(+), 115 deletions(-)

diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index baf9b89dc4..fafb6c7873 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -49,6 +49,7 @@
 #include "postmaster/interrupt.h"
 #include "replication/slotsync.h"
 #include "replication/slot.h"
+#include "replication/syncrep.h"
 #include "replication/walsender_private.h"
 #include "storage/fd.h"
 #include "storage/ipc.h"
@@ -2570,8 +2571,8 @@ SlotExistsInSyncStandbySlots(const char *slot_name)
 }
 
 /*
- * Return true if the slots specified in synchronized_standby_slots have caught up to
- * the given WAL location, false otherwise.
+ * Return true if the slots specified in synchronized_standby_slots or synchronous
+ * replication have caught up to the given WAL location, false otherwise.
  *
  * The elevel parameter specifies the error level used for logging messages
  * related to slots that do not exist, are invalidated, or are inactive.
@@ -2585,9 +2586,9 @@ StandbySlotsHaveCaughtup(XLogRecPtr wait_for_lsn, int elevel)
 
 	/*
 	 * Don't need to wait for the standbys to catch up if there is no value in
-	 * synchronized_standby_slots.
+	 * synchronized_standby_slots and synchronous replication is not configured.
 	 */
-	if (synchronized_standby_slots_config == NULL)
+	if (synchronized_standby_slots_config == NULL && !SyncRepConfigured())
 		return true;
 
 	/*
@@ -2598,144 +2599,190 @@ StandbySlotsHaveCaughtup(XLogRecPtr wait_for_lsn, int elevel)
 		return true;
 
 	/*
-	 * Don't need to wait for the standbys to catch up if they are already
-	 * beyond the specified WAL location.
-	 */
-	if (!XLogRecPtrIsInvalid(ss_oldest_flush_lsn) &&
-		ss_oldest_flush_lsn >= wait_for_lsn)
-		return true;
-
-	/*
-	 * To prevent concurrent slot dropping and creation while filtering the
-	 * slots, take the ReplicationSlotControlLock outside of the loop.
+	 * If both synchronized_standby_slots and synchronous replication are
+	 * configured, the former takes precedence.
 	 */
-	LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
-
-	name = synchronized_standby_slots_config->slot_names;
-	for (int i = 0; i < synchronized_standby_slots_config->nslotnames; i++)
+	if (synchronized_standby_slots_config != NULL)
 	{
-		XLogRecPtr	restart_lsn;
-		bool		invalidated;
-		bool		inactive;
-		ReplicationSlot *slot;
+		/*
+		* Don't need to wait for the standbys to catch up if they are already
+		* beyond the specified WAL location.
+		*/
+		if (!XLogRecPtrIsInvalid(ss_oldest_flush_lsn) &&
+			ss_oldest_flush_lsn >= wait_for_lsn)
+			return true;
 
-		slot = SearchNamedReplicationSlot(name, false);
+		/*
+		* To prevent concurrent slot dropping and creation while filtering the
+		* slots, take the ReplicationSlotControlLock outside of the loop.
+		*/
+		LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
 
-		if (!slot)
+		name = synchronized_standby_slots_config->slot_names;
+		for (int i = 0; i < synchronized_standby_slots_config->nslotnames; i++)
 		{
-			/*
-			 * If a slot name provided in synchronized_standby_slots does not
-			 * exist, report a message and exit the loop. A user can specify a
-			 * slot name that does not exist just before the server startup.
-			 * The GUC check_hook(validate_sync_standby_slots) cannot validate
-			 * such a slot during startup as the ReplicationSlotCtl shared
-			 * memory is not initialized at that time. It is also possible for
-			 * a user to drop the slot in synchronized_standby_slots
-			 * afterwards.
-			 */
-			ereport(elevel,
-					errcode(ERRCODE_INVALID_PARAMETER_VALUE),
-					errmsg("replication slot \"%s\" specified in parameter %s does not exist",
-						   name, "synchronized_standby_slots"),
-					errdetail("Logical replication is waiting on the standby associated with \"%s\".",
-							  name),
-					errhint("Consider creating the slot \"%s\" or amend parameter %s.",
-							name, "synchronized_standby_slots"));
-			break;
-		}
+			XLogRecPtr	restart_lsn;
+			bool		invalidated;
+			bool		inactive;
+			ReplicationSlot *slot;
 
-		if (SlotIsLogical(slot))
-		{
-			/*
-			 * If a logical slot name is provided in
-			 * synchronized_standby_slots, report a message and exit the loop.
-			 * Similar to the non-existent case, a user can specify a logical
-			 * slot name in synchronized_standby_slots before the server
-			 * startup, or drop an existing physical slot and recreate a
-			 * logical slot with the same name.
-			 */
-			ereport(elevel,
-					errcode(ERRCODE_INVALID_PARAMETER_VALUE),
-					errmsg("cannot have logical replication slot \"%s\" in parameter %s",
-						   name, "synchronized_standby_slots"),
-					errdetail("Logical replication is waiting for correction on \"%s\".",
-							  name),
-					errhint("Consider removing logical slot \"%s\" from parameter %s.",
-							name, "synchronized_standby_slots"));
-			break;
-		}
+			slot = SearchNamedReplicationSlot(name, false);
 
-		SpinLockAcquire(&slot->mutex);
-		restart_lsn = slot->data.restart_lsn;
-		invalidated = slot->data.invalidated != RS_INVAL_NONE;
-		inactive = slot->active_pid == 0;
-		SpinLockRelease(&slot->mutex);
+			if (!slot)
+			{
+				/*
+				* If a slot name provided in synchronized_standby_slots does not
+				* exist, report a message and exit the loop. A user can specify a
+				* slot name that does not exist just before the server startup.
+				* The GUC check_hook(validate_sync_standby_slots) cannot validate
+				* such a slot during startup as the ReplicationSlotCtl shared
+				* memory is not initialized at that time. It is also possible for
+				* a user to drop the slot in synchronized_standby_slots
+				* afterwards.
+				*/
+				ereport(elevel,
+						errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+						errmsg("replication slot \"%s\" specified in parameter %s does not exist",
+							name, "synchronized_standby_slots"),
+						errdetail("Logical replication is waiting on the standby associated with \"%s\".",
+								name),
+						errhint("Consider creating the slot \"%s\" or amend parameter %s.",
+								name, "synchronized_standby_slots"));
+				break;
+			}
 
-		if (invalidated)
-		{
-			/* Specified physical slot has been invalidated */
-			ereport(elevel,
-					errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
-					errmsg("physical slot \"%s\" specified in parameter %s has been invalidated",
-						   name, "synchronized_standby_slots"),
-					errdetail("Logical replication is waiting on the standby associated with \"%s\".",
-							  name),
-					errhint("Consider dropping and recreating the slot \"%s\" or amend parameter %s.",
-							name, "synchronized_standby_slots"));
-			break;
-		}
+			if (SlotIsLogical(slot))
+			{
+				/*
+				* If a logical slot name is provided in
+				* synchronized_standby_slots, report a message and exit the loop.
+				* Similar to the non-existent case, a user can specify a logical
+				* slot name in synchronized_standby_slots before the server
+				* startup, or drop an existing physical slot and recreate a
+				* logical slot with the same name.
+				*/
+				ereport(elevel,
+						errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+						errmsg("cannot have logical replication slot \"%s\" in parameter %s",
+							name, "synchronized_standby_slots"),
+						errdetail("Logical replication is waiting for correction on \"%s\".",
+								name),
+						errhint("Consider removing logical slot \"%s\" from parameter %s.",
+								name, "synchronized_standby_slots"));
+				break;
+			}
 
-		if (XLogRecPtrIsInvalid(restart_lsn) || restart_lsn < wait_for_lsn)
-		{
-			/* Log a message if no active_pid for this physical slot */
-			if (inactive)
+			SpinLockAcquire(&slot->mutex);
+			restart_lsn = slot->data.restart_lsn;
+			invalidated = slot->data.invalidated != RS_INVAL_NONE;
+			inactive = slot->active_pid == 0;
+			SpinLockRelease(&slot->mutex);
+
+			if (invalidated)
+			{
+				/* Specified physical slot has been invalidated */
 				ereport(elevel,
 						errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
-						errmsg("replication slot \"%s\" specified in parameter %s does not have active_pid",
-							   name, "synchronized_standby_slots"),
+						errmsg("physical slot \"%s\" specified in parameter %s has been invalidated",
+							name, "synchronized_standby_slots"),
 						errdetail("Logical replication is waiting on the standby associated with \"%s\".",
-								  name),
-						errhint("Consider starting standby associated with \"%s\" or amend parameter %s.",
+								name),
+						errhint("Consider dropping and recreating the slot \"%s\" or amend parameter %s.",
 								name, "synchronized_standby_slots"));
+				break;
+			}
 
-			/* Continue if the current slot hasn't caught up. */
-			break;
+			if (XLogRecPtrIsInvalid(restart_lsn) || restart_lsn < wait_for_lsn)
+			{
+				/* Log a message if no active_pid for this physical slot */
+				if (inactive)
+					ereport(elevel,
+							errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+							errmsg("replication slot \"%s\" specified in parameter %s does not have active_pid",
+								name, "synchronized_standby_slots"),
+							errdetail("Logical replication is waiting on the standby associated with \"%s\".",
+									name),
+							errhint("Consider starting standby associated with \"%s\" or amend parameter %s.",
+									name, "synchronized_standby_slots"));
+
+				/* Continue if the current slot hasn't caught up. */
+				break;
+			}
+
+			Assert(restart_lsn >= wait_for_lsn);
+
+			if (XLogRecPtrIsInvalid(min_restart_lsn) ||
+				min_restart_lsn > restart_lsn)
+				min_restart_lsn = restart_lsn;
+
+			caught_up_slot_num++;
+
+			name += strlen(name) + 1;
 		}
 
-		Assert(restart_lsn >= wait_for_lsn);
+		LWLockRelease(ReplicationSlotControlLock);
+
+		/*
+		* Return false if not all the standbys have caught up to the specified
+		* WAL location.
+		*/
+		if (caught_up_slot_num != synchronized_standby_slots_config->nslotnames)
+			return false;
 
-		if (XLogRecPtrIsInvalid(min_restart_lsn) ||
-			min_restart_lsn > restart_lsn)
-			min_restart_lsn = restart_lsn;
+		/* The ss_oldest_flush_lsn must not retreat. */
+		Assert(XLogRecPtrIsInvalid(ss_oldest_flush_lsn) ||
+			min_restart_lsn >= ss_oldest_flush_lsn);
 
-		caught_up_slot_num++;
+		ss_oldest_flush_lsn = min_restart_lsn;
 
-		name += strlen(name) + 1;
+		return true;
 	}
+	else
+	{
+		volatile WalSndCtlData *walsndctl = WalSndCtl;
+		/* Copy of walsndctl->lsn[] kept across calls to skip SyncRepLock. */
+		static XLogRecPtr	lsn[NUM_SYNC_REP_WAIT_MODE];
+		static bool initialized = false;
+		/* Snapshot the mode; checked for SYNC_REP_NO_WAIT before indexing. */
+		int mode = SyncRepWaitMode;
+		int i;
+
+		if (!initialized)
+		{
+			for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; i++)
+				lsn[i] = InvalidXLogRecPtr;
+			initialized = true;
+		}
 
-	LWLockRelease(ReplicationSlotControlLock);
+		if (mode == SYNC_REP_NO_WAIT)
+			return true;
 
-	/*
-	 * Return false if not all the standbys have caught up to the specified
-	 * WAL location.
-	 */
-	if (caught_up_slot_num != synchronized_standby_slots_config->nslotnames)
-		return false;
+		if (lsn[mode] >= wait_for_lsn)
+			return true;
+
+		LWLockAcquire(SyncRepLock, LW_SHARED);
 
-	/* The ss_oldest_flush_lsn must not retreat. */
-	Assert(XLogRecPtrIsInvalid(ss_oldest_flush_lsn) ||
-		   min_restart_lsn >= ss_oldest_flush_lsn);
+		/* Cache values to reduce contention on lock */
+		for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; i++)
+		{
+			lsn[i] = walsndctl->lsn[i];
+		}
 
-	ss_oldest_flush_lsn = min_restart_lsn;
+		LWLockRelease(SyncRepLock);
 
-	return true;
+		if (lsn[mode] >= wait_for_lsn)
+			return true;
+
+		return false;
+	}
 }
 
 /*
  * Wait for physical standbys to confirm receiving the given lsn.
  *
  * Used by logical decoding SQL functions. It waits for physical standbys
- * corresponding to the physical slots specified in the synchronized_standby_slots GUC.
+ * corresponding to the physical slots specified in the synchronized_standby_slots GUC,
+ * or synchronous replication.
  */
 void
 WaitForStandbyConfirmation(XLogRecPtr wait_for_lsn)
@@ -2745,7 +2792,7 @@ WaitForStandbyConfirmation(XLogRecPtr wait_for_lsn)
 	 * slot is not a logical failover slot, or there is no value in
 	 * synchronized_standby_slots.
 	 */
-	if (!MyReplicationSlot->data.failover || !synchronized_standby_slots_config)
+	if (!MyReplicationSlot->data.failover || !(synchronized_standby_slots_config || SyncRepConfigured()))
 		return;
 
 	ConditionVariablePrepareToSleep(&WalSndCtl->wal_confirm_rcv_cv);
diff --git a/src/backend/replication/syncrep.c b/src/backend/replication/syncrep.c
index fa5988c824..69c061d8dc 100644
--- a/src/backend/replication/syncrep.c
+++ b/src/backend/replication/syncrep.c
@@ -78,6 +78,7 @@
 #include "common/int.h"
 #include "miscadmin.h"
 #include "pgstat.h"
+#include "replication/slot.h"
 #include "replication/syncrep.h"
 #include "replication/walsender.h"
 #include "replication/walsender_private.h"
@@ -95,7 +96,7 @@ char	   *SyncRepStandbyNames;
 static bool announce_next_takeover = true;
 
 SyncRepConfigData *SyncRepConfig = NULL;
-static int	SyncRepWaitMode = SYNC_REP_NO_WAIT;
+int	SyncRepWaitMode = SYNC_REP_NO_WAIT;
 
 static void SyncRepQueueInsert(int mode);
 static void SyncRepCancelWait(void);
@@ -124,6 +125,8 @@ static int	cmp_lsn(const void *a, const void *b);
 static bool SyncRepQueueIsOrderedByLSN(int mode);
 #endif
 
+bool SyncRepConfigured(void);
+
 /*
  * ===========================================================
  * Synchronous Replication functions for normal user backends
@@ -169,8 +172,7 @@ SyncRepWaitForLSN(XLogRecPtr lsn, bool commit)
 	 * described in SyncRepUpdateSyncStandbysDefined(). On the other hand, if
 	 * it's false, the lock is not necessary because we don't touch the queue.
 	 */
-	if (!SyncRepRequested() ||
-		!((volatile WalSndCtlData *) WalSndCtl)->sync_standbys_defined)
+	if (!SyncRepConfigured())
 		return;
 
 	/* Cap the level for anything other than commit to remote flush only. */
@@ -523,6 +525,15 @@ SyncRepReleaseWaiters(void)
 
 	LWLockRelease(SyncRepLock);
 
+	/*
+	 * If synchronized_standby_slots is set, the respective walsenders are
+	 * responsible for broadcasting on wal_confirm_rcv_cv instead.
+	 */
+	if (strcmp(synchronized_standby_slots, "") == 0)
+	{
+		ConditionVariableBroadcast(&WalSndCtl->wal_confirm_rcv_cv);
+	}
+
 	elog(DEBUG3, "released %d procs up to write %X/%X, %d procs up to flush %X/%X, %d procs up to apply %X/%X",
 		 numwrite, LSN_FORMAT_ARGS(writePtr),
 		 numflush, LSN_FORMAT_ARGS(flushPtr),
@@ -1069,3 +1080,13 @@ assign_synchronous_commit(int newval, void *extra)
 			break;
 	}
 }
+
+/*
+ * Return true only if SyncRepRequested() and sync_standbys_defined hold.
+ */
+bool
+SyncRepConfigured(void)
+{
+	return SyncRepRequested() &&
+		((volatile WalSndCtlData *) WalSndCtl)->sync_standbys_defined;
+}
diff --git a/src/include/replication/syncrep.h b/src/include/replication/syncrep.h
index ea439e6da6..ec22bc72df 100644
--- a/src/include/replication/syncrep.h
+++ b/src/include/replication/syncrep.h
@@ -82,6 +82,7 @@ extern PGDLLIMPORT char *SyncRepStandbyNames;
 
 /* called by user backend */
 extern void SyncRepWaitForLSN(XLogRecPtr lsn, bool commit);
+extern bool SyncRepConfigured(void);
 
 /* called at backend exit */
 extern void SyncRepCleanupAtProcExit(void);
@@ -96,6 +97,8 @@ extern int	SyncRepGetCandidateStandbys(SyncRepStandbyData **standbys);
 /* called by checkpointer */
 extern void SyncRepUpdateSyncStandbysDefined(void);
 
+extern PGDLLIMPORT int	SyncRepWaitMode;
+
 /*
  * Internal functions for parsing synchronous_standby_names grammar,
  * in syncrep_gram.y and syncrep_scanner.l
diff --git a/src/test/recovery/t/040_standby_failover_slots_sync.pl b/src/test/recovery/t/040_standby_failover_slots_sync.pl
index 2c51cfc3c8..00130d92dc 100644
--- a/src/test/recovery/t/040_standby_failover_slots_sync.pl
+++ b/src/test/recovery/t/040_standby_failover_slots_sync.pl
@@ -710,6 +710,141 @@ $result = $subscriber1->safe_psql('postgres',
 is($result, 't',
 	"subscriber1 gets data from primary after standby1 acknowledges changes");
 
+##################################################
+# Test that logical failover replication slots wait for the specified
+# synchronous replicas to receive the changes first. It uses the
+# following set up:
+#
+# 				(synchronous physical standbys)
+#				| ----> standby1 (application_name = standby1)
+#				| ----> standby2 (application_name = standby2)
+# primary -----	|
+#				(logical replication)
+#				| ----> subscriber1 (failover = true, slot_name = lsub1_slot)
+#				| ----> subscriber2 (failover = false, slot_name = lsub2_slot)
+#
+# synchronous_commit = 'on'
+# synchronous_standby_names = 'ANY 2 (standby1, standby2)'
+#
+# The setup is configured in such a way that the logical slot of subscriber1 is
+# enabled for failover, and thus the subscriber1 will wait for the changes to have
+# been synchronously replicated before receiving the decoded changes.
+##################################################
+
+$primary->safe_psql('postgres', "TRUNCATE tab_int;");
+# Setup synchronous replication
+$primary->append_conf(
+	'postgresql.conf', qq(
+		synchronous_commit = 'on'
+		synchronous_standby_names = 'ANY 2 (standby1, standby2)'
+));
+
+$primary->reload;
+
+$standby1->append_conf('postgresql.conf', "primary_conninfo = '$connstr_1 dbname=postgres application_name=standby1'");
+$standby1->reload;
+
+$standby2->append_conf('postgresql.conf', "primary_conninfo = '$connstr_1 dbname=postgres application_name=standby2'");
+$standby2->reload;
+
+# Check that synchronous replication is setup properly
+$standby2->stop;
+
+# Create some data on the primary
+$primary_row_count = 10;
+
+my $sync_back_q = $primary->background_psql(
+	'postgres',
+	on_error_stop => 0,
+	timeout => $PostgreSQL::Test::Utils::timeout_default);
+
+$sync_back_q->query_until(qr/insert_blocked_on_sync_rep/, q(
+    \echo insert_blocked_on_sync_rep
+   INSERT INTO tab_int SELECT generate_series(1, 10);
+));
+
+$result = $primary->safe_psql('postgres',
+	"SELECT count(*) = $primary_row_count FROM tab_int;");
+
+is($result, 'f', "primary row count is not updated due to synchronous replication");
+
+# Verify the standby specified in synchronized_standby_slots (sb1_slot aka standby1)
+# catches up with the primary.
+$primary->wait_for_replay_catchup($standby1);
+$result = $standby1->safe_psql('postgres',
+	"SELECT count(*) = $primary_row_count FROM tab_int;");
+is($result, 't', "standby1 gets data from primary");
+
+# Validate that synchronized_standby_slots takes precedence over waiting for
+# changes to have been synchronously replicated.
+# Since the slot specified (sb1_slot) has received the changes, primary can send
+# the decoded changes to the subscription enabled for failover
+# (i.e. regress_mysub1).
+$primary->wait_for_catchup('regress_mysub1');
+$result = $subscriber1->safe_psql('postgres',
+	"SELECT count(*) = $primary_row_count FROM tab_int;");
+is($result, 't',
+	"subscriber1 gets data from primary after standby1 acknowledges changes");
+
+$standby2->start;
+$primary->wait_for_replay_catchup($standby2);
+
+# Unset synchronized_standby_slots to test synchronous replication
+# blocks primary from sending logical decoded changes to failover slots until
+# changes have been synchronously replicated.
+$primary->append_conf(
+	'postgresql.conf', qq(
+	synchronized_standby_slots = ''));
+$primary->reload;
+
+$primary_row_count = 20;
+$standby2->stop;
+
+$sync_back_q->query_until(
+	qr/insert_blocked_on_additional_sync_rep/, q(
+	\echo insert_blocked_on_additional_sync_rep
+	INSERT INTO tab_int SELECT generate_series(11, 20);
+));
+
+# Since $standby2 has not received the changes, validate that subscriber1 (failover = true)
+# has not received the decoded changes, but subscriber2 (failover = false) has.
+$primary->wait_for_catchup('regress_mysub2');
+
+$result = $subscriber2->safe_psql('postgres',
+	"SELECT count(*) = $primary_row_count FROM tab_int;");
+is($result, 't',
+	"subscriber2 gets data from primary even if the changes have not been synchronously acknowledged.");
+
+$result = $subscriber1->safe_psql('postgres',
+	"SELECT count(*) = $primary_row_count FROM tab_int;");
+is($result, 'f',
+	"subscriber1 does not get data from primary since changes have not been synchronously acknowledged.");
+
+# Start standby2 to allow the changes to be acknowledged by all the synchronous standbys.
+$standby2->start;
+$primary->wait_for_replay_catchup($standby2);
+$primary->wait_for_catchup('regress_mysub1');
+
+# Now that the changes have been replicated to all synchronous nodes,
+# primary can send the decoded changes to the subscription enabled for failover
+# (i.e. regress_mysub1). While the standby was down, regress_mysub1 didn't
+# receive any data from the primary. i.e. the primary didn't allow it to go
+# ahead of requirements of synchronous commit.
+$result = $subscriber1->safe_psql('postgres',
+	"SELECT count(*) = $primary_row_count FROM tab_int;");
+is($result, 't',
+	"subscriber1 gets data from primary since changes have been synchronously acknowledged.");
+# No longer need to run the background session.
+$sync_back_q->quit;
+
+# Reset synchronized_standby_slots and synchronous_standby_names for the test cases below
+$primary->append_conf(
+	'postgresql.conf', qq(
+synchronized_standby_slots = 'sb1_slot'
+synchronous_standby_names = ''
+));
+$primary->reload;
+
 ##################################################
 # Verify that when using pg_logical_slot_get_changes to consume changes from a
 # logical failover slot, it will also wait for the slots specified in
-- 
2.40.1

