On Thu, Dec 11, 2014 at 10:07 PM, Heikki Linnakangas <hlinnakan...@vmware.com> wrote:
> * I don't like filling the priorities-array in > SyncRepGetSynchronousStandby(). It might be convenient for the one caller > that needs it, but it seems pretty ad hoc. > > In fact, I don't really see the point of having the array at all. You > could just print the priority in the main loop in > pg_stat_get_wal_senders(). Yeah, nominally you need to hold SyncRepLock > while reading the priority, but it seems over-zealous to be so strict about > that in pg_stat_wal_senders(), since it's just an informational view, and > priority changes so rarely that it's highly unlikely to hit a race > condition there. Also note that when you change the list of synchronous > standbys in the config file, and SIGHUP, the walsender processes will > update their priorities in random order. So the idea that you get a > consistent snapshot of the priorities is a bit bogus anyway. Also note that > between filling the priorities array and the main loop in > pg_stat_get_wal_senders(), a new WAL sender might connect and set a slot's > pid. With the current coding, you'll print an uninitialized value from the > array if that happens. So the only thing that holding the SyncRepLock > really protects from is a "torn" read of the value, if reading an int is > not atomic. We could use the spinlock to protect from that if we care. > That's fair, and it simplifies the whole process as well as the API used to fetch the synchronous WAL sender. > * Would be nicer to return a pointer, than index into the wal senders > array. That's what the caller really wants. > > I propose the attached (I admit I haven't tested it). > Actually, if you do it this way, I think it would be worth adding the small optimization Fujii-san mentioned upthread: if the priority is equal to 1, we leave the loop early and immediately return the pointer. Putting all of this together gives the attached patch, which, FWIW, I actually tested with multiple standbys and multiple entries in s_s_names (synchronous_standby_names). Regards, -- Michael
diff --git a/src/backend/replication/syncrep.c b/src/backend/replication/syncrep.c index aa54bfb..34530a0 100644 --- a/src/backend/replication/syncrep.c +++ b/src/backend/replication/syncrep.c @@ -5,7 +5,7 @@ * Synchronous replication is new as of PostgreSQL 9.1. * * If requested, transaction commits wait until their commit LSN is - * acknowledged by the sync standby. + * acknowledged by the synchronous standby. * * This module contains the code for waiting and release of backends. * All code in this module executes on the primary. The core streaming @@ -358,6 +358,62 @@ SyncRepInitConfig(void) } /* + * Find the WAL sender servicing the synchronous standby with the lowest + * priority value, or NULL if no synchronous standby is connected. If there + * are multiple nodes with the same lowest priority value, the first node + * found is selected. The caller must hold SyncRepLock. + */ +WalSnd * +SyncRepGetSynchronousStandby(void) +{ + WalSnd *result = NULL; + int result_priority = 0; + int i; + + /* Scan WAL senders and find synchronous node if any */ + for (i = 0; i < max_wal_senders; i++) + { + /* Use volatile pointer to prevent code rearrangement */ + volatile WalSnd *walsnd = &WalSndCtl->walsnds[i]; + int this_priority; + + /* Must be active */ + if (walsnd->pid == 0) + continue; + + /* Must be streaming */ + if (walsnd->state != WALSNDSTATE_STREAMING) + continue; + + /* Must be synchronous */ + this_priority = walsnd->sync_standby_priority; + if (this_priority == 0) + continue; + + /* Must have a lower priority value than any previous ones */ + if (result != NULL && result_priority <= this_priority) + continue; + + /* Must have a valid flush position */ + if (XLogRecPtrIsInvalid(walsnd->flush)) + continue; + + result = (WalSnd *) walsnd; + result_priority = this_priority; + + /* + * If priority is equal to 1, we are sure that there are no other + * WAL senders that could be synchronous with a lower priority, + * hence leave immediately with this one. 
+ */ + if (this_priority == 1) + return result; + } + + return result; +} + +/* * Update the LSNs on each queue based upon our latest state. This * implements a simple policy of first-valid-standby-releases-waiter. * @@ -368,11 +424,9 @@ void SyncRepReleaseWaiters(void) { volatile WalSndCtlData *walsndctl = WalSndCtl; - volatile WalSnd *syncWalSnd = NULL; + WalSnd *syncWalSnd; int numwrite = 0; int numflush = 0; - int priority = 0; - int i; /* * If this WALSender is serving a standby that is not on the list of @@ -388,32 +442,13 @@ SyncRepReleaseWaiters(void) /* * We're a potential sync standby. Release waiters if we are the highest * priority standby. If there are multiple standbys with same priorities - * then we use the first mentioned standby. If you change this, also - * change pg_stat_get_wal_senders(). + * then we use the first mentioned standbys. */ LWLockAcquire(SyncRepLock, LW_EXCLUSIVE); + syncWalSnd = SyncRepGetSynchronousStandby(); - for (i = 0; i < max_wal_senders; i++) - { - /* use volatile pointer to prevent code rearrangement */ - volatile WalSnd *walsnd = &walsndctl->walsnds[i]; - - if (walsnd->pid != 0 && - walsnd->state == WALSNDSTATE_STREAMING && - walsnd->sync_standby_priority > 0 && - (priority == 0 || - priority > walsnd->sync_standby_priority) && - !XLogRecPtrIsInvalid(walsnd->flush)) - { - priority = walsnd->sync_standby_priority; - syncWalSnd = walsnd; - } - } - - /* - * We should have found ourselves at least. - */ - Assert(syncWalSnd); + /* We should have found ourselves at least */ + Assert(syncWalSnd != NULL); /* * If we aren't managing the highest priority standby then just leave. 
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index addae8f..742c455 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -2741,9 +2741,7 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS) Tuplestorestate *tupstore; MemoryContext per_query_ctx; MemoryContext oldcontext; - int *sync_priority; - int priority = 0; - int sync_standby = -1; + WalSnd *sync_standby; int i; /* check to see if caller supports us returning a tuplestore */ @@ -2772,38 +2770,10 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS) MemoryContextSwitchTo(oldcontext); /* - * Get the priorities of sync standbys all in one go, to minimise lock - * acquisitions and to allow us to evaluate who is the current sync - * standby. This code must match the code in SyncRepReleaseWaiters(). + * Get the currently active synchronous standby. */ - sync_priority = palloc(sizeof(int) * max_wal_senders); LWLockAcquire(SyncRepLock, LW_SHARED); - for (i = 0; i < max_wal_senders; i++) - { - /* use volatile pointer to prevent code rearrangement */ - volatile WalSnd *walsnd = &WalSndCtl->walsnds[i]; - - if (walsnd->pid != 0) - { - /* - * Treat a standby such as a pg_basebackup background process - * which always returns an invalid flush location, as an - * asynchronous standby. - */ - sync_priority[i] = XLogRecPtrIsInvalid(walsnd->flush) ? 
- 0 : walsnd->sync_standby_priority; - - if (walsnd->state == WALSNDSTATE_STREAMING && - walsnd->sync_standby_priority > 0 && - (priority == 0 || - priority > walsnd->sync_standby_priority) && - !XLogRecPtrIsInvalid(walsnd->flush)) - { - priority = walsnd->sync_standby_priority; - sync_standby = i; - } - } - } + sync_standby = SyncRepGetSynchronousStandby(); LWLockRelease(SyncRepLock); for (i = 0; i < max_wal_senders; i++) @@ -2814,6 +2784,7 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS) XLogRecPtr write; XLogRecPtr flush; XLogRecPtr apply; + int priority; WalSndState state; Datum values[PG_STAT_GET_WAL_SENDERS_COLS]; bool nulls[PG_STAT_GET_WAL_SENDERS_COLS]; @@ -2827,6 +2798,7 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS) write = walsnd->write; flush = walsnd->flush; apply = walsnd->apply; + priority = walsnd->sync_standby_priority; SpinLockRelease(&walsnd->mutex); memset(nulls, 0, sizeof(nulls)); @@ -2857,15 +2829,15 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS) nulls[5] = true; values[5] = LSNGetDatum(apply); - values[6] = Int32GetDatum(sync_priority[i]); + values[6] = Int32GetDatum(priority); /* * More easily understood version of standby state. This is purely * informational, not different from priority. 
*/ - if (sync_priority[i] == 0) + if (priority == 0) values[7] = CStringGetTextDatum("async"); - else if (i == sync_standby) + else if (walsnd == sync_standby) values[7] = CStringGetTextDatum("sync"); else values[7] = CStringGetTextDatum("potential"); @@ -2873,7 +2845,6 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS) tuplestore_putvalues(tupstore, tupdesc, values, nulls); } - pfree(sync_priority); /* clean up and return the tuplestore */ tuplestore_donestoring(tupstore); diff --git a/src/include/replication/syncrep.h b/src/include/replication/syncrep.h index 7eeaf3b..6f78fee 100644 --- a/src/include/replication/syncrep.h +++ b/src/include/replication/syncrep.h @@ -50,6 +50,10 @@ extern void SyncRepUpdateSyncStandbysDefined(void); /* called by various procs */ extern int SyncRepWakeQueue(bool all, int mode); +/* forward declaration to avoid pulling in walsender_private.h */ +struct WalSnd; +extern struct WalSnd *SyncRepGetSynchronousStandby(void); + extern bool check_synchronous_standby_names(char **newval, void **extra, GucSource source); extern void assign_synchronous_commit(int newval, void *extra);
-- Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org) To make changes to your subscription: http://www.postgresql.org/mailpref/pgsql-hackers