I wrote:
> It doesn't seem to me to be that hard to implement the desired
> semantics for synchronous_standby_names with inconsistent info.
> In FIRST mode you basically just need to take the N smallest
> priorities you see in the array, but without assuming there are no
> duplicates or holes. It might be a good idea to include ties at the
> end, that is if you see 1,2,2,4 or 1,3,3,4 and you want 2 sync
> standbys, include the first three of them in the calculation until
> the inconsistency is resolved. In ANY mode I don't see that
> inconsistent priorities matter at all.
Concretely, I think we ought to apply the attached patch, or something
pretty close to it.
I'm not really happy about breaking ties based on walsnd_index (the
attached does that, rather than including all tied standbys as suggested
above), but several TAP test cases fail if we do anything else. I'm
inclined to think those tests are bogus ... but I won't argue for changing them right now.
regards, tom lane
diff --git a/src/backend/replication/syncrep.c b/src/backend/replication/syncrep.c
index ffd5b31..c66c371 100644
--- a/src/backend/replication/syncrep.c
+++ b/src/backend/replication/syncrep.c
@@ -108,14 +108,17 @@ static bool SyncRepGetSyncRecPtr(XLogRecPtr *writePtr,
static void SyncRepGetOldestSyncRecPtr(XLogRecPtr *writePtr,
XLogRecPtr *flushPtr,
XLogRecPtr *applyPtr,
- List *sync_standbys);
+ SyncRepStandbyData *sync_standbys,
+ int num_standbys);
static void SyncRepGetNthLatestSyncRecPtr(XLogRecPtr *writePtr,
XLogRecPtr *flushPtr,
XLogRecPtr *applyPtr,
- List *sync_standbys, uint8 nth);
+ SyncRepStandbyData *sync_standbys,
+ int num_standbys,
+ uint8 nth);
static int SyncRepGetStandbyPriority(void);
-static List *SyncRepGetSyncStandbysPriority(bool *am_sync);
-static List *SyncRepGetSyncStandbysQuorum(bool *am_sync);
+static void SyncRepGetSyncStandbysPriority(SyncRepStandbyData *standbys, int n);
+static int standby_priority_comparator(const void *a, const void *b);
static int cmp_lsn(const void *a, const void *b);
#ifdef USE_ASSERT_CHECKING
@@ -406,9 +409,10 @@ SyncRepInitConfig(void)
priority = SyncRepGetStandbyPriority();
if (MyWalSnd->sync_standby_priority != priority)
{
- LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
+ SpinLockAcquire(&MyWalSnd->mutex);
MyWalSnd->sync_standby_priority = priority;
- LWLockRelease(SyncRepLock);
+ SpinLockRelease(&MyWalSnd->mutex);
+
ereport(DEBUG1,
(errmsg("standby \"%s\" now has synchronous standby priority %u",
application_name, priority)));
@@ -523,8 +527,6 @@ SyncRepReleaseWaiters(void)
/*
* Calculate the synced Write, Flush and Apply positions among sync standbys.
*
- * The caller must hold SyncRepLock.
- *
* Return false if the number of sync standbys is less than
* synchronous_standby_names specifies. Otherwise return true and
* store the positions into *writePtr, *flushPtr and *applyPtr.
@@ -536,27 +538,43 @@ static bool
SyncRepGetSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr,
XLogRecPtr *applyPtr, bool *am_sync)
{
- List *sync_standbys;
-
- Assert(LWLockHeldByMe(SyncRepLock));
+ SyncRepStandbyData *sync_standbys;
+ int num_standbys;
+ int i;
+ /* Initialize default results */
*writePtr = InvalidXLogRecPtr;
*flushPtr = InvalidXLogRecPtr;
*applyPtr = InvalidXLogRecPtr;
*am_sync = false;
+ /* Quick out if not even configured to be synchronous */
+ if (SyncRepConfig == NULL)
+ return false;
+
/* Get standbys that are considered as synchronous at this moment */
- sync_standbys = SyncRepGetSyncStandbys(am_sync);
+ num_standbys = SyncRepGetSyncStandbys(&sync_standbys);
+
+ /* Am I among the candidate sync standbys? */
+ for (i = 0; i < num_standbys; i++)
+ {
+ if (sync_standbys[i].is_me)
+ {
+ *am_sync = sync_standbys[i].is_sync_standby;
+ break;
+ }
+ }
/*
- * Quick exit if we are not managing a sync standby or there are not
- * enough synchronous standbys.
+ * Nothing more to do if we are not managing a sync standby or there are
+ * not enough synchronous standbys. (Note: if there are at least num_sync
+ * candidates, then at least num_sync of them will be marked as
+ * is_sync_standby; we don't need to count them here.)
*/
if (!(*am_sync) ||
- SyncRepConfig == NULL ||
- list_length(sync_standbys) < SyncRepConfig->num_sync)
+ num_standbys < SyncRepConfig->num_sync)
{
- list_free(sync_standbys);
+ pfree(sync_standbys);
return false;
}
@@ -576,15 +594,16 @@ SyncRepGetSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr,
if (SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY)
{
SyncRepGetOldestSyncRecPtr(writePtr, flushPtr, applyPtr,
- sync_standbys);
+ sync_standbys, num_standbys);
}
else
{
SyncRepGetNthLatestSyncRecPtr(writePtr, flushPtr, applyPtr,
- sync_standbys, SyncRepConfig->num_sync);
+ sync_standbys, num_standbys,
+ SyncRepConfig->num_sync);
}
- list_free(sync_standbys);
+ pfree(sync_standbys);
return true;
}
@@ -592,27 +611,28 @@ SyncRepGetSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr,
* Calculate the oldest Write, Flush and Apply positions among sync standbys.
*/
static void
-SyncRepGetOldestSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr,
- XLogRecPtr *applyPtr, List *sync_standbys)
+SyncRepGetOldestSyncRecPtr(XLogRecPtr *writePtr,
+ XLogRecPtr *flushPtr,
+ XLogRecPtr *applyPtr,
+ SyncRepStandbyData *sync_standbys,
+ int num_standbys)
{
- ListCell *cell;
+ int i;
/*
* Scan through all sync standbys and calculate the oldest Write, Flush
- * and Apply positions.
+ * and Apply positions. We assume *writePtr et al were initialized to
+ * InvalidXLogRecPtr.
*/
- foreach(cell, sync_standbys)
+ for (i = 0; i < num_standbys; i++)
{
- WalSnd *walsnd = &WalSndCtl->walsnds[lfirst_int(cell)];
- XLogRecPtr write;
- XLogRecPtr flush;
- XLogRecPtr apply;
+ XLogRecPtr write = sync_standbys[i].write;
+ XLogRecPtr flush = sync_standbys[i].flush;
+ XLogRecPtr apply = sync_standbys[i].apply;
- SpinLockAcquire(&walsnd->mutex);
- write = walsnd->write;
- flush = walsnd->flush;
- apply = walsnd->apply;
- SpinLockRelease(&walsnd->mutex);
+ /* Ignore candidates that aren't considered synchronous */
+ if (!sync_standbys[i].is_sync_standby)
+ continue;
if (XLogRecPtrIsInvalid(*writePtr) || *writePtr > write)
*writePtr = write;
@@ -628,38 +648,43 @@ SyncRepGetOldestSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr,
* standbys.
*/
static void
-SyncRepGetNthLatestSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr,
- XLogRecPtr *applyPtr, List *sync_standbys, uint8 nth)
+SyncRepGetNthLatestSyncRecPtr(XLogRecPtr *writePtr,
+ XLogRecPtr *flushPtr,
+ XLogRecPtr *applyPtr,
+ SyncRepStandbyData *sync_standbys,
+ int num_standbys,
+ uint8 nth)
{
- ListCell *cell;
XLogRecPtr *write_array;
XLogRecPtr *flush_array;
XLogRecPtr *apply_array;
- int len;
- int i = 0;
+ int i;
+ int n;
- len = list_length(sync_standbys);
- write_array = (XLogRecPtr *) palloc(sizeof(XLogRecPtr) * len);
- flush_array = (XLogRecPtr *) palloc(sizeof(XLogRecPtr) * len);
- apply_array = (XLogRecPtr *) palloc(sizeof(XLogRecPtr) * len);
+ write_array = (XLogRecPtr *) palloc(sizeof(XLogRecPtr) * num_standbys);
+ flush_array = (XLogRecPtr *) palloc(sizeof(XLogRecPtr) * num_standbys);
+ apply_array = (XLogRecPtr *) palloc(sizeof(XLogRecPtr) * num_standbys);
- foreach(cell, sync_standbys)
+ n = 0;
+ for (i = 0; i < num_standbys; i++)
{
- WalSnd *walsnd = &WalSndCtl->walsnds[lfirst_int(cell)];
-
- SpinLockAcquire(&walsnd->mutex);
- write_array[i] = walsnd->write;
- flush_array[i] = walsnd->flush;
- apply_array[i] = walsnd->apply;
- SpinLockRelease(&walsnd->mutex);
+ /* Ignore candidates that aren't considered synchronous */
+ if (!sync_standbys[i].is_sync_standby)
+ continue;
- i++;
+ write_array[n] = sync_standbys[i].write;
+ flush_array[n] = sync_standbys[i].flush;
+ apply_array[n] = sync_standbys[i].apply;
+ n++;
}
+ /* Should have enough, or somebody messed up */
+ Assert(n >= nth);
+
/* Sort each array in descending order */
- qsort(write_array, len, sizeof(XLogRecPtr), cmp_lsn);
- qsort(flush_array, len, sizeof(XLogRecPtr), cmp_lsn);
- qsort(apply_array, len, sizeof(XLogRecPtr), cmp_lsn);
+ qsort(write_array, n, sizeof(XLogRecPtr), cmp_lsn);
+ qsort(flush_array, n, sizeof(XLogRecPtr), cmp_lsn);
+ qsort(apply_array, n, sizeof(XLogRecPtr), cmp_lsn);
/* Get Nth latest Write, Flush, Apply positions */
*writePtr = write_array[nth - 1];
@@ -689,67 +714,48 @@ cmp_lsn(const void *a, const void *b)
}
/*
- * Return the list of sync standbys, or NIL if no sync standby is connected.
- *
- * The caller must hold SyncRepLock.
+ * Return data about walsenders that are candidates to be sync standbys.
*
- * On return, *am_sync is set to true if this walsender is connecting to
- * sync standby. Otherwise it's set to false.
+ * *standbys is set to a palloc'd array of structs of per-walsender data,
+ * and the number of valid entries (candidate sync senders) is returned.
*/
-List *
-SyncRepGetSyncStandbys(bool *am_sync)
+int
+SyncRepGetSyncStandbys(SyncRepStandbyData **standbys)
{
- Assert(LWLockHeldByMe(SyncRepLock));
+ int i;
+ int n;
- /* Set default result */
- if (am_sync != NULL)
- *am_sync = false;
+ /* Create result array */
+ *standbys = (SyncRepStandbyData *)
+ palloc(max_wal_senders * sizeof(SyncRepStandbyData));
/* Quick exit if sync replication is not requested */
if (SyncRepConfig == NULL)
- return NIL;
-
- return (SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY) ?
- SyncRepGetSyncStandbysPriority(am_sync) :
- SyncRepGetSyncStandbysQuorum(am_sync);
-}
-
-/*
- * Return the list of all the candidates for quorum sync standbys,
- * or NIL if no such standby is connected.
- *
- * The caller must hold SyncRepLock. This function must be called only in
- * a quorum-based sync replication.
- *
- * On return, *am_sync is set to true if this walsender is connecting to
- * sync standby. Otherwise it's set to false.
- */
-static List *
-SyncRepGetSyncStandbysQuorum(bool *am_sync)
-{
- List *result = NIL;
- int i;
- volatile WalSnd *walsnd; /* Use volatile pointer to prevent code
- * rearrangement */
-
- Assert(SyncRepConfig->syncrep_method == SYNC_REP_QUORUM);
+ return 0;
+ /* Collect raw data from shared memory */
+ n = 0;
for (i = 0; i < max_wal_senders; i++)
{
- XLogRecPtr flush;
- WalSndState state;
- int pid;
+ volatile WalSnd *walsnd; /* Use volatile pointer to prevent code
+ * rearrangement */
+ SyncRepStandbyData *stby;
+ WalSndState state; /* not included in SyncRepStandbyData */
walsnd = &WalSndCtl->walsnds[i];
+ stby = *standbys + n;
SpinLockAcquire(&walsnd->mutex);
- pid = walsnd->pid;
- flush = walsnd->flush;
+ stby->pid = walsnd->pid;
state = walsnd->state;
+ stby->write = walsnd->write;
+ stby->flush = walsnd->flush;
+ stby->apply = walsnd->apply;
+ stby->sync_standby_priority = walsnd->sync_standby_priority;
SpinLockRelease(&walsnd->mutex);
/* Must be active */
- if (pid == 0)
+ if (stby->pid == 0)
continue;
/* Must be streaming or stopping */
@@ -758,200 +764,79 @@ SyncRepGetSyncStandbysQuorum(bool *am_sync)
continue;
/* Must be synchronous */
- if (walsnd->sync_standby_priority == 0)
+ if (stby->sync_standby_priority == 0)
continue;
/* Must have a valid flush position */
- if (XLogRecPtrIsInvalid(flush))
+ if (XLogRecPtrIsInvalid(stby->flush))
continue;
- /*
- * Consider this standby as a candidate for quorum sync standbys and
- * append it to the result.
- */
- result = lappend_int(result, i);
- if (am_sync != NULL && walsnd == MyWalSnd)
- *am_sync = true;
+ /* OK, it's a candidate */
+ stby->walsnd_index = i;
+ stby->is_me = (walsnd == MyWalSnd);
+ stby->is_sync_standby = true; /* might change below */
+ n++;
}
- return result;
+ /*
+ * In quorum mode, that's all we have to do. In priority mode, decide
+ * which ones are high enough priority to consider sync standbys.
+ */
+ if (SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY)
+ SyncRepGetSyncStandbysPriority(*standbys, n);
+
+ return n;
}
/*
- * Return the list of sync standbys chosen based on their priorities,
- * or NIL if no sync standby is connected.
- *
- * If there are multiple standbys with the same priority,
- * the first one found is selected preferentially.
- *
- * The caller must hold SyncRepLock. This function must be called only in
- * a priority-based sync replication.
+ * Decide which standbys to consider synchronous.
*
- * On return, *am_sync is set to true if this walsender is connecting to
- * sync standby. Otherwise it's set to false.
+ * This function must be called only in priority-based sync replication.
*/
-static List *
-SyncRepGetSyncStandbysPriority(bool *am_sync)
+static void
+SyncRepGetSyncStandbysPriority(SyncRepStandbyData *standbys, int n)
{
- List *result = NIL;
- List *pending = NIL;
- int lowest_priority;
- int next_highest_priority;
- int this_priority;
- int priority;
int i;
- bool am_in_pending = false;
- volatile WalSnd *walsnd; /* Use volatile pointer to prevent code
- * rearrangement */
Assert(SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY);
- lowest_priority = SyncRepConfig->nmembers;
- next_highest_priority = lowest_priority + 1;
+ /* Nothing to do here unless there are too many candidates. */
+ if (n <= SyncRepConfig->num_sync)
+ return;
/*
- * Find the sync standbys which have the highest priority (i.e, 1). Also
- * store all the other potential sync standbys into the pending list, in
- * order to scan it later and find other sync standbys from it quickly.
+ * Sort the candidates by priority; then the first num_sync ones are
+ * synchronous, and the rest aren't.
*/
- for (i = 0; i < max_wal_senders; i++)
- {
- XLogRecPtr flush;
- WalSndState state;
- int pid;
-
- walsnd = &WalSndCtl->walsnds[i];
-
- SpinLockAcquire(&walsnd->mutex);
- pid = walsnd->pid;
- flush = walsnd->flush;
- state = walsnd->state;
- SpinLockRelease(&walsnd->mutex);
-
- /* Must be active */
- if (pid == 0)
- continue;
+ qsort(standbys, n, sizeof(SyncRepStandbyData),
+ standby_priority_comparator);
- /* Must be streaming or stopping */
- if (state != WALSNDSTATE_STREAMING &&
- state != WALSNDSTATE_STOPPING)
- continue;
-
- /* Must be synchronous */
- this_priority = walsnd->sync_standby_priority;
- if (this_priority == 0)
- continue;
-
- /* Must have a valid flush position */
- if (XLogRecPtrIsInvalid(flush))
- continue;
-
- /*
- * If the priority is equal to 1, consider this standby as sync and
- * append it to the result. Otherwise append this standby to the
- * pending list to check if it's actually sync or not later.
- */
- if (this_priority == 1)
- {
- result = lappend_int(result, i);
- if (am_sync != NULL && walsnd == MyWalSnd)
- *am_sync = true;
- if (list_length(result) == SyncRepConfig->num_sync)
- {
- list_free(pending);
- return result; /* Exit if got enough sync standbys */
- }
- }
- else
- {
- pending = lappend_int(pending, i);
- if (am_sync != NULL && walsnd == MyWalSnd)
- am_in_pending = true;
-
- /*
- * Track the highest priority among the standbys in the pending
- * list, in order to use it as the starting priority for later
- * scan of the list. This is useful to find quickly the sync
- * standbys from the pending list later because we can skip
- * unnecessary scans for the unused priorities.
- */
- if (this_priority < next_highest_priority)
- next_highest_priority = this_priority;
- }
- }
+ for (i = SyncRepConfig->num_sync; i < n; i++)
+ standbys[i].is_sync_standby = false;
+}
- /*
- * Consider all pending standbys as sync if the number of them plus
- * already-found sync ones is lower than the configuration requests.
- */
- if (list_length(result) + list_length(pending) <= SyncRepConfig->num_sync)
- {
- /*
- * Set *am_sync to true if this walsender is in the pending list
- * because all pending standbys are considered as sync.
- */
- if (am_sync != NULL && !(*am_sync))
- *am_sync = am_in_pending;
+/*
+ * qsort comparator to sort SyncRepStandbyData entries by priority
+ */
+static int
+standby_priority_comparator(const void *a, const void *b)
+{
+ const SyncRepStandbyData *sa = (const SyncRepStandbyData *) a;
+ const SyncRepStandbyData *sb = (const SyncRepStandbyData *) b;
- result = list_concat(result, pending);
- list_free(pending);
- return result;
- }
+ /* First, sort by increasing priority value */
+ if (sa->sync_standby_priority != sb->sync_standby_priority)
+ return sa->sync_standby_priority - sb->sync_standby_priority;
/*
- * Find the sync standbys from the pending list.
+ * We might have equal priority values; arbitrarily break ties by position
+ * in the WALSnd array. (This is utterly bogus, since that is arrival
+ * order dependent, but there are regression tests that rely on it.)
*/
- priority = next_highest_priority;
- while (priority <= lowest_priority)
- {
- ListCell *cell;
-
- next_highest_priority = lowest_priority + 1;
-
- foreach(cell, pending)
- {
- i = lfirst_int(cell);
- walsnd = &WalSndCtl->walsnds[i];
-
- this_priority = walsnd->sync_standby_priority;
- if (this_priority == priority)
- {
- result = lappend_int(result, i);
- if (am_sync != NULL && walsnd == MyWalSnd)
- *am_sync = true;
-
- /*
- * We should always exit here after the scan of pending list
- * starts because we know that the list has enough elements to
- * reach SyncRepConfig->num_sync.
- */
- if (list_length(result) == SyncRepConfig->num_sync)
- {
- list_free(pending);
- return result; /* Exit if got enough sync standbys */
- }
-
- /*
- * Remove the entry for this sync standby from the list to
- * prevent us from looking at the same entry again.
- */
- pending = foreach_delete_current(pending, cell);
-
- continue; /* don't adjust next_highest_priority */
- }
-
- if (this_priority < next_highest_priority)
- next_highest_priority = this_priority;
- }
-
- priority = next_highest_priority;
- }
-
- /* never reached, but keep compiler quiet */
- Assert(false);
- return result;
+ return sa->walsnd_index - sb->walsnd_index;
}
+
/*
* Check if we are in the list of sync standbys, and if so, determine
* priority sequence. Return priority if set, or zero to indicate that
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index fc475d1..859ca60 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -2375,14 +2375,16 @@ InitWalSenderSlot(void)
* Found a free slot. Reserve it for us.
*/
walsnd->pid = MyProcPid;
+ walsnd->state = WALSNDSTATE_STARTUP;
walsnd->sentPtr = InvalidXLogRecPtr;
+ walsnd->needreload = false;
walsnd->write = InvalidXLogRecPtr;
walsnd->flush = InvalidXLogRecPtr;
walsnd->apply = InvalidXLogRecPtr;
walsnd->writeLag = -1;
walsnd->flushLag = -1;
walsnd->applyLag = -1;
- walsnd->state = WALSNDSTATE_STARTUP;
+ walsnd->sync_standby_priority = 0;
walsnd->latch = &MyProc->procLatch;
walsnd->replyTime = 0;
walsnd->spillTxns = 0;
@@ -3235,7 +3237,8 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
Tuplestorestate *tupstore;
MemoryContext per_query_ctx;
MemoryContext oldcontext;
- List *sync_standbys;
+ SyncRepStandbyData *sync_standbys;
+ int num_standbys;
int i;
/* check to see if caller supports us returning a tuplestore */
@@ -3263,11 +3266,10 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
MemoryContextSwitchTo(oldcontext);
/*
- * Get the currently active synchronous standbys.
+ * Get the currently active synchronous standbys. This could be out of
+ * date before we're done, but we'll use the data anyway.
*/
- LWLockAcquire(SyncRepLock, LW_SHARED);
- sync_standbys = SyncRepGetSyncStandbys(NULL);
- LWLockRelease(SyncRepLock);
+ num_standbys = SyncRepGetSyncStandbys(&sync_standbys);
for (i = 0; i < max_wal_senders; i++)
{
@@ -3286,9 +3288,12 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
int64 spillTxns;
int64 spillCount;
int64 spillBytes;
+ bool is_sync_standby;
Datum values[PG_STAT_GET_WAL_SENDERS_COLS];
bool nulls[PG_STAT_GET_WAL_SENDERS_COLS];
+ int j;
+ /* Collect data from shared memory */
SpinLockAcquire(&walsnd->mutex);
if (walsnd->pid == 0)
{
@@ -3311,6 +3316,17 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
spillBytes = walsnd->spillBytes;
SpinLockRelease(&walsnd->mutex);
+ /* Detect whether walsender is/was considered synchronous */
+ is_sync_standby = false;
+ for (j = 0; j < num_standbys; j++)
+ {
+ if (sync_standbys[j].walsnd_index == i)
+ {
+ is_sync_standby = sync_standbys[j].is_sync_standby;
+ break;
+ }
+ }
+
memset(nulls, 0, sizeof(nulls));
values[0] = Int32GetDatum(pid);
@@ -3380,7 +3396,7 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
*/
if (priority == 0)
values[10] = CStringGetTextDatum("async");
- else if (list_member_int(sync_standbys, i))
+ else if (is_sync_standby)
values[10] = SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY ?
CStringGetTextDatum("sync") : CStringGetTextDatum("quorum");
else
diff --git a/src/include/replication/syncrep.h b/src/include/replication/syncrep.h
index c5f0e91..1308ada 100644
--- a/src/include/replication/syncrep.h
+++ b/src/include/replication/syncrep.h
@@ -37,6 +37,26 @@
#define SYNC_REP_QUORUM 1
/*
+ * SyncRepGetSyncStandbys returns an array of these structs,
+ * one per candidate synchronous walsender.
+ */
+typedef struct SyncRepStandbyData
+{
+ /* Copies of relevant fields from WalSnd shared-memory struct */
+ pid_t pid;
+ XLogRecPtr write;
+ XLogRecPtr flush;
+ XLogRecPtr apply;
+ int sync_standby_priority;
+ /* Index of this walsender in the WalSnd shared-memory array */
+ int walsnd_index;
+ /* This flag indicates whether this struct is about our own process */
+ bool is_me;
+ /* After-the-fact conclusion about whether this is a sync standby */
+ bool is_sync_standby;
+} SyncRepStandbyData;
+
+/*
* Struct for the configuration of synchronous replication.
*
* Note: this must be a flat representation that can be held in a single
@@ -74,7 +94,7 @@ extern void SyncRepInitConfig(void);
extern void SyncRepReleaseWaiters(void);
/* called by wal sender and user backend */
-extern List *SyncRepGetSyncStandbys(bool *am_sync);
+extern int SyncRepGetSyncStandbys(SyncRepStandbyData **standbys);
/* called by checkpointer */
extern void SyncRepUpdateSyncStandbysDefined(void);
diff --git a/src/include/replication/walsender_private.h b/src/include/replication/walsender_private.h
index 366828f..734acec 100644
--- a/src/include/replication/walsender_private.h
+++ b/src/include/replication/walsender_private.h
@@ -31,8 +31,7 @@ typedef enum WalSndState
/*
* Each walsender has a WalSnd struct in shared memory.
*
- * This struct is protected by 'mutex', with two exceptions: one is
- * sync_standby_priority as noted below. The other exception is that some
+ * This struct is protected by its 'mutex' spinlock field, except that some
* members are only written by the walsender process itself, and thus that
* process is free to read those members without holding spinlock. pid and
* needreload always require the spinlock to be held for all accesses.
@@ -60,6 +59,12 @@ typedef struct WalSnd
TimeOffset flushLag;
TimeOffset applyLag;
+ /*
+ * The priority order of the standby managed by this WALSender, as listed
+ * in synchronous_standby_names, or 0 if not-listed.
+ */
+ int sync_standby_priority;
+
/* Protects shared variables shown above. */
slock_t mutex;
@@ -70,13 +75,6 @@ typedef struct WalSnd
Latch *latch;
/*
- * The priority order of the standby managed by this WALSender, as listed
- * in synchronous_standby_names, or 0 if not-listed. Protected by
- * SyncRepLock.
- */
- int sync_standby_priority;
-
- /*
* Timestamp of the last message received from standby.
*/
TimestampTz replyTime;