From 603e01d634a30de0a0103f657010e4fce953390e Mon Sep 17 00:00:00 2001
From: John Hsu <johnhyvr@gmail.com>
Date: Mon, 26 Aug 2024 18:23:20 +0000
Subject: [PATCH] Wait on synchronous replication by default for logical
 failover slots

If synchronous replication is enabled, this patch
allows logical subscribers with failover_slots enabled
to wait for changes to be replicated to synchronous replicas
before consuming the changes.
---
 src/backend/replication/slot.c                | 266 ++++++++++--------
 src/backend/replication/syncrep.c             |  27 +-
 src/include/replication/syncrep.h             |   3 +
 src/include/storage/lwlocklist.h              |   2 +-
 .../t/040_standby_failover_slots_sync.pl      | 135 +++++++++
 5 files changed, 316 insertions(+), 117 deletions(-)

diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index c290339af5..460358d259 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -49,6 +49,7 @@
 #include "postmaster/interrupt.h"
 #include "replication/slotsync.h"
 #include "replication/slot.h"
+#include "replication/syncrep.h"
 #include "replication/walsender_private.h"
 #include "storage/fd.h"
 #include "storage/ipc.h"
@@ -2591,8 +2592,8 @@ SlotExistsInSyncStandbySlots(const char *slot_name)
 }
 
 /*
- * Return true if the slots specified in synchronized_standby_slots have caught up to
- * the given WAL location, false otherwise.
+ * Return true if the slots specified in synchronized_standby_slots or synchronous
+ * replication have caught up to the given WAL location, false otherwise.
  *
  * The elevel parameter specifies the error level used for logging messages
  * related to slots that do not exist, are invalidated, or are inactive.
@@ -2606,9 +2607,9 @@ StandbySlotsHaveCaughtup(XLogRecPtr wait_for_lsn, int elevel)
 
 	/*
 	 * Don't need to wait for the standbys to catch up if there is no value in
-	 * synchronized_standby_slots.
+	 * synchronized_standby_slots or synchronous replication is not configured.
 	 */
-	if (synchronized_standby_slots_config == NULL)
+	if (synchronized_standby_slots_config == NULL && !SyncRepConfigured())
 		return true;
 
 	/*
@@ -2619,144 +2620,183 @@ StandbySlotsHaveCaughtup(XLogRecPtr wait_for_lsn, int elevel)
 		return true;
 
 	/*
-	 * Don't need to wait for the standbys to catch up if they are already
-	 * beyond the specified WAL location.
-	 */
-	if (!XLogRecPtrIsInvalid(ss_oldest_flush_lsn) &&
-		ss_oldest_flush_lsn >= wait_for_lsn)
-		return true;
-
-	/*
-	 * To prevent concurrent slot dropping and creation while filtering the
-	 * slots, take the ReplicationSlotControlLock outside of the loop.
+	 * In the event that synchronized_standby_slots and synchronous replication is
+	 * configured, have the former take precedence.
 	 */
-	LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
-
-	name = synchronized_standby_slots_config->slot_names;
-	for (int i = 0; i < synchronized_standby_slots_config->nslotnames; i++)
+	if (synchronized_standby_slots_config != NULL)
 	{
-		XLogRecPtr	restart_lsn;
-		bool		invalidated;
-		bool		inactive;
-		ReplicationSlot *slot;
-
-		slot = SearchNamedReplicationSlot(name, false);
+		/*
+		 * Don't need to wait for the standbys to catch up if they are already
+		 * beyond the specified WAL location.
+		 */
+		if (!XLogRecPtrIsInvalid(ss_oldest_flush_lsn) &&
+			ss_oldest_flush_lsn >= wait_for_lsn)
+			return true;
 
-		if (!slot)
+		/*
+		 * To prevent concurrent slot dropping and creation while filtering the
+		 * slots, take the ReplicationSlotControlLock outside of the loop.
+		 */
+		LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
+		name = synchronized_standby_slots_config->slot_names;
+		for (int i = 0; i < synchronized_standby_slots_config->nslotnames; i++)
 		{
-			/*
-			 * If a slot name provided in synchronized_standby_slots does not
-			 * exist, report a message and exit the loop. A user can specify a
-			 * slot name that does not exist just before the server startup.
-			 * The GUC check_hook(validate_sync_standby_slots) cannot validate
-			 * such a slot during startup as the ReplicationSlotCtl shared
-			 * memory is not initialized at that time. It is also possible for
-			 * a user to drop the slot in synchronized_standby_slots
-			 * afterwards.
-			 */
-			ereport(elevel,
-					errcode(ERRCODE_INVALID_PARAMETER_VALUE),
-					errmsg("replication slot \"%s\" specified in parameter %s does not exist",
-						   name, "synchronized_standby_slots"),
-					errdetail("Logical replication is waiting on the standby associated with \"%s\".",
-							  name),
-					errhint("Consider creating the slot \"%s\" or amend parameter %s.",
-							name, "synchronized_standby_slots"));
-			break;
-		}
+			XLogRecPtr	restart_lsn;
+			bool		invalidated;
+			bool		inactive;
+			ReplicationSlot *slot;
 
-		if (SlotIsLogical(slot))
-		{
-			/*
-			 * If a logical slot name is provided in
-			 * synchronized_standby_slots, report a message and exit the loop.
-			 * Similar to the non-existent case, a user can specify a logical
-			 * slot name in synchronized_standby_slots before the server
-			 * startup, or drop an existing physical slot and recreate a
-			 * logical slot with the same name.
-			 */
-			ereport(elevel,
-					errcode(ERRCODE_INVALID_PARAMETER_VALUE),
-					errmsg("cannot have logical replication slot \"%s\" in parameter %s",
-						   name, "synchronized_standby_slots"),
-					errdetail("Logical replication is waiting for correction on \"%s\".",
-							  name),
-					errhint("Consider removing logical slot \"%s\" from parameter %s.",
-							name, "synchronized_standby_slots"));
-			break;
-		}
+			slot = SearchNamedReplicationSlot(name, false);
 
-		SpinLockAcquire(&slot->mutex);
-		restart_lsn = slot->data.restart_lsn;
-		invalidated = slot->data.invalidated != RS_INVAL_NONE;
-		inactive = slot->active_pid == 0;
-		SpinLockRelease(&slot->mutex);
+			if (!slot)
+			{
+				/*
+				 * If a slot name provided in synchronized_standby_slots does not
+				 * exist, report a message and exit the loop. A user can specify a
+				 * slot name that does not exist just before the server startup.
+				 * The GUC check_hook(validate_sync_standby_slots) cannot validate
+				 * such a slot during startup as the ReplicationSlotCtl shared
+				 * memory is not initialized at that time. It is also possible for
+				 * a user to drop the slot in synchronized_standby_slots
+				 * afterwards.
+				 */
+				ereport(elevel,
+						errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+						errmsg("replication slot \"%s\" specified in parameter %s does not exist",
+							name, "synchronized_standby_slots"),
+						errdetail("Logical replication is waiting on the standby associated with \"%s\".",
+								name),
+						errhint("Consider creating the slot \"%s\" or amend parameter %s.",
+								name, "synchronized_standby_slots"));
+				break;
+			}
 
-		if (invalidated)
-		{
-			/* Specified physical slot has been invalidated */
-			ereport(elevel,
-					errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
-					errmsg("physical slot \"%s\" specified in parameter %s has been invalidated",
-						   name, "synchronized_standby_slots"),
-					errdetail("Logical replication is waiting on the standby associated with \"%s\".",
-							  name),
-					errhint("Consider dropping and recreating the slot \"%s\" or amend parameter %s.",
-							name, "synchronized_standby_slots"));
-			break;
-		}
+			if (SlotIsLogical(slot))
+			{
+				/*
+				 * If a logical slot name is provided in
+				 * synchronized_standby_slots, report a message and exit the loop.
+				 * Similar to the non-existent case, a user can specify a logical
+				 * slot name in synchronized_standby_slots before the server
+				 * startup, or drop an existing physical slot and recreate a
+				 * logical slot with the same name.
+				 */
+				ereport(elevel,
+						errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+						errmsg("cannot have logical replication slot \"%s\" in parameter %s",
+							name, "synchronized_standby_slots"),
+						errdetail("Logical replication is waiting for correction on \"%s\".",
+								name),
+						errhint("Consider removing logical slot \"%s\" from parameter %s.",
+								name, "synchronized_standby_slots"));
+				break;
+			}
 
-		if (XLogRecPtrIsInvalid(restart_lsn) || restart_lsn < wait_for_lsn)
-		{
-			/* Log a message if no active_pid for this physical slot */
-			if (inactive)
+			SpinLockAcquire(&slot->mutex);
+			restart_lsn = slot->data.restart_lsn;
+			invalidated = slot->data.invalidated != RS_INVAL_NONE;
+			inactive = slot->active_pid == 0;
+			SpinLockRelease(&slot->mutex);
+
+			if (invalidated)
+			{
+				/* Specified physical slot has been invalidated */
 				ereport(elevel,
 						errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
-						errmsg("replication slot \"%s\" specified in parameter %s does not have active_pid",
-							   name, "synchronized_standby_slots"),
+						errmsg("physical slot \"%s\" specified in parameter %s has been invalidated",
+							name, "synchronized_standby_slots"),
 						errdetail("Logical replication is waiting on the standby associated with \"%s\".",
-								  name),
-						errhint("Consider starting standby associated with \"%s\" or amend parameter %s.",
+								name),
+						errhint("Consider dropping and recreating the slot \"%s\" or amend parameter %s.",
 								name, "synchronized_standby_slots"));
+				break;
+			}
 
-			/* Continue if the current slot hasn't caught up. */
-			break;
+			if (XLogRecPtrIsInvalid(restart_lsn) || restart_lsn < wait_for_lsn)
+			{
+				/* Log a message if no active_pid for this physical slot */
+				if (inactive)
+					ereport(elevel,
+							errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+							errmsg("replication slot \"%s\" specified in parameter %s does not have active_pid",
+								name, "synchronized_standby_slots"),
+							errdetail("Logical replication is waiting on the standby associated with \"%s\".",
+									name),
+							errhint("Consider starting standby associated with \"%s\" or amend parameter %s.",
+									name, "synchronized_standby_slots"));
+
+				/* Continue if the current slot hasn't caught up. */
+				break;
+			}
+
+			Assert(restart_lsn >= wait_for_lsn);
+
+			if (XLogRecPtrIsInvalid(min_restart_lsn) ||
+				min_restart_lsn > restart_lsn)
+				min_restart_lsn = restart_lsn;
+
+			caught_up_slot_num++;
+
+			name += strlen(name) + 1;
 		}
 
-		Assert(restart_lsn >= wait_for_lsn);
+		LWLockRelease(ReplicationSlotControlLock);
+
+		/*
+		 * Return false if not all the standbys have caught up to the specified
+		 * WAL location.
+		 */
+		if (caught_up_slot_num != synchronized_standby_slots_config->nslotnames)
+			return false;
 
-		if (XLogRecPtrIsInvalid(min_restart_lsn) ||
-			min_restart_lsn > restart_lsn)
-			min_restart_lsn = restart_lsn;
+		/* The ss_oldest_flush_lsn must not retreat. */
+		Assert(XLogRecPtrIsInvalid(ss_oldest_flush_lsn) ||
+			min_restart_lsn >= ss_oldest_flush_lsn);
 
-		caught_up_slot_num++;
+		ss_oldest_flush_lsn = min_restart_lsn;
 
-		name += strlen(name) + 1;
+		return true;
 	}
+	else
+	{
+		volatile WalSndCtlData *walsndctl = WalSndCtl;
+		static XLogRecPtr	lsn[NUM_SYNC_REP_WAIT_MODE]; /* Cache LSNs */
+		static bool initialized = false;
+		/* Initialize value in case SIGHUP changing to SYNC_REP_NO_WAIT */
+		int mode = SyncRepWaitMode;
+		int i;
+
+		if (!initialized)
+		{
+			for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; i++)
+			{
+				lsn[i] = InvalidXLogRecPtr;
+			}
+		}
 
-	LWLockRelease(ReplicationSlotControlLock);
+		if (mode == SYNC_REP_NO_WAIT)
+			return true;
 
-	/*
-	 * Return false if not all the standbys have caught up to the specified
-	 * WAL location.
-	 */
-	if (caught_up_slot_num != synchronized_standby_slots_config->nslotnames)
-		return false;
+		if (lsn[mode] >= wait_for_lsn)
+			return true;
 
-	/* The ss_oldest_flush_lsn must not retreat. */
-	Assert(XLogRecPtrIsInvalid(ss_oldest_flush_lsn) ||
-		   min_restart_lsn >= ss_oldest_flush_lsn);
+		LWLockAcquire(SyncRepLock, LW_SHARED);
+		memcpy(lsn, (XLogRecPtr *) walsndctl->lsn, sizeof(lsn));
+		LWLockRelease(SyncRepLock);
 
