On Mon, Feb 1, 2016 at 11:28 PM, Fujii Masao <masao.fu...@gmail.com> wrote:
> On Mon, Feb 1, 2016 at 5:36 PM, Masahiko Sawada <sawada.m...@gmail.com> wrote:
>> On Sun, Jan 31, 2016 at 8:58 PM, Michael Paquier
>> <michael.paqu...@gmail.com> wrote:
>>> On Sun, Jan 31, 2016 at 5:28 PM, Masahiko Sawada <sawada.m...@gmail.com>
>>> wrote:
>>>> On Sun, Jan 31, 2016 at 5:18 PM, Michael Paquier
>>>> <michael.paqu...@gmail.com> wrote:
>>>>> On Sun, Jan 31, 2016 at 5:08 PM, Masahiko Sawada <sawada.m...@gmail.com>
>>>>> wrote:
>>>>>> On Sun, Jan 31, 2016 at 1:17 PM, Michael Paquier
>>>>>> <michael.paqu...@gmail.com> wrote:
>>>>>>> On Thu, Jan 28, 2016 at 10:10 PM, Masahiko Sawada wrote:
>>>>>>>> By the discussions so far, I'm planning to have several replication
>>>>>>>> methods such as 'quorum' and 'complex' in the future, and each
>>>>>>>> replication method specifies the syntax of s_s_names.
>>>>>>>> It means that s_s_names could have the number of sync standbys like
>>>>>>>> what the current patch does.
>>>>>>>
>>>>>>> What if the application_name of a standby node has the format of an
>>>>>>> integer?
>>>>>>
>>>>>> Even if the standby has an integer as application_name, we can set
>>>>>> s_s_names like '2,1,2,3'.
>>>>>> The leading '2' is always handled as the number of sync standbys when
>>>>>> s_r_method = 'priority'.
>>>>>
>>>>> Hm. I agree with Fujii-san here: having the number of sync standbys
>>>>> defined in a parameter that should hold a list of names is a bit
>>>>> confusing. I'd rather have a separate GUC, which brings us back to one
>>>>> of the first patches that I came up with, and a couple of people,
>>>>> including Josh, were not happy with that because it did not support
>>>>> real quorum. Perhaps the final answer would be really to get a set of
>>>>> hooks, and a contrib module making use of them.
>>>>
>>>> Yeah, I agree with having a set of hooks, while the postgres core keeps
>>>> a simple multi-sync replication mechanism like you suggested in the
>>>> first version.
>>>
>>> If there are hooks, I don't think that we should really bother about
>>> having in core anything more complicated than what we have now. The
>>> trick will be to come up with a hook design modular enough to support
>>> the kind of configurations mentioned on this thread. Roughly, perhaps a
>>> refactoring of the syncrep code so that it is possible to wait for
>>> multiple targets, some of them being optional; one modular way in
>>> pg_stat_get_wal_senders to represent the status of a node to the user;
>>> and another hook to decide which are the nodes to wait for. Some of
>>> the nodes being waited for may be based on conditions for quorum
>>> support. Doing that in a flexible enough way is a hard problem.
>>
>> Hm, I think non-nested quorum and priority are not complicated, and we
>> should support at least one or both of these simple methods in the core
>> of postgres.
>> More complicated methods, like using a JSON-style or dedicated language,
>> could be supported by an external module.
>
> So what about the following plan?
>
> [first version]
> Add only synchronous_standby_num, which specifies the number of standbys
> that the master must wait for before marking sync replication as completed.
> This version supports simple use cases like "I want to have two synchronous
> standbys".
>
> [second version]
> Add synchronous_replication_method: 'priority' and 'quorum'. This version
> additionally supports a simple quorum commit case like "I want to ensure
> that WAL is replicated synchronously to at least two standbys from the five
> ones listed in s_s_names".
>
> Or
>
> Add something like quorum_replication_num and quorum_standby_names, i.e.,
> the master must wait for at least q_r_num standbys from the ones listed in
> q_s_names before marking sync replication as completed. Also the master
> must wait for sync replication according to s_s_num and s_s_names.
> That is, this approach separates 'priority' and 'quorum' into separate
> parameters.
> This increases the number of GUC parameters, but ISTM it's less confusing,
> and it supports a somewhat complicated case like "there is one local standby
> and three remote standbys, and I want to ensure that WAL is replicated
> synchronously to the local standby and to at least two remote ones", e.g.,
>
> s_s_num = 1, s_s_names = 'local'
> q_s_num = 2, q_s_names = 'remote1, remote2, remote3'
>
> [third version]
> Add the hooks for more complicated sync replication cases.
>
> I'm thinking that the realistic target for 9.6 might be the first one.
>
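To make the 'priority' semantics of the first version concrete: with N = synchronous_standby_num, the master tracks the N connected standbys with the smallest priority values (1 = highest priority, 0 = async). Below is a minimal standalone sketch of that selection; the function name, the plain priority array, and the test values are all illustrative inventions, not the actual WalSnd shared-memory code from the patch:

```c
#include <assert.h>

/*
 * Pick up to n sync standbys by ascending priority value (1 = highest
 * priority, 0 = async and never chosen).  Writes the chosen indexes
 * into chosen[] and returns how many were selected.  Sketch only.
 */
static int
choose_sync_standbys(const int *priority, int nstandbys, int n, int *chosen)
{
	int		num_chosen = 0;
	int		i, j;

	for (i = 0; i < nstandbys; i++)
	{
		if (priority[i] == 0)	/* async standby, skip */
			continue;

		if (num_chosen < n)
			chosen[num_chosen++] = i;
		else
		{
			/* find the currently worst (largest) priority value */
			int		worst = 0;

			for (j = 1; j < num_chosen; j++)
				if (priority[chosen[j]] > priority[chosen[worst]])
					worst = j;

			/* replace it if this standby has a better (smaller) value */
			if (priority[i] < priority[chosen[worst]])
				chosen[worst] = i;
		}
	}
	return num_chosen;
}
```

With priorities {0, 3, 1, 2, 0} and n = 2, the standbys at indexes 2 and 3 (priorities 1 and 2) are chosen; if fewer than n listed standbys are connected, fewer are returned, which is the case where the patch must keep commits waiting.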
Thank you for the suggestion.

I agree with the first version, and attached an updated patch which is
modified so that it supports the simple multiple-sync replication you
suggested (test cases are not included yet).

Regards,

--
Masahiko Sawada
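For readers skimming the patch below, the two GUCs of the first version combine as in this hypothetical postgresql.conf fragment (the standby names and the value 2 are invented for illustration; the parameter names are those used in the attached patch):

```
# Wait for the 2 highest-priority connected standbys among the listed
# ones before acknowledging a commit (illustrative example only).
synchronous_standby_names = 'node_a, node_b, node_c'
synchronous_standby_num = 2
```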
diff --git a/src/backend/replication/syncrep.c b/src/backend/replication/syncrep.c
index 7f85b88..9a2f7e7 100644
--- a/src/backend/replication/syncrep.c
+++ b/src/backend/replication/syncrep.c
@@ -29,10 +29,10 @@
  * single ordered queue of waiting backends, so that we can avoid
  * searching the through all waiters each time we receive a reply.
  *
- * In 9.1 we support only a single synchronous standby, chosen from a
- * priority list of synchronous_standby_names. Before it can become the
- * synchronous standby it must have caught up with the primary; that may
- * take some time. Once caught up, the current highest priority standby
+ * In 9.6 we support multiple synchronous standbys, chosen from a priority
+ * list of synchronous_standby_names. Before any standby can become a
+ * synchronous standby it must have caught up with the primary; that may
+ * take some time. Once caught up, the current highest priority standbys
  * will release waiters from the queue.
  *
  * Portions Copyright (c) 2010-2016, PostgreSQL Global Development Group
@@ -59,9 +59,12 @@
 /* User-settable parameters for sync rep */
 char	   *SyncRepStandbyNames;
+int			synchronous_standby_num;
 
 #define SyncStandbysDefined() \
-	(SyncRepStandbyNames != NULL && SyncRepStandbyNames[0] != '\0')
+	(SyncRepStandbyNames != NULL && \
+	 SyncRepStandbyNames[0] != '\0' && \
+	 synchronous_standby_num > 0)
 
 static bool announce_next_takeover = true;
 
@@ -349,57 +352,168 @@ SyncRepInitConfig(void)
 }
 
 /*
- * Find the WAL sender servicing the synchronous standby with the lowest
- * priority value, or NULL if no synchronous standby is connected. If there
- * are multiple standbys with the same lowest priority value, the first one
- * found is selected. The caller must hold SyncRepLock.
+ * Is this wal sender managing a standby that is streaming and
+ * listed as a synchronous standby?
  */
-WalSnd *
-SyncRepGetSynchronousStandby(void)
+bool
+SyncRepActiveListedWalSender(int num)
 {
-	WalSnd	   *result = NULL;
-	int			result_priority = 0;
-	int			i;
+	volatile WalSnd *walsnd = &WalSndCtl->walsnds[num];
+
+	/* Must be active */
+	if (walsnd->pid == 0)
+		return false;
+
+	/* Must be streaming */
+	if (walsnd->state != WALSNDSTATE_STREAMING)
+		return false;
+
+	/* Must be synchronous */
+	if (walsnd->sync_standby_priority == 0)
+		return false;
+
+	/* Must have a valid flush position */
+	if (XLogRecPtrIsInvalid(walsnd->flush))
+		return false;
+
+	return true;
+}
+
+/*
+ * Get both LSNs: write and flush, and confirm whether we have advanced
+ * to the LSN or not.
+ */
+bool
+SyncRepSyncedLsnAdvancedTo(XLogRecPtr *write_pos, XLogRecPtr *flush_pos)
+{
+	XLogRecPtr	tmp_write_pos;
+	XLogRecPtr	tmp_flush_pos;
+	bool		ret = false;
+
+	ret = SyncRepGetSyncLsns(&tmp_write_pos, &tmp_flush_pos);
+
+	/* Have we advanced the LSN? */
+	if (ret)
+	{
+		if (MyWalSnd->write >= tmp_write_pos)
+			*write_pos = tmp_write_pos;
+		if (MyWalSnd->flush >= tmp_flush_pos)
+			*flush_pos = tmp_flush_pos;
+
+		return true;
+	}
+
+	return false;
+}
+
+/*
+ * Populate a caller-supplied buffer with the walsnds indexes of the
+ * highest priority active synchronous standbys, up to the limit of
+ * 'synchronous_standby_num'. The order of the results is undefined.
+ * Return the number of results actually written.
+ */
+int
+SyncRepGetSyncStandbys(int *sync_standbys)
+{
+	int			priority = 0;
+	int			num_sync = 0;
+	int			i;
 
 	for (i = 0; i < max_wal_senders; i++)
 	{
 		/* Use volatile pointer to prevent code rearrangement */
 		volatile WalSnd *walsnd = &WalSndCtl->walsnds[i];
-		int			this_priority;
+		int			j;
 
-		/* Must be active */
-		if (walsnd->pid == 0)
+		/* Is this wal sender a candidate? */
+		if (!SyncRepActiveListedWalSender(i))
 			continue;
 
-		/* Must be streaming */
-		if (walsnd->state != WALSNDSTATE_STREAMING)
-			continue;
+		if (num_sync == synchronous_standby_num)
+		{
+			if (walsnd->sync_standby_priority > priority)
+				continue;
+
+			for (j = 0; j < num_sync; j++)
+			{
+				volatile WalSnd *walsndloc = &WalSndCtl->walsnds[sync_standbys[j]];
+
+				/* Found lowest priority standby, so replace it */
+				if (walsndloc->sync_standby_priority == priority &&
+					walsnd->sync_standby_priority < priority)
+					sync_standbys[j] = i;
+
+				/* Update highest priority standby */
+				if (priority < walsndloc->sync_standby_priority)
+					priority = walsndloc->sync_standby_priority;
+			}
+		}
+		else
+		{
+			sync_standbys[num_sync] = i;
+			num_sync++;
 
-		/* Must be synchronous */
-		this_priority = walsnd->sync_standby_priority;
-		if (this_priority == 0)
-			continue;
+			/* Keep track of the highest priority standby */
+			if (priority < walsnd->sync_standby_priority)
+				priority = walsnd->sync_standby_priority;
+		}
+	}
 
-		/* Must have a lower priority value than any previous ones */
-		if (result != NULL && result_priority <= this_priority)
-			continue;
+	return num_sync;
+}
 
-		/* Must have a valid flush position */
-		if (XLogRecPtrIsInvalid(walsnd->flush))
+/*
+ * Obtain currently synced LSN location: write and flush.
+ */
+bool
+SyncRepGetSyncLsns(XLogRecPtr *write_pos, XLogRecPtr *flush_pos)
+{
+	int			sync_standbys[SYNC_REP_MAX_SYNC_STANDBY_NUM];
+	int			num_sync;
+	int			i;
+	XLogRecPtr	synced_write = InvalidXLogRecPtr;
+	XLogRecPtr	synced_flush = InvalidXLogRecPtr;
+
+	num_sync = SyncRepGetSyncStandbys(sync_standbys);
+
+	for (i = 0; i < num_sync; i++)
+	{
+		elog(WARNING, "sync_standbys[%d] = %d", i, sync_standbys[i]);
+	}
+	elog(WARNING, "num_sync = %d, s_s_num = %d", num_sync, synchronous_standby_num);
+
+	/* Just return if there are not enough sync standbys */
+	if (num_sync < synchronous_standby_num)
+		return false;
+
+	for (i = 0; i < num_sync; i++)
+	{
+		volatile WalSnd *walsndloc = &WalSndCtl->walsnds[sync_standbys[i]];
+
+		SpinLockAcquire(&walsndloc->mutex);
+
+		/* Store first candidate */
+		if (XLogRecPtrIsInvalid(synced_write) && XLogRecPtrIsInvalid(synced_flush))
+		{
+			synced_write = walsndloc->write;
+			synced_flush = walsndloc->flush;
+			SpinLockRelease(&walsndloc->mutex);
 			continue;
+		}
 
-		result = (WalSnd *) walsnd;
-		result_priority = this_priority;
+		/* Keep the earliest write and flush LSNs among the prioritized standbys */
+		if (synced_write > walsndloc->write)
+			synced_write = walsndloc->write;
+		if (synced_flush > walsndloc->flush)
+			synced_flush = walsndloc->flush;
 
-		/*
-		 * If priority is equal to 1, there cannot be any other WAL senders
-		 * with a lower priority, so we're done.
-		 */
-		if (this_priority == 1)
-			return result;
+		SpinLockRelease(&walsndloc->mutex);
 	}
 
-	return result;
+	*write_pos = synced_write;
+	*flush_pos = synced_flush;
+
+	return true;
 }
 
 /*
@@ -413,9 +527,9 @@
 void
 SyncRepReleaseWaiters(void)
 {
 	volatile WalSndCtlData *walsndctl = WalSndCtl;
-	WalSnd	   *syncWalSnd;
-	int			numwrite = 0;
-	int			numflush = 0;
+	XLogRecPtr	write_pos = InvalidXLogRecPtr;
+	XLogRecPtr	flush_pos = InvalidXLogRecPtr;
+	int			numwrite, numflush;
 
 	/*
 	 * If this WALSender is serving a standby that is not on the list of
@@ -428,23 +542,12 @@ SyncRepReleaseWaiters(void)
 		XLogRecPtrIsInvalid(MyWalSnd->flush))
 		return;
 
-	/*
-	 * We're a potential sync standby. Release waiters if we are the highest
-	 * priority standby.
-	 */
 	LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
-	syncWalSnd = SyncRepGetSynchronousStandby();
 
-	/* We should have found ourselves at least */
-	Assert(syncWalSnd != NULL);
-
-	/*
-	 * If we aren't managing the highest priority standby then just leave.
-	 */
-	if (syncWalSnd != MyWalSnd)
+	/* Get the currently synced LSNs */
+	if (!(SyncRepSyncedLsnAdvancedTo(&write_pos, &flush_pos)))
 	{
 		LWLockRelease(SyncRepLock);
-		announce_next_takeover = true;
 		return;
 	}
 
@@ -452,14 +555,14 @@ SyncRepReleaseWaiters(void)
 	 * Set the lsn first so that when we wake backends they will release up to
 	 * this location.
 	 */
-	if (walsndctl->lsn[SYNC_REP_WAIT_WRITE] < MyWalSnd->write)
+	if (walsndctl->lsn[SYNC_REP_WAIT_WRITE] < write_pos)
 	{
-		walsndctl->lsn[SYNC_REP_WAIT_WRITE] = MyWalSnd->write;
+		walsndctl->lsn[SYNC_REP_WAIT_WRITE] = write_pos;
 		numwrite = SyncRepWakeQueue(false, SYNC_REP_WAIT_WRITE);
 	}
 
-	if (walsndctl->lsn[SYNC_REP_WAIT_FLUSH] < MyWalSnd->flush)
+	if (walsndctl->lsn[SYNC_REP_WAIT_FLUSH] < flush_pos)
 	{
-		walsndctl->lsn[SYNC_REP_WAIT_FLUSH] = MyWalSnd->flush;
+		walsndctl->lsn[SYNC_REP_WAIT_FLUSH] = flush_pos;
 		numflush = SyncRepWakeQueue(false, SYNC_REP_WAIT_FLUSH);
 	}
 
@@ -506,6 +609,10 @@ SyncRepGetStandbyPriority(void)
 	if (am_cascading_walsender)
 		return 0;
 
+	/* If no synchronous standby allowed, no cake for this WAL sender */
+	if (!SyncStandbysDefined())
+		return 0;
+
 	/* Need a modifiable copy of string */
 	rawstring = pstrdup(SyncRepStandbyNames);
 
@@ -521,8 +628,9 @@
 
 	foreach(l, elemlist)
 	{
-		char	   *standby_name = (char *) lfirst(l);
+		char	   *standby_name;
 
+		standby_name = (char *) lfirst(l);
 		priority++;
 
 		if (pg_strcasecmp(standby_name, application_name) == 0 ||
@@ -683,7 +791,6 @@ SyncRepQueueIsOrderedByLSN(int mode)
  * Synchronous Replication functions executed by any process
  * ===========================================================
  */
-
 bool
 check_synchronous_standby_names(char **newval, void **extra, GucSource source)
 {
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index c03e045..8586af4 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -2749,9 +2749,11 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
 	Tuplestorestate *tupstore;
 	MemoryContext per_query_ctx;
 	MemoryContext oldcontext;
-	WalSnd	   *sync_standby;
+	int		   *sync_standbys;
+	int			num_sync;
 	int			i;
 
+
 	/* check to see if caller supports us returning a tuplestore */
 	if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
 		ereport(ERROR,
@@ -2777,11 +2779,13 @@
 	MemoryContextSwitchTo(oldcontext);
 
+	sync_standbys = (int *) palloc(sizeof(int) * synchronous_standby_num);
+
 	/*
-	 * Get the currently active synchronous standby.
+	 * Get the currently active synchronous standbys.
 	 */
 	LWLockAcquire(SyncRepLock, LW_SHARED);
-	sync_standby = SyncRepGetSynchronousStandby();
+	num_sync = SyncRepGetSyncStandbys(sync_standbys);
 	LWLockRelease(SyncRepLock);
 
 	for (i = 0; i < max_wal_senders; i++)
@@ -2854,18 +2858,34 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
 			 */
 			if (priority == 0)
 				values[7] = CStringGetTextDatum("async");
-			else if (walsnd == sync_standby)
-				values[7] = CStringGetTextDatum("sync");
 			else
-				values[7] = CStringGetTextDatum("potential");
+			{
+				int			j;
+				bool		found = false;
+
+				for (j = 0; j < num_sync; j++)
+				{
+					/* Found sync standby */
+					if (i == sync_standbys[j])
+					{
+						values[7] = CStringGetTextDatum("sync");
+						found = true;
+						break;
+					}
+				}
+				if (!found)
+					values[7] = CStringGetTextDatum("potential");
+			}
 		}
 
 		tuplestore_putvalues(tupstore, tupdesc, values, nulls);
 	}
 
 	/* clean up and return the tuplestore */
+	pfree(sync_standbys);
 	tuplestore_donestoring(tupstore);
+
 	return (Datum) 0;
 }
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index 38ba82f..3638c0d 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -2683,6 +2683,16 @@
 		NULL, NULL, NULL
 	},
 
+	{
+		{"synchronous_standby_num", PGC_SIGHUP, REPLICATION_MASTER,
+			gettext_noop("Number of sync standbys."),
+			NULL,
+		},
+		&synchronous_standby_num,
+		1, 0, SYNC_REP_MAX_SYNC_STANDBY_NUM,
+		NULL, NULL, NULL
+	},
+
 	/* End-of-list marker */
 	{
 		{NULL, 0, 0, NULL, NULL}, NULL, 0, 0, 0, NULL, NULL, NULL
diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample
index 029114f..06f1a51 100644
--- a/src/backend/utils/misc/postgresql.conf.sample
+++ b/src/backend/utils/misc/postgresql.conf.sample
@@ -241,6 +241,7 @@
 #synchronous_standby_names = ''	# standby servers that provide sync rep
 				# comma-separated list of application_name
 				# from standby(s); '*' = all
+#synchronous_standby_num = 0	# number of standby servers using sync rep
 #vacuum_defer_cleanup_age = 0	# number of xacts by which cleanup is delayed
 
 # - Standby Servers -
diff --git a/src/include/replication/syncrep.h b/src/include/replication/syncrep.h
index 96e059b..0fb7fc6 100644
--- a/src/include/replication/syncrep.h
+++ b/src/include/replication/syncrep.h
@@ -31,8 +31,13 @@
 #define SYNC_REP_WAITING			1
 #define SYNC_REP_WAIT_COMPLETE		2
 
+/* Limit of the number of synchronous standbys */
+#define SYNC_REP_MAX_SYNC_STANDBY_NUM	256
+
 /* user-settable parameters for synchronous replication */
 extern char *SyncRepStandbyNames;
+extern int	synchronous_replication_method;
+extern int	synchronous_standby_num;
 
 /* called by user backend */
 extern void SyncRepWaitForLSN(XLogRecPtr XactCommitLSN);
@@ -49,7 +54,12 @@ extern void SyncRepUpdateSyncStandbysDefined(void);
 
 /* forward declaration to avoid pulling in walsender_private.h */
 struct WalSnd;
-extern struct WalSnd *SyncRepGetSynchronousStandby(void);
+
+extern int	SyncRepGetSyncStandbys(int *sync_standbys);
+extern bool SyncRepSyncedLsnAdvancedTo(XLogRecPtr *write_pos, XLogRecPtr *flush_pos);
+extern bool SyncRepActiveListedWalSender(int num);
+extern int	SyncRepGetSyncStandbys(int *sync_standbys);
+extern bool SyncRepGetSyncLsns(XLogRecPtr *write_pos, XLogRecPtr *flush_pos);
 
 extern bool check_synchronous_standby_names(char **newval, void **extra, GucSource source);
 extern void assign_synchronous_commit(int newval, void *extra);
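The core invariant behind SyncRepGetSyncLsns() in the patch is that waiters may only be released up to the oldest write/flush position among the chosen sync standbys. A standalone sketch of that reduction, using a simulated LSN type and an invented function name rather than the patch's WalSnd structures and spinlocks:

```c
#include <assert.h>
#include <stdint.h>

typedef uint64_t SimLsn;	/* stand-in for XLogRecPtr, illustration only */

/*
 * With N sync standbys, a commit is only safely replicated up to the
 * position the slowest of them has reached, so take the minimum LSN.
 */
static SimLsn
oldest_synced_lsn(const SimLsn *lsn, int num_sync)
{
	SimLsn	min = lsn[0];
	int		i;

	for (i = 1; i < num_sync; i++)
		if (lsn[i] < min)
			min = lsn[i];
	return min;
}
```

For example, with flush positions {0x3000, 0x1000, 0x2000} the release point is 0x1000: releasing at any higher LSN would acknowledge commits that the slowest sync standby has not yet flushed.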
--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers