From 3aa580b2874063d7bd6f1c5806824fd3c55b2654 Mon Sep 17 00:00:00 2001
From: Ashutosh Sharma <ashu.coek88@gmail.com>
Date: Wed, 13 May 2026 08:51:21 +0000
Subject: [PATCH 2/3] Add ANY N semantics to synchronized_standby_slots

Extend synchronized_standby_slots with quorum syntax for logical
failover slot synchronization:

- ANY N (slot1, slot2, ...)

Plain-list semantics are preserved as-is:

- slot1, slot2 continues to mean all listed slots are required

Implementation details:

- Reuse syncrep parser infrastructure in the GUC check hook and
  map parsed output into synchronized_standby_slots semantics.
- Consume SYNC_REP_DEFAULT from the preparatory parser refactor to
  distinguish plain-list syntax from explicit parser modes.
- In StandbySlotsHaveCaughtup(), enforce mode-specific behavior for:
  - existing all-listed-slots semantics (plain list)
  - quorum N-of-M behavior (ANY N)
- Validation rejects configurations where N exceeds the number of
  listed slots.

Tests and docs:

- Add recovery coverage for plain-list behavior and ANY quorum
  behavior, including lagging-slot and validation-error scenarios.
- Avoid starting standby1 twice while moving between test scenarios.
- Document ANY syntax and clarify plain-list behavior for this GUC.

Author: Satya Narlapuram <satyanarlapuram@gmail.com>
Author: Ashutosh Sharma <ashu.coek88@gmail.com>
Reviewed-by: Shveta Malik <shveta.malik@gmail.com>
Reviewed-by: Ajin Cherian <itsajin@gmail.com>
Reviewed-by: Hou, Zhijie <houzj.fnst@fujitsu.com>
Reviewed-by: Dilip Kumar <dilipbalaut@gmail.com>
Reviewed-by: Surya Poondla <suryapoondla4@gmail.com>
Reviewed-by: Japin Li <japinli@hotmail.com>
---
 doc/src/sgml/config.sgml                      |  72 ++-
 src/backend/replication/slot.c                | 454 +++++++++++++-----
 .../053_synchronized_standby_slots_quorum.pl  | 327 +++++++++++++
 3 files changed, 720 insertions(+), 133 deletions(-)
 create mode 100644 src/test/recovery/t/053_synchronized_standby_slots_quorum.pl

diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml
index 73cc0412330..f0f5c89a069 100644
--- a/doc/src/sgml/config.sgml
+++ b/doc/src/sgml/config.sgml
@@ -5190,17 +5190,69 @@ ANY <replaceable class="parameter">num_sync</replaceable> ( <replaceable class="
       </term>
       <listitem>
        <para>
-        A comma-separated list of streaming replication standby server slot names
-        that logical WAL sender processes will wait for. Logical WAL sender processes
-        will send decoded changes to plugins only after the specified replication
-        slots confirm receiving WAL. This guarantees that logical replication
+        Specifies the streaming replication standby slots that logical WAL
+        sender processes must wait on before delivering decoded changes. This
+        parameter uses the following syntax:
+<synopsis>
+ANY <replaceable class="parameter">num_sync</replaceable> ( <replaceable class="parameter">slot_name</replaceable> [, ...] )
+<replaceable class="parameter">slot_name</replaceable> [, ...]
+</synopsis>
+        where <replaceable class="parameter">num_sync</replaceable> is
+        the number of physical replication slots that must confirm WAL
+        receipt before logical decoding proceeds,
+        and <replaceable class="parameter">slot_name</replaceable>
+        is the name of a physical replication slot.
+        <replaceable class="parameter">num_sync</replaceable>
+        must be an integer value greater than zero and must not exceed the
+        number of listed slots.
+        Other forms supported by
+        <xref linkend="guc-synchronous-standby-names"/>, such as priority
+        syntax, are not supported.
+       </para>
+       <para>
+        A plain comma-separated list without a keyword specifies that
+        <emphasis>all</emphasis> listed physical slots must confirm WAL
+        receipt. This differs from <xref linkend="guc-synchronous-standby-names"/>
+        where a simple list means <literal>FIRST 1</literal>. For
+        <varname>synchronized_standby_slots</varname>, requiring all slots
+        provides safer failover semantics by default.
+       </para>
+       <para>
+        The keyword <literal>ANY</literal>, coupled with
+        <replaceable class="parameter">num_sync</replaceable>, specifies
+        quorum-based semantics. Logical decoding proceeds once at least
+        <replaceable class="parameter">num_sync</replaceable> of the listed
+        slots have caught up. Missing, logical, and invalidated slots are
+        skipped when determining candidates. Lagging slots (inactive or
+        active) simply do not count toward the required number until they
+        catch up.
+        If fewer than <replaceable class="parameter">num_sync</replaceable>
+        slots have caught up at a given moment, logical decoding waits until
+        that threshold is reached.
+        i.e., there is no priority ordering.
+        For example, a setting of <literal>ANY 1 (sb1_slot, sb2_slot)</literal>
+        allows logical decoding to proceed as soon as either physical slot has
+        confirmed WAL receipt. If none of the slots are available or have
+        caught up, logical decoding waits until at least one slot meets the
+        required condition. This is useful in conjunction with
+        quorum-based synchronous replication
+        (<literal>synchronous_standby_names = 'ANY ...'</literal>), so that
+        logical decoding availability matches the commit durability guarantee.
+       </para>
+      <para>
+       <literal>ANY</literal> is case-insensitive.
+      </para>
+       <para>
+        The use of <varname>synchronized_standby_slots</varname> guarantees
+        that logical replication
         failover slots do not consume changes until those changes are received
-        and flushed to corresponding physical standbys. If a
+        and flushed to the required physical standbys. If a
         logical replication connection is meant to switch to a physical standby
         after the standby is promoted, the physical replication slot for the
         standby should be listed here. Note that logical replication will not
-        proceed if the slots specified in the
-        <varname>synchronized_standby_slots</varname> do not exist or are invalidated.
+        proceed if the required number of physical slots specified in
+        <varname>synchronized_standby_slots</varname> do not exist or are
+        invalidated.
         Additionally, the replication management functions
         <link linkend="pg-replication-slot-advance">
         <function>pg_replication_slot_advance</function></link>,
@@ -5208,9 +5260,9 @@ ANY <replaceable class="parameter">num_sync</replaceable> ( <replaceable class="
         <function>pg_logical_slot_get_changes</function></link>, and
         <link linkend="pg-logical-slot-peek-changes">
         <function>pg_logical_slot_peek_changes</function></link>,
-        when used with logical failover slots, will block until all
-        physical slots specified in <varname>synchronized_standby_slots</varname> have
-        confirmed WAL receipt.
+        when used with logical failover slots, will block until the required
+        physical slots specified in <varname>synchronized_standby_slots</varname>
+        have confirmed WAL receipt.
        </para>
        <para>
         The standbys corresponding to the physical replication slots in
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index 83fcde74718..9a460c992d9 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -50,6 +50,7 @@
 #include "replication/logicallauncher.h"
 #include "replication/slotsync.h"
 #include "replication/slot.h"
+#include "replication/syncrep.h"
 #include "replication/walsender_private.h"
 #include "storage/fd.h"
 #include "storage/ipc.h"
@@ -91,11 +92,19 @@ typedef struct ReplicationSlotOnDisk
  * Note: this must be a flat representation that can be held in a single chunk
  * of guc_malloc'd memory, so that it can be stored as the "extra" data for the
  * synchronized_standby_slots GUC.
+ *
+ * The layout mirrors SyncRepConfigData so that the same quorum/priority
+ * semantics can be expressed.  The syncrep_method field uses the
+ * SYNC_REP_DEFAULT, SYNC_REP_PRIORITY, and SYNC_REP_QUORUM constants from
+ * syncrep.h.
  */
 typedef struct
 {
-	/* Number of slot names in the slot_names[] */
-	int			nslotnames;
+	int			config_size;	/* total size of this struct, in bytes */
+	int			num_sync;		/* number of slots that must confirm WAL
+								 * receipt before logical decoding proceeds */
+	uint8		syncrep_method;	/* SYNC_REP_* method */
+	int			nslotnames;		/* number of slot names that follow */
 
 	/*
 	 * slot_names contains 'nslotnames' consecutive null-terminated C strings.
@@ -103,6 +112,28 @@ typedef struct
 	char		slot_names[FLEXIBLE_ARRAY_MEMBER];
 } SyncStandbySlotsConfigData;
 
+/*
+ * State of a replication slot specified in synchronized_standby_slots GUC.
+ */
+typedef enum
+{
+	SS_SLOT_NOT_FOUND,		/* slot does not exist */
+	SS_SLOT_LOGICAL,		/* slot is logical, not physical */
+	SS_SLOT_INVALIDATED,	/* slot has been invalidated */
+	SS_SLOT_INACTIVE_LAGGING,	/* slot is inactive and behind wait_for_lsn */
+	SS_SLOT_ACTIVE_LAGGING,	/* slot is active and behind wait_for_lsn */
+} SyncStandbySlotsState;
+
+/*
+ * Information about a synchronized standby slot's state.
+ */
+typedef struct
+{
+	const char *slot_name;		/* name of the slot */
+	SyncStandbySlotsState state;	/* state of the slot */
+	XLogRecPtr	restart_lsn;	/* current restart_lsn (valid for SS_SLOT_ACTIVE_LAGGING) */
+} SyncStandbySlotsStateInfo;
+
 /*
  * Lookup table for slot invalidation causes.
  */
@@ -2967,94 +2998,133 @@ GetSlotInvalidationCauseName(ReplicationSlotInvalidationCause cause)
 }
 
 /*
- * A helper function to validate slots specified in GUC synchronized_standby_slots.
+ * GUC check_hook for synchronized_standby_slots
+ *
+ * This reuses the syncrep_yyparse/syncrep_scanner infrastructure that is
+ * also used for synchronous_standby_names, and accepts these forms:
+ *
+ *   slot1, slot2                   -- wait for ALL listed slots
+ *   ANY N (slot1, slot2, ...)      -- wait for any N-of-M (quorum)
  *
- * The rawname will be parsed, and the result will be saved into *elemlist.
+ * Note: Simple list syntax is interpreted as "wait for ALL" for this GUC,
+ * unlike synchronous_standby_names where it means "FIRST 1".
+ *
+ * After parsing, we additionally validate every name as a legal replication
+ * slot name.
  */
-static bool
-validate_sync_standby_slots(char *rawname, List **elemlist)
+bool
+check_synchronized_standby_slots(char **newval, void **extra, GucSource source)
 {
-	/* Verify syntax and parse string into a list of identifiers */
-	if (!SplitIdentifierString(rawname, ',', elemlist))
+	if (*newval != NULL && (*newval)[0] != '\0')
 	{
-		GUC_check_errdetail("List syntax is invalid.");
-		return false;
-	}
+		yyscan_t	scanner;
+		int			parse_rc;
+		SyncStandbySlotsConfigData *config;
+		const char *mname;
+
+		/* Result of parsing is returned in one of these two variables */
+		SyncRepConfigData *syncrep_parse_result = NULL;
+		char	   *syncrep_parse_error_msg = NULL;
+
+		/* Parse the synchronized standby slots configuration */
+		syncrep_scanner_init(*newval, &scanner);
+		parse_rc = syncrep_yyparse(&syncrep_parse_result,
+								   &syncrep_parse_error_msg,
+								   scanner);
+		syncrep_scanner_finish(scanner);
+
+		if (parse_rc != 0 || syncrep_parse_result == NULL)
+		{
+			GUC_check_errcode(ERRCODE_SYNTAX_ERROR);
+			if (syncrep_parse_error_msg)
+				GUC_check_errdetail("%s", syncrep_parse_error_msg);
+			else
+				GUC_check_errdetail("\"%s\" parser failed.",
+									"synchronized_standby_slots");
+			return false;
+		}
 
-	/* Iterate the list to validate each slot name */
-	foreach_ptr(char, name, *elemlist)
-	{
-		int			err_code;
-		char	   *err_msg = NULL;
-		char	   *err_hint = NULL;
+		if (syncrep_parse_result->syncrep_method == SYNC_REP_PRIORITY)
+		{
+			GUC_check_errcode(ERRCODE_INVALID_PARAMETER_VALUE);
+			GUC_check_errmsg("priority syntax is not supported for parameter \"%s\"",
+							 "synchronized_standby_slots");
+			return false;
+		}
 
-		if (!ReplicationSlotValidateNameInternal(name, false, &err_code,
-												 &err_msg, &err_hint))
+		if (syncrep_parse_result->num_sync <= 0)
 		{
-			GUC_check_errcode(err_code);
-			GUC_check_errdetail("%s", err_msg);
-			if (err_hint != NULL)
-				GUC_check_errhint("%s", err_hint);
+			GUC_check_errmsg("number of synchronized standby slots (%d) must be greater than zero",
+							 syncrep_parse_result->num_sync);
 			return false;
 		}
-	}
 
-	return true;
-}
+		/* Reject num_sync > nmembers; it can never be satisfied */
+		if (syncrep_parse_result->num_sync > syncrep_parse_result->nmembers)
+		{
+			GUC_check_errmsg("number of synchronized standby slots (%d) must not exceed the number of listed slots (%d)",
+							 syncrep_parse_result->num_sync,
+							 syncrep_parse_result->nmembers);
+			return false;
+		}
 
-/*
- * GUC check_hook for synchronized_standby_slots
- */
-bool
-check_synchronized_standby_slots(char **newval, void **extra, GucSource source)
-{
-	char	   *rawname;
-	char	   *ptr;
-	List	   *elemlist;
-	int			size;
-	bool		ok;
-	SyncStandbySlotsConfigData *config;
-
-	if ((*newval)[0] == '\0')
-		return true;
+		/*
+		 * For synchronized_standby_slots, a comma-separated list means all
+		 * listed slots are required. The syncrep parser preserves this shape
+		 * as SYNC_REP_DEFAULT, so map num_sync to nmembers to enforce all-mode
+		 * semantics.
+		 */
+		if (syncrep_parse_result->syncrep_method == SYNC_REP_DEFAULT)
+			syncrep_parse_result->num_sync = syncrep_parse_result->nmembers;
 
-	/* Need a modifiable copy of the GUC string */
-	rawname = pstrdup(*newval);
+		/* validate every member name as a slot name */
+		mname = syncrep_parse_result->member_names;
 
-	/* Now verify if the specified slots exist and have correct type */
-	ok = validate_sync_standby_slots(rawname, &elemlist);
+		for (int i = 0; i < syncrep_parse_result->nmembers; i++)
+		{
+			int			err_code;
+			char	   *err_msg = NULL;
+			char	   *err_hint = NULL;
 
-	if (!ok || elemlist == NIL)
-	{
-		pfree(rawname);
-		list_free(elemlist);
-		return ok;
-	}
+			if (!ReplicationSlotValidateNameInternal(mname, false, &err_code,
+													 &err_msg, &err_hint))
+			{
+				GUC_check_errcode(err_code);
+				GUC_check_errdetail("%s", err_msg);
+				if (err_hint != NULL)
+					GUC_check_errhint("%s", err_hint);
+				return false;
+			}
 
-	/* Compute the size required for the SyncStandbySlotsConfigData struct */
-	size = offsetof(SyncStandbySlotsConfigData, slot_names);
-	foreach_ptr(char, slot_name, elemlist)
-		size += strlen(slot_name) + 1;
+			mname += strlen(mname) + 1;
+		}
 
-	/* GUC extra value must be guc_malloc'd, not palloc'd */
-	config = (SyncStandbySlotsConfigData *) guc_malloc(LOG, size);
-	if (!config)
-		return false;
+		/*
+		 * Build SyncStandbySlotsConfigData from the parsed SyncRepConfigData.
+		 * Since the structures have identical layout, we can use the same
+		 * config_size.
+		 */
+		config = (SyncStandbySlotsConfigData *)
+			guc_malloc(LOG, syncrep_parse_result->config_size);
+		if (!config)
+			return false;
 
-	/* Transform the data into SyncStandbySlotsConfigData */
-	config->nslotnames = list_length(elemlist);
+		config->config_size = syncrep_parse_result->config_size;
+		config->num_sync = syncrep_parse_result->num_sync;
+		config->syncrep_method = syncrep_parse_result->syncrep_method;
+		config->nslotnames = syncrep_parse_result->nmembers;
 
-	ptr = config->slot_names;
-	foreach_ptr(char, slot_name, elemlist)
-	{
-		strcpy(ptr, slot_name);
-		ptr += strlen(slot_name) + 1;
-	}
+		/* Copy all slot names in one operation */
+		memcpy(config->slot_names,
+			   syncrep_parse_result->member_names,
+			   syncrep_parse_result->config_size -
+			   offsetof(SyncRepConfigData, member_names));
 
-	*extra = config;
+		*extra = config;
+	}
+	else
+		*extra = NULL;
 
-	pfree(rawname);
-	list_free(elemlist);
 	return true;
 }
 
@@ -3103,18 +3173,117 @@ SlotExistsInSyncStandbySlots(const char *slot_name)
 }
 
 /*
- * Return true if the slots specified in synchronized_standby_slots have caught up to
- * the given WAL location, false otherwise.
+ * Report problem states for synchronized standby slots that prevented the
+ * catch-up requirement from being met.
+ */
+static void
+ReportUnavailableSyncStandbySlots(SyncStandbySlotsStateInfo *slot_states,
+								  int num_slot_states, int elevel,
+								  XLogRecPtr wait_for_lsn)
+{
+	for (int i = 0; i < num_slot_states; i++)
+	{
+		const char *slot_name = slot_states[i].slot_name;
+
+		switch (slot_states[i].state)
+		{
+			case SS_SLOT_NOT_FOUND:
+				ereport(elevel,
+						errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+						errmsg("replication slot \"%s\" specified in parameter \"%s\" does not exist",
+							   slot_name, "synchronized_standby_slots"),
+						errdetail("Logical replication is waiting on the standby associated with replication slot \"%s\".",
+								  slot_name),
+						errhint("Create the replication slot \"%s\" or amend parameter \"%s\".",
+								slot_name, "synchronized_standby_slots"));
+				break;
+
+			case SS_SLOT_LOGICAL:
+				ereport(elevel,
+						errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+						errmsg("cannot specify logical replication slot \"%s\" in parameter \"%s\"",
+							   slot_name, "synchronized_standby_slots"),
+						errdetail("Logical replication is waiting for correction on replication slot \"%s\".",
+								  slot_name),
+						errhint("Remove the logical replication slot \"%s\" from parameter \"%s\".",
+								slot_name, "synchronized_standby_slots"));
+				break;
+
+			case SS_SLOT_INVALIDATED:
+				ereport(elevel,
+						errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+						errmsg("physical replication slot \"%s\" specified in parameter \"%s\" has been invalidated",
+							   slot_name, "synchronized_standby_slots"),
+						errdetail("Logical replication is waiting on the standby associated with replication slot \"%s\".",
+								  slot_name),
+						errhint("Drop and recreate the replication slot \"%s\", or amend parameter \"%s\".",
+								slot_name, "synchronized_standby_slots"));
+				break;
+
+			case SS_SLOT_INACTIVE_LAGGING:
+				ereport(elevel,
+						errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+						errmsg("replication slot \"%s\" specified in parameter \"%s\" does not have active_pid",
+							   slot_name, "synchronized_standby_slots"),
+						errdetail("Logical replication is waiting on the standby associated with replication slot \"%s\".",
+								  slot_name),
+						errhint("Start the standby associated with the replication slot \"%s\", or amend parameter \"%s\".",
+								slot_name, "synchronized_standby_slots"));
+				break;
+
+			case SS_SLOT_ACTIVE_LAGGING:
+				if (!XLogRecPtrIsValid(slot_states[i].restart_lsn))
+					ereport(DEBUG1,
+							errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+							errmsg("replication slot \"%s\" specified in parameter \"%s\" has not caught up",
+								   slot_name, "synchronized_standby_slots"),
+							errdetail("The slot's restart_lsn is not yet set; required LSN is %X/%X.",
+								  LSN_FORMAT_ARGS(wait_for_lsn)));
+				else
+					ereport(DEBUG1,
+							errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+							errmsg("replication slot \"%s\" specified in parameter \"%s\" has not caught up",
+								   slot_name, "synchronized_standby_slots"),
+							errdetail("The slot's restart_lsn %X/%X is behind the required %X/%X.",
+								  LSN_FORMAT_ARGS(slot_states[i].restart_lsn),
+								  LSN_FORMAT_ARGS(wait_for_lsn)));
+				break;
+
+			default:
+				/* Should not happen */
+				Assert(false);
+				break;
+		}
+	}
+}
+
+/*
+ * Return true if the required standby slots have caught up to the given WAL
+ * location, false otherwise.
+ *
+ * The behavior depends on the synchronized_standby_slots configuration:
  *
- * The elevel parameter specifies the error level used for logging messages
- * related to slots that do not exist, are invalidated, or are inactive.
+ *   Simple list (e.g., "slot1, slot2"):
+ *     ALL slots must have caught up. Returns false otherwise.
+ *
+ *   ANY N (e.g., "ANY 2 (slot1, slot2, slot3)"):
+ *     Wait for any N eligible slots. Skips missing, invalid, logical, and
+ *     lagging slots (inactive or active) to find N slots that have caught up.
+ *
+ * The elevel parameter specifies the error level used for reporting issues
+ * related to the slots specified in synchronized_standby_slots when the
+ * catch-up requirement is not met.
  */
 bool
 StandbySlotsHaveCaughtup(XLogRecPtr wait_for_lsn, int elevel)
 {
 	const char *name;
 	int			caught_up_slot_num = 0;
+	int			required;
 	XLogRecPtr	min_restart_lsn = InvalidXLogRecPtr;
+	bool		wait_for_all;
+	SyncStandbySlotsStateInfo *slot_states;
+	int			num_slot_states = 0;
 
 	/*
 	 * Don't need to wait for the standbys to catch up if there is no value in
@@ -3138,12 +3307,42 @@ StandbySlotsHaveCaughtup(XLogRecPtr wait_for_lsn, int elevel)
 		ss_oldest_flush_lsn >= wait_for_lsn)
 		return true;
 
+	/*
+	 * Determine how many slots are required and whether we're in "wait for
+	 * ALL" mode versus "wait for N-of-M" mode.
+	 *
+	 * wait_for_all = true means we need ALL slots to be ready (simple
+	 * list syntax like "slot1, slot2"). In this mode, we stop checking
+	 * on the first slot that is missing/invalid/logical, or the first slot
+	 * that is lagging (inactive or active).
+	 *
+	 * wait_for_all = false means we select N from M candidates (ANY N syntax).
+	 * In this mode, slots already caught up are counted even if inactive, and
+	 * lagging slots are skipped until enough slots have caught up.
+	 */
+	required = synchronized_standby_slots_config->num_sync;
+	wait_for_all = (required == synchronized_standby_slots_config->nslotnames);
+
+	/*
+	 * Allocate array to track slot states. Size it to the total number of
+	 * configured slots since in the worst case all could have problem states.
+	 */
+	slot_states = palloc_array(SyncStandbySlotsStateInfo,
+						  synchronized_standby_slots_config->nslotnames);
+
 	/*
 	 * To prevent concurrent slot dropping and creation while filtering the
 	 * slots, take the ReplicationSlotControlLock outside of the loop.
 	 */
 	LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
 
+	/*
+	 * Iterate through configured slots, checking their state and tracking
+	 * how many have caught up. Problem states are recorded for deferred
+	 * reporting: missing/logical/invalidated slots, and lagging slots
+	 * (inactive or active). Messages are only emitted if the catch-up
+	 * requirement isn't met.
+	 */
 	name = synchronized_standby_slots_config->slot_names;
 	for (int i = 0; i < synchronized_standby_slots_config->nslotnames; i++)
 	{
@@ -3154,35 +3353,28 @@ StandbySlotsHaveCaughtup(XLogRecPtr wait_for_lsn, int elevel)
 
 		slot = SearchNamedReplicationSlot(name, false);
 
-		/*
-		 * If a slot name provided in synchronized_standby_slots does not
-		 * exist, report a message and exit the loop.
-		 */
 		if (!slot)
 		{
-			ereport(elevel,
-					errcode(ERRCODE_INVALID_PARAMETER_VALUE),
-					errmsg("replication slot \"%s\" specified in parameter \"%s\" does not exist",
-						   name, "synchronized_standby_slots"),
-					errdetail("Logical replication is waiting on the standby associated with replication slot \"%s\".",
-							  name),
-					errhint("Create the replication slot \"%s\" or amend parameter \"%s\".",
-							name, "synchronized_standby_slots"));
-			break;
+			/* Record Slot State */
+			slot_states[num_slot_states].slot_name = name;
+			slot_states[num_slot_states].state = SS_SLOT_NOT_FOUND;
+			num_slot_states++;
+
+			if (wait_for_all)
+				break;
+			goto next_slot;
 		}
 
-		/* Same as above: if a slot is not physical, exit the loop. */
 		if (SlotIsLogical(slot))
 		{
-			ereport(elevel,
-					errcode(ERRCODE_INVALID_PARAMETER_VALUE),
-					errmsg("cannot specify logical replication slot \"%s\" in parameter \"%s\"",
-						   name, "synchronized_standby_slots"),
-					errdetail("Logical replication is waiting for correction on replication slot \"%s\".",
-							  name),
-					errhint("Remove the logical replication slot \"%s\" from parameter \"%s\".",
-							name, "synchronized_standby_slots"));
-			break;
+			/* Record Slot State */
+			slot_states[num_slot_states].slot_name = name;
+			slot_states[num_slot_states].state = SS_SLOT_LOGICAL;
+			num_slot_states++;
+
+			if (wait_for_all)
+				break;
+			goto next_slot;
 		}
 
 		SpinLockAcquire(&slot->mutex);
@@ -3193,33 +3385,35 @@ StandbySlotsHaveCaughtup(XLogRecPtr wait_for_lsn, int elevel)
 
 		if (invalidated)
 		{
-			/* Specified physical slot has been invalidated */
-			ereport(elevel,
-					errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
-					errmsg("physical replication slot \"%s\" specified in parameter \"%s\" has been invalidated",
-						   name, "synchronized_standby_slots"),
-					errdetail("Logical replication is waiting on the standby associated with replication slot \"%s\".",
-							  name),
-					errhint("Drop and recreate the replication slot \"%s\", or amend parameter \"%s\".",
-							name, "synchronized_standby_slots"));
-			break;
+			/* Record Slot State */
+			slot_states[num_slot_states].slot_name = name;
+			slot_states[num_slot_states].state = SS_SLOT_INVALIDATED;
+			num_slot_states++;
+
+			if (wait_for_all)
+				break;
+			goto next_slot;
 		}
 
 		if (!XLogRecPtrIsValid(restart_lsn) || restart_lsn < wait_for_lsn)
 		{
-			/* Log a message if no active_pid for this physical slot */
-			if (inactive)
-				ereport(elevel,
-						errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
-						errmsg("replication slot \"%s\" specified in parameter \"%s\" does not have active_pid",
-							   name, "synchronized_standby_slots"),
-						errdetail("Logical replication is waiting on the standby associated with replication slot \"%s\".",
-								  name),
-						errhint("Start the standby associated with the replication slot \"%s\", or amend parameter \"%s\".",
-								name, "synchronized_standby_slots"));
-
-			/* Continue if the current slot hasn't caught up. */
-			break;
+			/*
+			 * If a slot is inactive and lagging, report it as inactive.
+			 * If it is active and lagging, report it as lagging.
+			 *
+			 * In ALL mode: must wait for it.
+			 * In ANY N (quorum) mode: skip and use another slot.
+			 */
+			slot_states[num_slot_states].slot_name = name;
+			slot_states[num_slot_states].state =
+				inactive ? SS_SLOT_INACTIVE_LAGGING : SS_SLOT_ACTIVE_LAGGING;
+			if (!inactive)
+				slot_states[num_slot_states].restart_lsn = restart_lsn;
+			num_slot_states++;
+
+			if (wait_for_all)
+				break;
+			goto next_slot;
 		}
 
 		Assert(restart_lsn >= wait_for_lsn);
@@ -3230,17 +3424,30 @@ StandbySlotsHaveCaughtup(XLogRecPtr wait_for_lsn, int elevel)
 
 		caught_up_slot_num++;
 
+		/* Stop processing if the required number of slots have caught up. */
+		if (caught_up_slot_num >= required)
+			break;
+
+next_slot:
 		name += strlen(name) + 1;
 	}
 
 	LWLockRelease(ReplicationSlotControlLock);
 
 	/*
-	 * Return false if not all the standbys have caught up to the specified
-	 * WAL location.
+	 * If the required number of slots have not caught up, report any
+	 * recorded problem states and return false.
+	 *
+	 * We only emit messages when the requirement is not met to avoid
+	 * misleading messages in quorum/priority mode where other slots may
+	 * have satisfied the condition despite some slots having issues.
 	 */
-	if (caught_up_slot_num != synchronized_standby_slots_config->nslotnames)
+	if (caught_up_slot_num < required)
+	{
+		ReportUnavailableSyncStandbySlots(slot_states, num_slot_states, elevel, wait_for_lsn);
+		pfree(slot_states);
 		return false;
+	}
 
 	/* The ss_oldest_flush_lsn must not retreat. */
 	Assert(!XLogRecPtrIsValid(ss_oldest_flush_lsn) ||
@@ -3248,6 +3455,7 @@ StandbySlotsHaveCaughtup(XLogRecPtr wait_for_lsn, int elevel)
 
 	ss_oldest_flush_lsn = min_restart_lsn;
 
+	pfree(slot_states);
 	return true;
 }
 
diff --git a/src/test/recovery/t/053_synchronized_standby_slots_quorum.pl b/src/test/recovery/t/053_synchronized_standby_slots_quorum.pl
new file mode 100644
index 00000000000..1279b540b8b
--- /dev/null
+++ b/src/test/recovery/t/053_synchronized_standby_slots_quorum.pl
@@ -0,0 +1,327 @@
+
+# Copyright (c) 2024-2026, PostgreSQL Global Development Group
+
+# Test synchronized_standby_slots with different syntax modes:
+# - Plain list (ALL mode): slot1, slot2
+# - ANY N (quorum mode): ANY N (slot1, slot2, ...)
+#
+# Setup: a 3-node cluster with one primary, two physical standbys, and a
+# logical decoding client using a failover-enabled slot.
+#
+#               | ----> standby1 (primary_slot_name = sb1_slot)
+# primary ------|
+#               | ----> standby2 (primary_slot_name = sb2_slot)
+#
+#   synchronous_standby_names = 'ANY 1 (standby1, standby2)'
+#
+# Test scenarios:
+#
+# A) Plain list 'sb1_slot, sb2_slot' (ALL mode)
+#    - Works when all slots are available
+#    - Blocks immediately if ANY slot is unavailable
+#
+# B) ANY N (sb1_slot, sb2_slot, ...) (quorum mode)
+#    - Proceeds when at least N slots have caught up
+#    - Skips missing/invalid/logical slots and lagging slots (inactive or active)
+#      to find N caught-up slots
+#
+use strict;
+use warnings FATAL => 'all';
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+# ---------------------------------------------------------------------------
+# 1.  Create a primary with logical replication level, autovacuum off
+# ---------------------------------------------------------------------------
+my $primary = PostgreSQL::Test::Cluster->new('primary');
+$primary->init(allows_streaming => 'logical');
+$primary->append_conf(
+	'postgresql.conf', qq{
+autovacuum = off
+});
+$primary->start;
+
+# Physical replication slots for the two standbys
+$primary->safe_psql('postgres',
+	"SELECT pg_create_physical_replication_slot('sb1_slot');");
+$primary->safe_psql('postgres',
+	"SELECT pg_create_physical_replication_slot('sb2_slot');");
+
+# ---------------------------------------------------------------------------
+# 2.  Create standby1 and standby2 from a fresh backup
+# ---------------------------------------------------------------------------
+my $backup_name = 'base_backup';
+$primary->backup($backup_name);
+
+my $connstr = $primary->connstr;
+
+my $standby1 = PostgreSQL::Test::Cluster->new('standby1');
+$standby1->init_from_backup(
+	$primary, $backup_name,
+	has_streaming => 1,
+	has_restoring => 1);
+$standby1->append_conf(
+	'postgresql.conf', qq(
+hot_standby_feedback = on
+primary_slot_name = 'sb1_slot'
+primary_conninfo = '$connstr dbname=postgres'
+));
+
+my $standby2 = PostgreSQL::Test::Cluster->new('standby2');
+$standby2->init_from_backup(
+	$primary, $backup_name,
+	has_streaming => 1,
+	has_restoring => 1);
+$standby2->append_conf(
+	'postgresql.conf', qq(
+hot_standby_feedback = on
+primary_slot_name = 'sb2_slot'
+primary_conninfo = '$connstr dbname=postgres'
+));
+
+$standby1->start;
+$standby2->start;
+
+$primary->wait_for_replay_catchup($standby1);
+$primary->wait_for_replay_catchup($standby2);
+
+# ---------------------------------------------------------------------------
+# 3.  Create a logical failover slot on the primary
+# ---------------------------------------------------------------------------
+$primary->safe_psql('postgres',
+	"SELECT pg_create_logical_replication_slot('logical_failover', 'test_decoding', false, false, true);"
+);
+
+# ---------------------------------------------------------------------------
+# 4.  Configure quorum sync rep with ALL-mode synchronized_standby_slots
+# ---------------------------------------------------------------------------
+$primary->append_conf(
+	'postgresql.conf', qq{
+synchronous_standby_names = 'ANY 1 (standby1, standby2)'
+synchronized_standby_slots = 'sb1_slot, sb2_slot'
+});
+$primary->reload;
+
+$primary->wait_for_replay_catchup($standby1);
+$primary->wait_for_replay_catchup($standby2);
+
+# ---------------------------------------------------------------------------
+# 5.  Confirm that quorum sync rep is active for both standbys
+# ---------------------------------------------------------------------------
+is( $primary->safe_psql(
+		'postgres',
+		q{SELECT count(*) FROM pg_stat_replication WHERE sync_state = 'quorum';}
+	),
+	'2',
+	'both standbys are in quorum sync state');
+
+##################################################
+# PART A: Plain list (ALL mode) blocks when any slot is unavailable
+##################################################
+
+$standby1->stop;
+
+# Commit succeeds since standby2 satisfies the quorum.
+my $emit_lsn = $primary->safe_psql('postgres',
+	"SELECT pg_logical_emit_message(true, 'qtest', 'all_mode_blocks');"
+);
+like($emit_lsn, qr/^[0-9A-F]+\/[0-9A-F]+$/,
+	'synchronous commit succeeds with quorum (standby2 alive)');
+
+$primary->wait_for_replay_catchup($standby2);
+
+my $log_offset = -s $primary->logfile;
+
+my $bg = $primary->background_psql(
+	'postgres',
+	on_error_stop => 0,
+	timeout => $PostgreSQL::Test::Utils::timeout_default);
+
+$bg->query_until(
+	qr/decode_start/, q(
+   \echo decode_start
+   SELECT pg_logical_slot_peek_changes('logical_failover', NULL, NULL);
+));
+
+# Wait for the primary to log a warning about sb1_slot not being active.
+$primary->wait_for_log(
+	qr/replication slot \"sb1_slot\" specified in parameter "synchronized_standby_slots" does not have active_pid/,
+	$log_offset);
+
+pass('plain list (ALL mode): logical decoding blocked by unavailable sb1_slot');
+
+# Unblock by clearing synchronized_standby_slots.
+$primary->adjust_conf('postgresql.conf', 'synchronized_standby_slots', "''");
+$primary->reload;
+$bg->quit;
+
+# Consume the change so the slot is clean for the next test.
+$primary->safe_psql('postgres',
+	q{SELECT pg_logical_slot_get_changes('logical_failover', NULL, NULL);});
+
+##################################################
+# PART B: ANY mode (quorum) — logical decoding proceeds with N-of-M slots
+##################################################
+
+# Switch synchronized_standby_slots to quorum mode: need only 1 of 2 slots.
+$primary->adjust_conf('postgresql.conf', 'synchronized_standby_slots',
+	"'ANY 1 (sb1_slot, sb2_slot)'");
+$primary->reload;
+
+# standby1 is still down; standby2 is up.
+
+# Emit another transactional message — commits via quorum.
+$primary->safe_psql('postgres',
+	"SELECT pg_logical_emit_message(true, 'qtest', 'quorum_mode_works');"
+);
+$primary->wait_for_replay_catchup($standby2);
+
+# In quorum mode, logical decoding should NOT block because sb2_slot has
+# caught up and 1-of-2 is sufficient.
+my $decoded = $primary->safe_psql('postgres',
+	q{SELECT count(*) FROM pg_logical_slot_get_changes('logical_failover', NULL, NULL)
+	  WHERE data LIKE '%quorum_mode_works%';});
+is($decoded, '1',
+	'ANY mode: logical decoding proceeds with only sb2_slot caught up');
+
+##################################################
+# PART C: Re-check plain list (ALL mode) works when both standbys are up
+##################################################
+
+# Bring standby1 back.
+$standby1->start;
+$primary->wait_for_replay_catchup($standby1);
+
+# Switch to plain list (ALL mode) with both slots.
+$primary->adjust_conf('postgresql.conf', 'synchronized_standby_slots',
+	"'sb1_slot, sb2_slot'");
+$primary->reload;
+
+$primary->safe_psql('postgres',
+	"SELECT pg_logical_emit_message(true, 'qtest', 'both_caught_up');"
+);
+$primary->wait_for_replay_catchup($standby1);
+$primary->wait_for_replay_catchup($standby2);
+
+my $decoded_bc = $primary->safe_psql('postgres',
+	q{SELECT count(*) FROM pg_logical_slot_get_changes('logical_failover', NULL, NULL)
+	  WHERE data LIKE '%both_caught_up%';});
+is($decoded_bc, '1',
+	'plain list: works when all standbys are up');
+
+##################################################
+# PART D: ANY 2 waits on an active lagging slot
+##################################################
+
+# Stop standby1 so sb1_slot can be controlled by a raw replication connection
+# that keeps the slot active while lagging.
+$standby1->stop;
+
+my $old_lsn = $primary->safe_psql('postgres',
+	"SELECT pg_current_wal_lsn();");
+
+$primary->adjust_conf('postgresql.conf', 'synchronized_standby_slots',
+	"'ANY 2 (sb1_slot, sb2_slot)'");
+$primary->reload;
+
+my $any2_lag_lsn = $primary->safe_psql('postgres',
+	"SELECT pg_logical_emit_message(true, 'qtest', 'any_2_lagging_blocks');"
+);
+$primary->wait_for_replay_catchup($standby2);
+
+my $repl_any2 = $primary->background_psql(
+	'postgres',
+	replication => 'database',
+	on_error_stop => 0,
+	timeout => $PostgreSQL::Test::Utils::timeout_default);
+
+$repl_any2->query_until(
+	qr/^$/,
+	"START_REPLICATION SLOT sb1_slot PHYSICAL $old_lsn;\n");
+
+$primary->poll_query_until('postgres', q{
+	SELECT active_pid IS NOT NULL
+	FROM pg_replication_slots
+	WHERE slot_name = 'sb1_slot'
+}) or die "replication connection did not activate sb1_slot";
+
+my $bg_any2 = $primary->background_psql(
+	'postgres',
+	on_error_stop => 0,
+	timeout => $PostgreSQL::Test::Utils::timeout_default);
+
+$bg_any2->query_until(
+	qr/decode_start/, q(
+   \echo decode_start
+   SELECT pg_logical_slot_peek_changes('logical_failover', NULL, NULL);
+));
+
+ok( $primary->poll_query_until(
+		'postgres', q{
+SELECT EXISTS (
+	SELECT 1
+	FROM pg_stat_activity
+	WHERE wait_event = 'WaitForStandbyConfirmation'
+	  AND query LIKE '%pg_logical_slot_peek_changes(''logical_failover''%'
+);
+}),
+	'ANY 2: decoding waits when only one slot has caught up');
+
+$primary->adjust_conf('postgresql.conf', 'synchronized_standby_slots', "''");
+$primary->reload;
+$bg_any2->quit;
+$repl_any2->quit;
+
+# Consume the change for the next test.
+$primary->safe_psql('postgres',
+	q{SELECT pg_logical_slot_get_changes('logical_failover', NULL, NULL);});
+
+# Bring standby1 back up for the remaining tests.
+$standby1->start;
+$primary->wait_for_replay_catchup($standby1);
+
+
+##################################################
+# PART E: Verify GUC validation rejects bad values
+##################################################
+
+my ($result, $stdout, $stderr);
+
+# N exceeds number of listed slots
+($result, $stdout, $stderr) = $primary->psql('postgres',
+	"ALTER SYSTEM SET synchronized_standby_slots = 'ANY 3 (sb1_slot, sb2_slot)';");
+like($stderr, qr/ERROR/,
+	'GUC rejects ANY N when N > number of listed slots');
+
+# Missing closing parenthesis
+($result, $stdout, $stderr) = $primary->psql('postgres',
+	"ALTER SYSTEM SET synchronized_standby_slots = 'ANY 1 (sb1_slot, sb2_slot';");
+like($stderr, qr/ERROR/,
+	'GUC rejects malformed ANY syntax');
+
+# Priority syntax is not supported by synchronized_standby_slots yet
+($result, $stdout, $stderr) = $primary->psql('postgres',
+	"ALTER SYSTEM SET synchronized_standby_slots = 'FIRST 1 (sb1_slot, sb2_slot)';");
+like($stderr, qr/priority syntax is not supported/,
+	'GUC rejects FIRST syntax');
+
+# Legacy priority syntax is not supported by synchronized_standby_slots yet
+($result, $stdout, $stderr) = $primary->psql('postgres',
+	"ALTER SYSTEM SET synchronized_standby_slots = '1 (sb1_slot, sb2_slot)';");
+like($stderr, qr/priority syntax is not supported/,
+	'GUC rejects legacy priority syntax');
+
+# Invalid slot name
+($result, $stdout, $stderr) = $primary->psql('postgres',
+	"ALTER SYSTEM SET synchronized_standby_slots = 'ANY 1 (INVALID_UPPER)';");
+like($stderr, qr/ERROR/,
+	'GUC rejects invalid slot name in ANY syntax');
+
+# ---------------------------------------------------------------------------
+# Cleanup
+# ---------------------------------------------------------------------------
+$primary->safe_psql('postgres',
+	"SELECT pg_drop_replication_slot('logical_failover');");
+
+done_testing();
-- 
2.43.0