-	ss_oldest_flush_lsn = min_restart_lsn;
+		if (lsn[mode] >= wait_for_lsn)
+			return true;
 
-	return true;
+		return false;
+	}
 }
 
 /*
  * Wait for physical standbys to confirm receiving the given lsn.
  *
  * Used by logical decoding SQL functions. It waits for physical standbys
- * corresponding to the physical slots specified in the synchronized_standby_slots GUC.
+ * corresponding to the physical slots specified in the synchronized_standby_slots GUC,
+ * or synchronous replication.
  */
 void
 WaitForStandbyConfirmation(XLogRecPtr wait_for_lsn)
@@ -2766,7 +2806,7 @@ WaitForStandbyConfirmation(XLogRecPtr wait_for_lsn)
 	 * slot is not a logical failover slot, or there is no value in
 	 * synchronized_standby_slots.
 	 */
-	if (!MyReplicationSlot->data.failover || !synchronized_standby_slots_config)
+	if (!MyReplicationSlot->data.failover || !(synchronized_standby_slots_config || SyncRepConfigured()))
 		return;
 
 	ConditionVariablePrepareToSleep(&WalSndCtl->wal_confirm_rcv_cv);
diff --git a/src/backend/replication/syncrep.c b/src/backend/replication/syncrep.c
index fa5988c824..69c061d8dc 100644
--- a/src/backend/replication/syncrep.c
+++ b/src/backend/replication/syncrep.c
@@ -78,6 +78,7 @@
 #include "common/int.h"
 #include "miscadmin.h"
 #include "pgstat.h"
+#include "replication/slot.h"
 #include "replication/syncrep.h"
 #include "replication/walsender.h"
 #include "replication/walsender_private.h"
@@ -95,7 +96,7 @@ char	   *SyncRepStandbyNames;
 static bool announce_next_takeover = true;
 
 SyncRepConfigData *SyncRepConfig = NULL;
-static int	SyncRepWaitMode = SYNC_REP_NO_WAIT;
+int	SyncRepWaitMode = SYNC_REP_NO_WAIT;
 
 static void SyncRepQueueInsert(int mode);
 static void SyncRepCancelWait(void);
@@ -124,6 +125,8 @@ static int	cmp_lsn(const void *a, const void *b);
 static bool SyncRepQueueIsOrderedByLSN(int mode);
 #endif
 
+bool SyncRepConfigured(void);
+
 /*
  * ===========================================================
  * Synchronous Replication functions for normal user backends
@@ -169,8 +172,7 @@ SyncRepWaitForLSN(XLogRecPtr lsn, bool commit)
 	 * described in SyncRepUpdateSyncStandbysDefined(). On the other hand, if
 	 * it's false, the lock is not necessary because we don't touch the queue.
 	 */
-	if (!SyncRepRequested() ||
-		!((volatile WalSndCtlData *) WalSndCtl)->sync_standbys_defined)
+	if (!SyncRepConfigured())
 		return;
 
 	/* Cap the level for anything other than commit to remote flush only. */
@@ -523,6 +525,15 @@ SyncRepReleaseWaiters(void)
 
 	LWLockRelease(SyncRepLock);
 
+	/*
+	 * If synchronized_standby_slots is set, the respective walsender's
+	 * will be responsible for broadcasting the value.
+	 */
+	if (strcmp(synchronized_standby_slots, "") == 0)
+	{
+		ConditionVariableBroadcast(&WalSndCtl->wal_confirm_rcv_cv);
+	}
+
 	elog(DEBUG3, "released %d procs up to write %X/%X, %d procs up to flush %X/%X, %d procs up to apply %X/%X",
 		 numwrite, LSN_FORMAT_ARGS(writePtr),
 		 numflush, LSN_FORMAT_ARGS(flushPtr),
@@ -1069,3 +1080,13 @@ assign_synchronous_commit(int newval, void *extra)
 			break;
 	}
 }
