On Fri, Aug 22, 2014 at 11:42 PM, Michael Paquier <michael.paqu...@gmail.com> wrote:
>>
>> 2. Logic of deciding the highest priority one seems to be in-correct.
>> Assume, s_s_num = 3, s_s_names = 3,4,2,1
>> standby nodes are in order as: 1,2,3,4,5,6,7
>>
>> As per the logic in patch, node 4 with priority 2 will not be added
>> in the list whereas 1,2,3 will be added.
>>
>> The problem is because priority updated for next tracking is not the
>> highest priority as of that iteration, it is just priority of last node
>> added to the list. So it may happen that a node with higher priority
>> is still there in list but we are comparing with some other smaller priority.
>
>
> Fixed. Nice catch!

Actually, on re-reading the code I wrote yesterday, I found that the fix in v6 for this was not correct. It is really fixed in v7, attached.

Regards,
--
Michael
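
To make the case from the review concrete, here is a minimal standalone sketch of the intended selection rule: keep up to N streaming standbys with the lowest non-zero priority value, and compare each new candidate against the worst entry currently held rather than against the last one added. This is an illustration only, not the code from the attached patch; `pick_sync_nodes` and its arguments are hypothetical names, and the priority array stands in for `sync_standby_priority` of each WAL sender.

```c
#include <assert.h>

/*
 * Hypothetical helper (not part of the patch): choose up to "wanted"
 * entries of prio[] with the lowest non-zero value.  A value of 0 marks
 * an asynchronous node and is always skipped.  chosen[] must have room
 * for "wanted" indexes; the number of slots filled is returned.
 */
static int
pick_sync_nodes(const int *prio, int nnodes, int wanted, int *chosen)
{
	int		nchosen = 0;

	assert(wanted >= 0);

	for (int i = 0; i < nnodes; i++)
	{
		int		worst = 0;

		/* Asynchronous node, never a candidate */
		if (prio[i] == 0)
			continue;

		/* Still room left: take the candidate as-is */
		if (nchosen < wanted)
		{
			chosen[nchosen++] = i;
			continue;
		}

		/* Nothing can be selected when no slot is wanted */
		if (wanted == 0)
			break;

		/* Find the slot holding the largest priority value so far */
		for (int j = 1; j < nchosen; j++)
		{
			if (prio[chosen[j]] > prio[chosen[worst]])
				worst = j;
		}

		/* Replace it only if the new candidate is strictly better */
		if (prio[i] < prio[chosen[worst]])
			chosen[worst] = i;
	}

	return nchosen;
}
```

With s_s_num = 3 and s_s_names = 3,4,2,1, the nodes 1..7 carry priorities 4,3,1,2,0,0,0. Calling the helper on that array keeps nodes 4, 2 and 3 (priorities 2, 3 and 1), so node 4 is no longer skipped.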
diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml index f23e5dc..d085f48 100644 --- a/doc/src/sgml/config.sgml +++ b/doc/src/sgml/config.sgml @@ -2586,12 +2586,13 @@ include_dir 'conf.d' Specifies a comma-separated list of standby names that can support <firstterm>synchronous replication</>, as described in <xref linkend="synchronous-replication">. - At any one time there will be at most one active synchronous standby; - transactions waiting for commit will be allowed to proceed after - this standby server confirms receipt of their data. - The synchronous standby will be the first standby named in this list - that is both currently connected and streaming data in real-time - (as shown by a state of <literal>streaming</literal> in the + At any one time there will be a number of active synchronous standbys + defined by <xref linkend="guc-synchronous-standby-num">; transactions + waiting for commit will be allowed to proceed after those standby + servers confirm receipt of their data. The synchronous standbys will be + the first entries named in this list that are both currently connected + and streaming data in real-time (as shown by a state of + <literal>streaming</literal> in the <link linkend="monitoring-stats-views-table"> <literal>pg_stat_replication</></link> view). Other standby servers appearing later in this list represent potential @@ -2627,6 +2628,58 @@ include_dir 'conf.d' </listitem> </varlistentry> + <varlistentry id="guc-synchronous-standby-num" xreflabel="synchronous_standby_num"> + <term><varname>synchronous_standby_num</varname> (<type>integer</type>) + <indexterm> + <primary><varname>synchronous_standby_num</> configuration parameter</primary> + </indexterm> + </term> + <listitem> + <para> + Specifies the number of standbys that support + <firstterm>synchronous replication</>. + </para> + <para> + Default value is <literal>-1</>. 
In this case, if + <xref linkend="guc-synchronous-standby-names"> is empty all the + standby nodes are considered asynchronous. If there is at least + one node name defined, process will wait for one synchronous + standby listed. + </para> + <para> + When this parameter is set to <literal>0</>, all the standby + nodes will be considered as asynchronous. + </para> + <para> + This parameter value cannot be higher than + <xref linkend="guc-max-wal-senders">. + </para> + <para> + Are considered as synchronous the first elements of + <xref linkend="guc-synchronous-standby-names"> in number of + <xref linkend="guc-synchronous-standby-num"> that are + connected. If there are more elements than the number of standbys + required, all the additional standbys are potential synchronous + candidates. If <xref linkend="guc-synchronous-standby-names"> is + empty, all the standbys are asynchronous. If it is set to the + special entry <literal>*</>, a number of standbys equal to + <xref linkend="guc-synchronous-standby-num"> with the highest + priority are elected as being synchronous. + </para> + <para> + Server will wait for commit confirmation from + <xref linkend="guc-synchronous-standby-num"> standbys, meaning that + if <xref linkend="guc-synchronous-standby-names"> has fewer elements + than the number of standbys required, server will wait indefinitely + for a commit confirmation. + </para> + <para> + This parameter can only be set in the <filename>postgresql.conf</> + file or on the server command line. 
+ </para> + </listitem> + </varlistentry> + <varlistentry id="guc-vacuum-defer-cleanup-age" xreflabel="vacuum_defer_cleanup_age"> <term><varname>vacuum_defer_cleanup_age</varname> (<type>integer</type>) <indexterm> diff --git a/doc/src/sgml/high-availability.sgml b/doc/src/sgml/high-availability.sgml index d249959..ec0ea70 100644 --- a/doc/src/sgml/high-availability.sgml +++ b/doc/src/sgml/high-availability.sgml @@ -1081,12 +1081,12 @@ primary_slot_name = 'node_a_slot' WAL record is then sent to the standby. The standby sends reply messages each time a new batch of WAL data is written to disk, unless <varname>wal_receiver_status_interval</> is set to zero on the standby. - If the standby is the first matching standby, as specified in - <varname>synchronous_standby_names</> on the primary, the reply - messages from that standby will be used to wake users waiting for - confirmation that the commit record has been received. These parameters - allow the administrator to specify which standby servers should be - synchronous standbys. Note that the configuration of synchronous + If the standby is the first <varname>synchronous_standby_num</> matching + standbys, as specified in <varname>synchronous_standby_names</> on the + primary, the reply messages from that standby will be used to wake users + waiting for confirmation that the commit record has been received. These + parameters allow the administrator to specify which standby servers should + be synchronous standbys. Note that the configuration of synchronous replication is mainly on the master. Named standbys must be directly connected to the master; the master knows nothing about downstream standby servers using cascaded replication. @@ -1167,11 +1167,11 @@ primary_slot_name = 'node_a_slot' <para> The best solution for avoiding data loss is to ensure you don't lose - your last remaining synchronous standby. This can be achieved by naming multiple + your last remaining synchronous standbys. 
This can be achieved by naming multiple potential synchronous standbys using <varname>synchronous_standby_names</>. - The first named standby will be used as the synchronous standby. Standbys - listed after this will take over the role of synchronous standby if the - first one should fail. + The first <varname>synchronous_standby_num</> named standbys will be used as + the synchronous standbys. Standbys listed after this will take over the role + of synchronous standby if the first one should fail. </para> <para> diff --git a/src/backend/replication/syncrep.c b/src/backend/replication/syncrep.c index aa54bfb..ddfd36b 100644 --- a/src/backend/replication/syncrep.c +++ b/src/backend/replication/syncrep.c @@ -5,7 +5,7 @@ * Synchronous replication is new as of PostgreSQL 9.1. * * If requested, transaction commits wait until their commit LSN is - * acknowledged by the sync standby. + * acknowledged by the synchronous standbys. * * This module contains the code for waiting and release of backends. * All code in this module executes on the primary. The core streaming @@ -29,11 +29,22 @@ * single ordered queue of waiting backends, so that we can avoid * searching the through all waiters each time we receive a reply. * - * In 9.1 we support only a single synchronous standby, chosen from a - * priority list of synchronous_standby_names. Before it can become the - * synchronous standby it must have caught up with the primary; that may - * take some time. Once caught up, the current highest priority standby - * will release waiters from the queue. + * In 9.4 we support the possibility to have multiple synchronous standbys, + * whose number is defined by synchronous_standby_num, chosen from a + * priority list of synchronous_standby_names. Before one standby can + * become a synchronous standby it must have caught up with the primary; + * that may take some time. 
+ * + * Waiters will be released from the queue once the number of standbys + * defined by synchronous_standby_num have caught up. + * + * There are special cases though. If synchronous_standby_num is set to 0, + * all the nodes are considered as asynchronous and a fast path is taken to + * leave this portion of the code as soon as possible. If it is set to + * -1, process will wait for one node to catch up with the primary only + * if synchronous_standby_names is non-empty. This is compatible with + * what has been defined in 9.1 as -1 is the default value of + * synchronous_standby_num. * * Portions Copyright (c) 2010-2014, PostgreSQL Global Development Group * @@ -59,9 +70,18 @@ /* User-settable parameters for sync rep */ char *SyncRepStandbyNames; +int synchronous_standby_num = -1; +/* + * Synchronous standbys are defined if there is at least + * one synchronous standby wanted. In default case, the list + * of standbys defined needs to be not empty. + */ #define SyncStandbysDefined() \ - (SyncRepStandbyNames != NULL && SyncRepStandbyNames[0] != '\0') + (synchronous_standby_num > 0 || \ + (synchronous_standby_num == -1 && \ + SyncRepStandbyNames != NULL && \ + SyncRepStandbyNames[0] != '\0')) static bool announce_next_takeover = true; @@ -206,7 +226,7 @@ SyncRepWaitForLSN(XLogRecPtr XactCommitLSN) ereport(WARNING, (errcode(ERRCODE_ADMIN_SHUTDOWN), errmsg("canceling the wait for synchronous replication and terminating connection due to administrator command"), - errdetail("The transaction has already committed locally, but might not have been replicated to the standby."))); + errdetail("The transaction has already committed locally, but might not have been replicated to the standby(s)."))); whereToSendOutput = DestNone; SyncRepCancelWait(); break; @@ -223,7 +243,7 @@ SyncRepWaitForLSN(XLogRecPtr XactCommitLSN) QueryCancelPending = false; ereport(WARNING, (errmsg("canceling wait for synchronous replication due to user request"), - errdetail("The transaction has already 
committed locally, but might not have been replicated to the standby."))); + errdetail("The transaction has already committed locally, but might not have been replicated to the standby(s)."))); SyncRepCancelWait(); break; } @@ -357,9 +377,117 @@ SyncRepInitConfig(void) } } + +/* + * Obtain a palloc'd array containing positions of standbys currently + * considered as synchronous. Caller is responsible for freeing the + * data obtained and should as well take a necessary lock on SyncRepLock. + */ +int * +SyncRepGetSynchronousNodes(int *num_sync) +{ + int *sync_nodes; + int priority = 0; + int i; + int allowed_sync_nodes = synchronous_standby_num; + + /* Initialize */ + *num_sync = 0; + + /* Leave if no synchronous nodes allowed */ + if (synchronous_standby_num == 0) + return NULL; + + /* + * Determine the number of nodes that can be synchronized. + * synchronous_standby_num can have the special value -1, + * meaning that only one node with the highest non-null priority + * can be considered as synchronous. + */ + if (synchronous_standby_num == -1) + allowed_sync_nodes = 1; + + /* + * Make enough room, there is a maximum of max_wal_senders synchronous + * nodes as we scan though WAL senders here. 
+ */ + sync_nodes = (int *) palloc(allowed_sync_nodes * sizeof(int)); + + for (i = 0; i < max_wal_senders; i++) + { + /* Use volatile pointer to prevent code rearrangement */ + volatile WalSnd *walsnd = &WalSndCtl->walsnds[i]; + int j; + + /* Process to next if not active */ + if (walsnd->pid == 0) + continue; + + /* Process to next if not streaming */ + if (walsnd->state != WALSNDSTATE_STREAMING) + continue; + + /* Process to next one if asynchronous */ + if (walsnd->sync_standby_priority == 0) + continue; + + /* Process to next one if priority conditions not satisfied */ + if (priority != 0 && + priority <= walsnd->sync_standby_priority && + *num_sync == allowed_sync_nodes) + continue; + + /* Process to next one if flush position is invalid */ + if (XLogRecPtrIsInvalid(walsnd->flush)) + continue; + + /* + * We have a potential synchronous candidate, add it to the + * list of nodes already present or evict the node with highest + * priority found until now. + */ + if (*num_sync == allowed_sync_nodes) + { + for (j = 0; j < *num_sync; j++) + { + volatile WalSnd *walsndloc = &WalSndCtl->walsnds[sync_nodes[j]]; + if (walsndloc->sync_standby_priority == priority) + { + sync_nodes[j] = i; + break; + } + } + } + else + { + sync_nodes[*num_sync] = i; + (*num_sync)++; + } + + /* + * Update priority for next tracking. This needs to be the highest + * priority value in all the existing items. + */ + if (*num_sync == 1) + priority = walsnd->sync_standby_priority; + else + { + priority = 0; + for (j = 0; j < *num_sync; j++) + { + volatile WalSnd *walsndloc = &WalSndCtl->walsnds[sync_nodes[j]]; + if (priority < walsndloc->sync_standby_priority) + priority = walsndloc->sync_standby_priority; + } + } + } + + return sync_nodes; +} + /* * Update the LSNs on each queue based upon our latest state. This - * implements a simple policy of first-valid-standby-releases-waiter. + * implements a simple policy of first-valid-standbys-release-waiter. 
* * Other policies are possible, which would change what we do here and what * perhaps also which information we store as well. @@ -368,11 +496,14 @@ void SyncRepReleaseWaiters(void) { volatile WalSndCtlData *walsndctl = WalSndCtl; - volatile WalSnd *syncWalSnd = NULL; + int *sync_standbys; int numwrite = 0; int numflush = 0; - int priority = 0; + int num_sync = 0; int i; + bool found = false; + XLogRecPtr min_write_pos; + XLogRecPtr min_flush_pos; /* * If this WALSender is serving a standby that is not on the list of @@ -388,67 +519,99 @@ SyncRepReleaseWaiters(void) /* * We're a potential sync standby. Release waiters if we are the highest * priority standby. If there are multiple standbys with same priorities - * then we use the first mentioned standby. If you change this, also - * change pg_stat_get_wal_senders(). + * then we use the first mentioned standbys. */ LWLockAcquire(SyncRepLock, LW_EXCLUSIVE); + sync_standbys = SyncRepGetSynchronousNodes(&num_sync); - for (i = 0; i < max_wal_senders; i++) + /* + * We should have found ourselves at least, except if it is not expected + * to find any synchronous nodes. + */ + Assert(num_sync > 0); + + /* + * If we aren't managing one of the standbys with highest priority + * then just leave. + */ + for (i = 0; i < num_sync; i++) { - /* use volatile pointer to prevent code rearrangement */ - volatile WalSnd *walsnd = &walsndctl->walsnds[i]; - - if (walsnd->pid != 0 && - walsnd->state == WALSNDSTATE_STREAMING && - walsnd->sync_standby_priority > 0 && - (priority == 0 || - priority > walsnd->sync_standby_priority) && - !XLogRecPtrIsInvalid(walsnd->flush)) + volatile WalSnd *walsndloc = &WalSndCtl->walsnds[sync_standbys[i]]; + if (walsndloc == MyWalSnd) { - priority = walsnd->sync_standby_priority; - syncWalSnd = walsnd; + found = true; + break; } } /* - * We should have found ourselves at least. + * We are definitely not one of the chosen... But we could by + * taking the next takeover. 
*/ - Assert(syncWalSnd); + if (!found) + { + LWLockRelease(SyncRepLock); + pfree(sync_standbys); + announce_next_takeover = true; + return; + } /* - * If we aren't managing the highest priority standby then just leave. + * Even if we are one of the chosen standbys, leave if there + * are less synchronous standbys in waiting state than what is + * expected by the user. */ - if (syncWalSnd != MyWalSnd) + if (num_sync < synchronous_standby_num && + synchronous_standby_num != -1) { LWLockRelease(SyncRepLock); - announce_next_takeover = true; + pfree(sync_standbys); return; } /* * Set the lsn first so that when we wake backends they will release up to - * this location. + * this location, of course only if all the standbys found as synchronous + * have already reached that point, so first find what are the oldest + * write and flush positions of all the standbys considered in sync... */ - if (walsndctl->lsn[SYNC_REP_WAIT_WRITE] < MyWalSnd->write) + min_write_pos = MyWalSnd->write; + min_flush_pos = MyWalSnd->flush; + for (i = 0; i < num_sync; i++) + { + volatile WalSnd *walsndloc = &WalSndCtl->walsnds[sync_standbys[i]]; + + SpinLockAcquire(&walsndloc->mutex); + if (min_write_pos > walsndloc->write) + min_write_pos = walsndloc->write; + if (min_flush_pos > walsndloc->flush) + min_flush_pos = walsndloc->flush; + SpinLockRelease(&walsndloc->mutex); + } + + /* ... 
And now update if necessary */ + if (walsndctl->lsn[SYNC_REP_WAIT_WRITE] < min_write_pos) { - walsndctl->lsn[SYNC_REP_WAIT_WRITE] = MyWalSnd->write; + walsndctl->lsn[SYNC_REP_WAIT_WRITE] = min_write_pos; numwrite = SyncRepWakeQueue(false, SYNC_REP_WAIT_WRITE); } - if (walsndctl->lsn[SYNC_REP_WAIT_FLUSH] < MyWalSnd->flush) + if (walsndctl->lsn[SYNC_REP_WAIT_FLUSH] < min_flush_pos) { - walsndctl->lsn[SYNC_REP_WAIT_FLUSH] = MyWalSnd->flush; + walsndctl->lsn[SYNC_REP_WAIT_FLUSH] = min_flush_pos; numflush = SyncRepWakeQueue(false, SYNC_REP_WAIT_FLUSH); } LWLockRelease(SyncRepLock); elog(DEBUG3, "released %d procs up to write %X/%X, %d procs up to flush %X/%X", - numwrite, (uint32) (MyWalSnd->write >> 32), (uint32) MyWalSnd->write, - numflush, (uint32) (MyWalSnd->flush >> 32), (uint32) MyWalSnd->flush); + numwrite, (uint32) (walsndctl->lsn[SYNC_REP_WAIT_WRITE] >> 32), + (uint32) walsndctl->lsn[SYNC_REP_WAIT_WRITE], + numflush, (uint32) (walsndctl->lsn[SYNC_REP_WAIT_FLUSH] >> 32), + (uint32) walsndctl->lsn[SYNC_REP_WAIT_FLUSH]); /* * If we are managing the highest priority standby, though we weren't - * prior to this, then announce we are now the sync standby. + * prior to this, then announce we are now a sync standby. 
*/ if (announce_next_takeover) { @@ -457,6 +620,9 @@ SyncRepReleaseWaiters(void) (errmsg("standby \"%s\" is now the synchronous standby with priority %u", application_name, MyWalSnd->sync_standby_priority))); } + + /* Clean up */ + pfree(sync_standbys); } /* @@ -483,6 +649,10 @@ SyncRepGetStandbyPriority(void) if (am_cascading_walsender) return 0; + /* If no synchronous nodes allowed, no cake for this WAL sender */ + if (synchronous_standby_num == 0) + return 0; + /* Need a modifiable copy of string */ rawstring = pstrdup(SyncRepStandbyNames); diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index 844a5de..0a918c7 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -2735,8 +2735,8 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS) MemoryContext per_query_ctx; MemoryContext oldcontext; int *sync_priority; - int priority = 0; - int sync_standby = -1; + int *sync_standbys; + int num_sync = 0; int i; /* check to see if caller supports us returning a tuplestore */ @@ -2767,36 +2767,23 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS) /* * Get the priorities of sync standbys all in one go, to minimise lock * acquisitions and to allow us to evaluate who is the current sync - * standby. This code must match the code in SyncRepReleaseWaiters(). + * standby. */ sync_priority = palloc(sizeof(int) * max_wal_senders); LWLockAcquire(SyncRepLock, LW_SHARED); + + /* Get first the priorities on each standby as long as we hold a lock */ for (i = 0; i < max_wal_senders; i++) { /* use volatile pointer to prevent code rearrangement */ volatile WalSnd *walsnd = &WalSndCtl->walsnds[i]; - if (walsnd->pid != 0) - { - /* - * Treat a standby such as a pg_basebackup background process - * which always returns an invalid flush location, as an - * asynchronous standby. - */ - sync_priority[i] = XLogRecPtrIsInvalid(walsnd->flush) ? 
- 0 : walsnd->sync_standby_priority; - - if (walsnd->state == WALSNDSTATE_STREAMING && - walsnd->sync_standby_priority > 0 && - (priority == 0 || - priority > walsnd->sync_standby_priority) && - !XLogRecPtrIsInvalid(walsnd->flush)) - { - priority = walsnd->sync_standby_priority; - sync_standby = i; - } - } + sync_priority[i] = XLogRecPtrIsInvalid(walsnd->flush) ? + 0 : walsnd->sync_standby_priority; } + + /* Obtain list of synchronous standbys */ + sync_standbys = SyncRepGetSynchronousNodes(&num_sync); LWLockRelease(SyncRepLock); for (i = 0; i < max_wal_senders; i++) @@ -2858,15 +2845,32 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS) */ if (sync_priority[i] == 0) values[7] = CStringGetTextDatum("async"); - else if (i == sync_standby) - values[7] = CStringGetTextDatum("sync"); else - values[7] = CStringGetTextDatum("potential"); + { + int j; + bool found = false; + + for (j = 0; j < num_sync; j++) + { + /* Found that this node is one in sync */ + if (i == sync_standbys[j]) + { + values[7] = CStringGetTextDatum("sync"); + found = true; + break; + } + } + if (!found) + values[7] = CStringGetTextDatum("potential"); + } } tuplestore_putvalues(tupstore, tupdesc, values, nulls); } + + /* Cleanup */ pfree(sync_priority); + pfree(sync_standbys); /* clean up and return the tuplestore */ tuplestore_donestoring(tupstore); diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c index a8a17c2..307cb68 100644 --- a/src/backend/utils/misc/guc.c +++ b/src/backend/utils/misc/guc.c @@ -2548,6 +2548,16 @@ static struct config_int ConfigureNamesInt[] = NULL, NULL, NULL }, + { + {"synchronous_standby_num", PGC_SIGHUP, REPLICATION_MASTER, + gettext_noop("Number of potential synchronous standbys."), + NULL + }, + &synchronous_standby_num, + -1, -1, INT_MAX, + NULL, NULL, NULL + }, + /* End-of-list marker */ { {NULL, 0, 0, NULL, NULL}, NULL, 0, 0, 0, NULL, NULL, NULL diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample 
index df98b02..5c1e27c 100644 --- a/src/backend/utils/misc/postgresql.conf.sample +++ b/src/backend/utils/misc/postgresql.conf.sample @@ -235,6 +235,7 @@ #synchronous_standby_names = '' # standby servers that provide sync rep # comma-separated list of application_name # from standby(s); '*' = all +#synchronous_standby_num = -1 # number of standby servers using sync rep #vacuum_defer_cleanup_age = 0 # number of xacts by which cleanup is delayed # - Standby Servers - diff --git a/src/include/replication/syncrep.h b/src/include/replication/syncrep.h index 7eeaf3b..9f05ba9 100644 --- a/src/include/replication/syncrep.h +++ b/src/include/replication/syncrep.h @@ -33,6 +33,7 @@ /* user-settable parameters for synchronous replication */ extern char *SyncRepStandbyNames; +extern int synchronous_standby_num; /* called by user backend */ extern void SyncRepWaitForLSN(XLogRecPtr XactCommitLSN); @@ -49,6 +50,7 @@ extern void SyncRepUpdateSyncStandbysDefined(void); /* called by various procs */ extern int SyncRepWakeQueue(bool all, int mode); +extern int *SyncRepGetSynchronousNodes(int *num_sync); extern bool check_synchronous_standby_names(char **newval, void **extra, GucSource source); extern void assign_synchronous_commit(int newval, void *extra);
-- Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org) To make changes to your subscription: http://www.postgresql.org/mailpref/pgsql-hackers