+
+bool
+SyncRepConfigured()
+{
+	if (!SyncRepRequested() ||
+		!((volatile WalSndCtlData *) WalSndCtl)->sync_standbys_defined)
+		return false;
+
+	return true;
+}
diff --git a/src/include/replication/syncrep.h b/src/include/replication/syncrep.h
index ea439e6da6..ec22bc72df 100644
--- a/src/include/replication/syncrep.h
+++ b/src/include/replication/syncrep.h
@@ -82,6 +82,7 @@ extern PGDLLIMPORT char *SyncRepStandbyNames;
 
 /* called by user backend */
 extern void SyncRepWaitForLSN(XLogRecPtr lsn, bool commit);
+extern bool SyncRepConfigured();
 
 /* called at backend exit */
 extern void SyncRepCleanupAtProcExit(void);
@@ -96,6 +97,8 @@ extern int	SyncRepGetCandidateStandbys(SyncRepStandbyData **standbys);
 /* called by checkpointer */
 extern void SyncRepUpdateSyncStandbysDefined(void);
 
+extern int	SyncRepWaitMode;
+
 /*
  * Internal functions for parsing synchronous_standby_names grammar,
  * in syncrep_gram.y and syncrep_scanner.l
diff --git a/src/include/storage/lwlocklist.h b/src/include/storage/lwlocklist.h
index 88dc79b2bd..e44bc3a07b 100644
--- a/src/include/storage/lwlocklist.h
+++ b/src/include/storage/lwlocklist.h
@@ -83,4 +83,4 @@ PG_LWLOCK(49, WALSummarizer)
 PG_LWLOCK(50, DSMRegistry)
 PG_LWLOCK(51, InjectionPoint)
 PG_LWLOCK(52, SerialControl)
-PG_LWLOCK(53, WaitLSN)
+PG_LWLOCK(53, WaitLSN)
\ No newline at end of file
diff --git a/src/test/recovery/t/040_standby_failover_slots_sync.pl b/src/test/recovery/t/040_standby_failover_slots_sync.pl
index 2c51cfc3c8..00130d92dc 100644
--- a/src/test/recovery/t/040_standby_failover_slots_sync.pl
+++ b/src/test/recovery/t/040_standby_failover_slots_sync.pl
@@ -710,6 +710,141 @@ $result = $subscriber1->safe_psql('postgres',
 is($result, 't',
 	"subscriber1 gets data from primary after standby1 acknowledges changes");
 
+##################################################
+# Test that logical failover replication slots wait for the specified
+# synchronous replicas to receive the changes first. It uses the
+# following set up:
+#
+# 				(synchronous physical standbys)
+#				| ----> standby1 (application_name = standby1)
+#				| ----> standby2 (application_name = standby2)
+# primary -----	|
+#				(logical replication)
+#				| ----> subscriber1 (failover = true, slot_name = lsub1_slot)
+#				| ----> subscriber2 (failover = false, slot_name = lsub2_slot)
+#
+# synchronous_commit = 'on'
+# synchronous_standby_names = 'ANY 2 (standby1, standby2)'
+#
+# The setup is configured in such a way that the logical slot of subscriber1 is
+# enabled for failover, and thus the subscriber1 will wait for the changes to have
+# been synchronously replicated before receiving the decoded changes.
+##################################################
+
+$primary->safe_psql('postgres', "TRUNCATE tab_int;");
+# Setup synchronous replication
+$primary->append_conf(
+	'postgresql.conf', qq(
+		synchronous_commit = 'on'
+		synchronous_standby_names = 'ANY 2 (standby1, standby2)'
+));
+
+$primary->reload;
+
+$standby1->append_conf('postgresql.conf', "primary_conninfo = '$connstr_1 dbname=postgres application_name=standby1'");
+$standby1->reload;
+
+$standby2->append_conf('postgresql.conf', "primary_conninfo = '$connstr_1 dbname=postgres application_name=standby2'");
+$standby2->reload;
+
+# Check that synchronous replication is setup properly
+$standby2->stop;
+
+# Create some data on the primary
+$primary_row_count = 10;
+
+my $sync_back_q = $primary->background_psql(
+	'postgres',
+	on_error_stop => 0,
+	timeout => $PostgreSQL::Test::Utils::timeout_default);
+
+$sync_back_q->query_until(qr/insert_blocked_on_sync_rep/, q(
+    \echo insert_blocked_on_sync_rep
+   INSERT INTO tab_int SELECT generate_series(1, 10);
+));
+
+$result = $primary->safe_psql('postgres',
+	"SELECT count(*) = $primary_row_count FROM tab_int;");
+
+is($result, 'f', "primary row count is not updated due to synchronous replication");
+
+# Verify the standby specified in synchronized_standby_slots (sb1_slot aka standby1)
+# catches up with the primary.
+$primary->wait_for_replay_catchup($standby1);
+$result = $standby1->safe_psql('postgres',
+	"SELECT count(*) = $primary_row_count FROM tab_int;");
+is($result, 't', "standby1 gets data from primary");
+
+# Validate that synchronized_standby_slots takes precedence over waiting for
+# changes to have been synchronous replicated.
+# Since the slot specified (sb1_slot) has received the changes, primary can send
+# the decoded changes to the subscription enabled for failover
+# (i.e. regress_mysub1).
+$primary->wait_for_catchup('regress_mysub1');
+$result = $subscriber1->safe_psql('postgres',
+	"SELECT count(*) = $primary_row_count FROM tab_int;");
+is($result, 't',
+	"subscriber1 gets data from primary after standby1 acknowledges changes");
+
+$standby2->start;
+$primary->wait_for_replay_catchup($standby2);
+
+# Unset synchronized_standby_slots to test synchronous replication
+# blocks primary from sending logical decoded changes to failover slots until
+# changes have been synchronously replicated.
+$primary->append_conf(
+	'postgresql.conf', qq(
+	synchronized_standby_slots = ''));
+$primary->reload;
+
+$primary_row_count = 20;
+$standby2->stop;
+
+$sync_back_q->query_until(
+	qr/insert_blocked_on_additional_sync_rep/, q(
+	\echo insert_blocked_on_additional_sync_rep
+	INSERT INTO tab_int SELECT generate_series(11, 20);
+));
+
+# Since $standby2 has not received the changes, validate that subscriber1 (failover = true)
+# has not received the decoded changes, but subscriber2 (failover = false) has.
+$primary->wait_for_catchup('regress_mysub2');
+
+$result = $subscriber2->safe_psql('postgres',
+	"SELECT count(*) = $primary_row_count FROM tab_int;");
+is($result, 't',
+	"subscriber2 gets data from primary even if the changes have not been synchronously acknowledged.");
+
+$result = $subscriber1->safe_psql('postgres',
+	"SELECT count(*) = $primary_row_count FROM tab_int;");
+is($result, 'f',
+	"subscriber1 does not get data from primary since changes have not been synchronously acknowledged.");
+
+# Start standby2 to allow the changes to be acknowledged by all the synchronous standbys.
+$standby2->start;
+$primary->wait_for_replay_catchup($standby2);
+$primary->wait_for_catchup('regress_mysub1');
+
+# Now that the changes have been replicated to all synchronous nodes,
+# primary can send the decoded changes to the subscription enabled for failover
+# (i.e. regress_mysub1). While the standby was down, regress_mysub1 didn't
+# receive any data from the primary. i.e. the primary didn't allow it to go
+# ahead of requirements of synchronous commit.
+$result = $subscriber1->safe_psql('postgres',
+	"SELECT count(*) = $primary_row_count FROM tab_int;");
+is($result, 't',
+	"subscriber1 gets data from primary since changes have been syhcronously acknowledged.");
+# No longer need to run the background session.
+$sync_back_q->quit;
+
+# Reset synchronized_standby_slots and synchronous commit for below test cases
+$primary->append_conf(
+	'postgresql.conf', qq(
+synchronized_standby_slots = 'sb1_slot'
+synchronous_standby_names = ''
+));
+$primary->reload;
+
 ##################################################
 # Verify that when using pg_logical_slot_get_changes to consume changes from a
 # logical failover slot, it will also wait for the slots specified in
-- 
2.40.1

