On Fri, Aug 22, 2014 at 7:14 PM, Rajeev rastogi <rajeev.rast...@huawei.com> wrote:
> I have just started looking into this patch.
> Please find below my first level of observation from the patch:

Thanks! Updated patch attached.

> 1. Allocation of memory for sync_nodes in function
> SyncRepGetSynchronousNodes should be equivalent to allowed_sync_nodes
> instead of max_wal_senders. As anyway we are not going to store sync stdbys
> more than allowed_sync_nodes.
>         sync_nodes = (int *) palloc(allowed_sync_nodes *
>                                     sizeof(int));

Fixed.

> 2. Logic of deciding the highest priority one seems to be in-correct.
> Assume, s_s_num = 3, s_s_names = 3,4,2,1
> standby nodes are in order as: 1,2,3,4,5,6,7
>
> As per the logic in patch, node 4 with priority 2 will not be
> added in the list whereas 1,2,3 will be added.
>
> The problem is because priority updated for next tracking is not
> the highest priority as of that iteration, it is just priority of last node
> added to the list. So it may happen that a node with higher priority
> is still there in list but we are comparing with some other smaller
> priority.

Fixed. Nice catch! (A small standalone sketch of the intended selection
behavior is included below, before the patch.)

> 3. Can we optimize the function SyncRepGetSynchronousNodes in such a way
> that it gets the number of standby nodes from s_s_names itself. With this
> it will be usful to stop scanning the moment we get first s_s_num potential
> standbys.

By doing so, we would need to scan the WAL sender array more than once (or
once if we can find N sync nodes with a name matching the first entry,
something unlikely to happen). We would need as well to recalculate, for a
given item in the list s_s_names, what its priority is and compare it with
the existing entries in the WAL sender list. So this is not worth the shot.
Also, using the priority instead of s_s_names is more solid, as s_s_names is
now used only in SyncRepGetStandbyPriority to calculate the priority for a
given WAL sender, which is a function only called by a WAL sender itself
when it initializes.

Regards,
--
Michael
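
To make the intended behavior behind point 2 easier to see, here is a small
standalone sketch (plain C, not the patch code) of the effect the fixed
SyncRepGetSynchronousNodes is aiming for: keep the s_s_num entries with the
smallest priority values, evicting the current worst entry whenever a better
candidate shows up (the real code tracks the worst priority incrementally
instead of rescanning). The priorities are hard-coded to match the example
above: s_s_names = 3,4,2,1, standbys 1..7, 0 meaning asynchronous.

/*
 * Standalone illustration of "keep the N best-priority standbys".
 * Lower value = higher priority, 0 = asynchronous.
 */
#include <stdio.h>

#define NUM_STANDBYS 7

int
main(void)
{
	int		priorities[NUM_STANDBYS] = {4, 3, 1, 2, 0, 0, 0};
	int		allowed_sync_nodes = 3;		/* s_s_num */
	int		sync_nodes[NUM_STANDBYS];
	int		num_sync = 0;
	int		i;

	for (i = 0; i < NUM_STANDBYS; i++)
	{
		int		worst = -1;
		int		j;

		if (priorities[i] == 0)
			continue;			/* asynchronous standby, skip it */

		/* find the selected entry with the largest (worst) priority value */
		for (j = 0; j < num_sync; j++)
			if (worst < 0 ||
				priorities[sync_nodes[j]] > priorities[sync_nodes[worst]])
				worst = j;

		if (num_sync < allowed_sync_nodes)
			sync_nodes[num_sync++] = i;		/* room left, just add it */
		else if (priorities[i] < priorities[sync_nodes[worst]])
			sync_nodes[worst] = i;			/* better candidate, evict worst */
	}

	for (i = 0; i < num_sync; i++)
		printf("sync: standby %d (priority %d)\n",
			   sync_nodes[i] + 1, priorities[sync_nodes[i]]);
	return 0;
}

Running it prints standbys 4, 2 and 3 as the elected synchronous ones, which
is the set the review expected.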
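
And as a quick configuration sketch of how the two parameters in the
documentation changes below are meant to interact (the node names are
invented for the example):

# postgresql.conf on the primary
synchronous_standby_num = 2
synchronous_standby_names = 'node_A,node_B,node_C'

With this, the first two connected and streaming standbys among node_A,
node_B and node_C are reported as "sync" in pg_stat_replication, the
remaining one stays "potential", and a commit returns only once both sync
standbys have confirmed the commit record.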
*** a/doc/src/sgml/config.sgml
--- b/doc/src/sgml/config.sgml
***************
*** 2586,2597 **** include_dir 'conf.d'
        Specifies a comma-separated list of standby names that can support
        <firstterm>synchronous replication</>, as described in
        <xref linkend="synchronous-replication">.
!       At any one time there will be at most one active synchronous standby;
!       transactions waiting for commit will be allowed to proceed after
!       this standby server confirms receipt of their data.
!       The synchronous standby will be the first standby named in this list
!       that is both currently connected and streaming data in real-time
!       (as shown by a state of <literal>streaming</literal> in the
        <link linkend="monitoring-stats-views-table">
        <literal>pg_stat_replication</></link> view).
        Other standby servers appearing later in this list represent potential
--- 2586,2598 ----
        Specifies a comma-separated list of standby names that can support
        <firstterm>synchronous replication</>, as described in
        <xref linkend="synchronous-replication">.
!       At any one time there will be a number of active synchronous standbys
!       defined by <xref linkend="guc-synchronous-standby-num">; transactions
!       waiting for commit will be allowed to proceed after those standby
!       servers confirm receipt of their data. The synchronous standbys will be
!       the first entries named in this list that are both currently connected
!       and streaming data in real-time (as shown by a state of
!       <literal>streaming</literal> in the
        <link linkend="monitoring-stats-views-table">
        <literal>pg_stat_replication</></link> view).
        Other standby servers appearing later in this list represent potential
***************
*** 2627,2632 **** include_dir 'conf.d'
--- 2628,2685 ----
      </listitem>
     </varlistentry>
 
+     <varlistentry id="guc-synchronous-standby-num" xreflabel="synchronous_standby_num">
+      <term><varname>synchronous_standby_num</varname> (<type>integer</type>)
+      <indexterm>
+       <primary><varname>synchronous_standby_num</> configuration parameter</primary>
+      </indexterm>
+      </term>
+      <listitem>
+      <para>
+       Specifies the number of standbys that support
+       <firstterm>synchronous replication</>.
+      </para>
+      <para>
+       Default value is <literal>-1</>. In this case, if
+       <xref linkend="guc-synchronous-standby-names"> is empty, all the
+       standby nodes are considered asynchronous. If there is at least
+       one node name defined, the server will wait for one synchronous
+       standby among those listed.
+      </para>
+      <para>
+       When this parameter is set to <literal>0</>, all the standby
+       nodes will be considered as asynchronous.
+      </para>
+      <para>
+       This parameter value cannot be higher than
+       <xref linkend="guc-max-wal-senders">.
+      </para>
+      <para>
+       The first connected elements of
+       <xref linkend="guc-synchronous-standby-names">, up to
+       <xref linkend="guc-synchronous-standby-num"> of them, are
+       considered as synchronous. If there are more elements than the
+       number of standbys required, all the additional standbys are
+       potential synchronous candidates. If
+       <xref linkend="guc-synchronous-standby-names"> is empty, all the
+       standbys are asynchronous. If it is set to the special entry
+       <literal>*</>, a number of standbys equal to
+       <xref linkend="guc-synchronous-standby-num"> with the highest
+       priority are elected as being synchronous.
+      </para>
+      <para>
+       The server will wait for commit confirmation from
+       <xref linkend="guc-synchronous-standby-num"> standbys, meaning that
+       if <xref linkend="guc-synchronous-standby-names"> has fewer elements
+       than the number of standbys required, the server will wait
+       indefinitely for a commit confirmation.
+      </para>
+      <para>
+       This parameter can only be set in the <filename>postgresql.conf</>
+       file or on the server command line.
+      </para>
+      </listitem>
+     </varlistentry>
+ 
      <varlistentry id="guc-vacuum-defer-cleanup-age" xreflabel="vacuum_defer_cleanup_age">
       <term><varname>vacuum_defer_cleanup_age</varname> (<type>integer</type>)
       <indexterm>
*** a/doc/src/sgml/high-availability.sgml
--- b/doc/src/sgml/high-availability.sgml
***************
*** 1081,1092 **** primary_slot_name = 'node_a_slot'
  WAL record is then sent to the standby. The standby sends reply
  messages each time a new batch of WAL data is written to disk, unless
  <varname>wal_receiver_status_interval</> is set to zero on the standby.
! If the standby is the first matching standby, as specified in
! <varname>synchronous_standby_names</> on the primary, the reply
! messages from that standby will be used to wake users waiting for
! confirmation that the commit record has been received. These parameters
! allow the administrator to specify which standby servers should be
! synchronous standbys.
  Note that the configuration of synchronous replication is mainly on the master.
  Named standbys must be directly connected to the master; the master knows
  nothing about downstream standby servers using cascaded replication.
--- 1081,1092 ----
  WAL record is then sent to the standby. The standby sends reply
  messages each time a new batch of WAL data is written to disk, unless
  <varname>wal_receiver_status_interval</> is set to zero on the standby.
! If the standby is among the first <varname>synchronous_standby_num</> matching
! standbys, as specified in <varname>synchronous_standby_names</> on the
! primary, the reply messages from that standby will be used to wake users
! waiting for confirmation that the commit record has been received. These
! parameters allow the administrator to specify which standby servers should
! be synchronous standbys.
  Note that the configuration of synchronous replication is mainly on the master.
  Named standbys must be directly connected to the master; the master knows
  nothing about downstream standby servers using cascaded replication.
***************
*** 1167,1177 **** primary_slot_name = 'node_a_slot'
    <para>
     The best solution for avoiding data loss is to ensure you don't lose
!    your last remaining synchronous standby. This can be achieved by naming multiple
     potential synchronous standbys using <varname>synchronous_standby_names</>.
!    The first named standby will be used as the synchronous standby. Standbys
!    listed after this will take over the role of synchronous standby if the
!    first one should fail.
    </para>
 
    <para>
--- 1167,1177 ----
    <para>
     The best solution for avoiding data loss is to ensure you don't lose
!    your last remaining synchronous standbys. This can be achieved by naming multiple
     potential synchronous standbys using <varname>synchronous_standby_names</>.
!    The first <varname>synchronous_standby_num</> named standbys will be used as
!    the synchronous standbys. Standbys listed after them will take over the role
!    of synchronous standby if one of the current ones should fail.
    </para>
 
    <para>
*** a/src/backend/replication/syncrep.c
--- b/src/backend/replication/syncrep.c
***************
*** 5,11 ****
  * Synchronous replication is new as of PostgreSQL 9.1.
  *
  * If requested, transaction commits wait until their commit LSN is
! * acknowledged by the sync standby.
  *
  * This module contains the code for waiting and release of backends.
  * All code in this module executes on the primary. The core streaming
--- 5,11 ----
  * Synchronous replication is new as of PostgreSQL 9.1.
  *
  * If requested, transaction commits wait until their commit LSN is
! * acknowledged by the synchronous standbys.
  *
  * This module contains the code for waiting and release of backends.
  * All code in this module executes on the primary. The core streaming
***************
*** 29,39 ****
  * single ordered queue of waiting backends, so that we can avoid
  * searching the through all waiters each time we receive a reply.
  *
! * In 9.1 we support only a single synchronous standby, chosen from a
! * priority list of synchronous_standby_names. Before it can become the
! * synchronous standby it must have caught up with the primary; that may
! * take some time. Once caught up, the current highest priority standby
! * will release waiters from the queue.
  *
  * Portions Copyright (c) 2010-2014, PostgreSQL Global Development Group
  *
--- 29,50 ----
  * single ordered queue of waiting backends, so that we can avoid
  * searching the through all waiters each time we receive a reply.
  *
! * In 9.4 we support multiple synchronous standbys, whose number is
! * defined by synchronous_standby_num, chosen from a priority list of
! * synchronous_standby_names. Before one standby can become a
! * synchronous standby it must have caught up with the primary;
! * that may take some time.
! *
! * Waiters will be released from the queue once the number of standbys
! * defined by synchronous_standby_num have caught up.
! *
! * There are special cases though. If synchronous_standby_num is set to 0,
! * all the nodes are considered as asynchronous and a fast path is taken
! * to leave this portion of the code as soon as possible. If it is set to
! * -1, the process will wait for one node to catch up with the primary only
! * if synchronous_standby_names is non-empty. This is compatible with the
! * behavior defined in 9.1, as -1 is the default value of
! * synchronous_standby_num.
  *
  * Portions Copyright (c) 2010-2014, PostgreSQL Global Development Group
  *
***************
*** 59,67 ****
  /* User-settable parameters for sync rep */
  char	   *SyncRepStandbyNames;
 
  #define SyncStandbysDefined() \
! 	(SyncRepStandbyNames != NULL && SyncRepStandbyNames[0] != '\0')
 
  static bool announce_next_takeover = true;
 
--- 70,87 ----
  /* User-settable parameters for sync rep */
  char	   *SyncRepStandbyNames;
+ int		synchronous_standby_num = -1;
 
+ /*
+  * Synchronous standbys are defined if at least one synchronous standby
+  * is wanted. In the default case (-1), the list of standby names
+  * defined needs to be non-empty as well.
+  */
  #define SyncStandbysDefined() \
! 	(synchronous_standby_num > 0 || \
! 	 (synchronous_standby_num == -1 && \
! 	  SyncRepStandbyNames != NULL && \
! 	  SyncRepStandbyNames[0] != '\0'))
 
  static bool announce_next_takeover = true;
***************
*** 206,212 **** SyncRepWaitForLSN(XLogRecPtr XactCommitLSN)
  			ereport(WARNING,
  					(errcode(ERRCODE_ADMIN_SHUTDOWN),
  					 errmsg("canceling the wait for synchronous replication and terminating connection due to administrator command"),
! 					 errdetail("The transaction has already committed locally, but might not have been replicated to the standby.")));
  			whereToSendOutput = DestNone;
  			SyncRepCancelWait();
  			break;
--- 226,232 ----
  			ereport(WARNING,
  					(errcode(ERRCODE_ADMIN_SHUTDOWN),
  					 errmsg("canceling the wait for synchronous replication and terminating connection due to administrator command"),
! 					 errdetail("The transaction has already committed locally, but might not have been replicated to the standby(s).")));
  			whereToSendOutput = DestNone;
  			SyncRepCancelWait();
  			break;
***************
*** 223,229 **** SyncRepWaitForLSN(XLogRecPtr XactCommitLSN)
  			QueryCancelPending = false;
  			ereport(WARNING,
  					(errmsg("canceling wait for synchronous replication due to user request"),
! 					 errdetail("The transaction has already committed locally, but might not have been replicated to the standby.")));
  			SyncRepCancelWait();
  			break;
  		}
--- 243,249 ----
  			QueryCancelPending = false;
  			ereport(WARNING,
  					(errmsg("canceling wait for synchronous replication due to user request"),
! 					 errdetail("The transaction has already committed locally, but might not have been replicated to the standby(s).")));
  			SyncRepCancelWait();
  			break;
  		}
***************
*** 357,365 **** SyncRepInitConfig(void)
  	}
  }
 
  /*
   * Update the LSNs on each queue based upon our latest state. This
! * implements a simple policy of first-valid-standby-releases-waiter.
   *
   * Other policies are possible, which would change what we do here and what
   * perhaps also which information we store as well.
--- 377,483 ----
  	}
  }
 
+
+ /*
+  * Obtain a palloc'd array containing positions of standbys currently
+  * considered as synchronous. The caller is responsible for freeing the
+  * returned array and must hold the necessary lock on SyncRepLock.
+  */
+ int *
+ SyncRepGetSynchronousNodes(int *num_sync)
+ {
+ 	int		   *sync_nodes;
+ 	int			priority = 0;
+ 	int			i;
+ 	int			allowed_sync_nodes = synchronous_standby_num;
+
+ 	/* Initialize */
+ 	*num_sync = 0;
+
+ 	/* Leave if no synchronous nodes allowed */
+ 	if (synchronous_standby_num == 0)
+ 		return NULL;
+
+ 	/*
+ 	 * Determine the number of nodes that can be synchronized.
+ 	 * synchronous_standby_num can have the special value -1,
+ 	 * meaning that only one node with the highest non-null priority
+ 	 * can be considered as synchronous.
+ 	 */
+ 	if (synchronous_standby_num == -1)
+ 		allowed_sync_nodes = 1;
+
+ 	/*
+ 	 * Make enough room for the maximum number of synchronous nodes
+ 	 * allowed, as we scan through the WAL senders here.
+ 	 */
+ 	sync_nodes = (int *) palloc(allowed_sync_nodes * sizeof(int));
+
+ 	for (i = 0; i < max_wal_senders; i++)
+ 	{
+ 		/* Use volatile pointer to prevent code rearrangement */
+ 		volatile WalSnd *walsnd = &WalSndCtl->walsnds[i];
+
+ 		/* Move on to next one if not active */
+ 		if (walsnd->pid == 0)
+ 			continue;
+
+ 		/* Move on to next one if not streaming */
+ 		if (walsnd->state != WALSNDSTATE_STREAMING)
+ 			continue;
+
+ 		/* Move on to next one if asynchronous */
+ 		if (walsnd->sync_standby_priority == 0)
+ 			continue;
+
+ 		/* Move on to next one if priority conditions are not satisfied */
+ 		if (priority != 0 &&
+ 			priority <= walsnd->sync_standby_priority &&
+ 			*num_sync == allowed_sync_nodes)
+ 			continue;
+
+ 		/* Move on to next one if flush position is invalid */
+ 		if (XLogRecPtrIsInvalid(walsnd->flush))
+ 			continue;
+
+ 		/*
+ 		 * We have a potential synchronous candidate, add it to the
+ 		 * list of nodes already present or evict the entry with the
+ 		 * lowest priority (highest priority value) found until now.
+ 		 */
+ 		if (*num_sync == allowed_sync_nodes)
+ 		{
+ 			int		j;
+ 			for (j = 0; j < *num_sync; j++)
+ 			{
+ 				volatile WalSnd *walsndloc = &WalSndCtl->walsnds[sync_nodes[j]];
+ 				if (walsndloc->sync_standby_priority == priority)
+ 				{
+ 					sync_nodes[j] = i;
+ 					break;
+ 				}
+ 			}
+ 		}
+ 		else
+ 		{
+ 			sync_nodes[*num_sync] = i;
+ 			(*num_sync)++;
+ 		}
+
+ 		/*
+ 		 * Update priority for next tracking. This needs to be the highest
+ 		 * priority value among all the existing items.
+ 		 */
+ 		if (priority < walsnd->sync_standby_priority)
+ 			priority = walsnd->sync_standby_priority;
+ 	}
+
+ 	return sync_nodes;
+ }
+
  /*
   * Update the LSNs on each queue based upon our latest state. This
! * implements a simple policy of first-valid-standbys-release-waiters.
   *
   * Other policies are possible, which would change what we do here and what
   * perhaps also which information we store as well.
***************
*** 368,378 ****
  void
  SyncRepReleaseWaiters(void)
  {
  	volatile WalSndCtlData *walsndctl = WalSndCtl;
! 	volatile WalSnd *syncWalSnd = NULL;
  	int			numwrite = 0;
  	int			numflush = 0;
! 	int			priority = 0;
  	int			i;
 
  	/*
  	 * If this WALSender is serving a standby that is not on the list of
--- 486,499 ----
  SyncRepReleaseWaiters(void)
  {
  	volatile WalSndCtlData *walsndctl = WalSndCtl;
! 	int		   *sync_standbys;
  	int			numwrite = 0;
  	int			numflush = 0;
! 	int			num_sync = 0;
  	int			i;
+ 	bool		found = false;
+ 	XLogRecPtr	min_write_pos;
+ 	XLogRecPtr	min_flush_pos;
 
  	/*
  	 * If this WALSender is serving a standby that is not on the list of
***************
*** 388,454 **** SyncRepReleaseWaiters(void)
  	/*
  	 * We're a potential sync standby. Release waiters if we are the highest
  	 * priority standby. If there are multiple standbys with same priorities
! 	 * then we use the first mentioned standby. If you change this, also
! 	 * change pg_stat_get_wal_senders().
  	 */
  	LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
 
! 	for (i = 0; i < max_wal_senders; i++)
  	{
! 		/* use volatile pointer to prevent code rearrangement */
! 		volatile WalSnd *walsnd = &walsndctl->walsnds[i];
!
! 		if (walsnd->pid != 0 &&
! 			walsnd->state == WALSNDSTATE_STREAMING &&
! 			walsnd->sync_standby_priority > 0 &&
! 			(priority == 0 ||
! 			 priority > walsnd->sync_standby_priority) &&
! 			!XLogRecPtrIsInvalid(walsnd->flush))
  		{
! 			priority = walsnd->sync_standby_priority;
! 			syncWalSnd = walsnd;
  		}
  	}
 
  	/*
! 	 * We should have found ourselves at least.
  	 */
! 	Assert(syncWalSnd);
 
  	/*
! 	 * If we aren't managing the highest priority standby then just leave.
  	 */
! 	if (syncWalSnd != MyWalSnd)
  	{
  		LWLockRelease(SyncRepLock);
! 		announce_next_takeover = true;
  		return;
  	}
 
  	/*
  	 * Set the lsn first so that when we wake backends they will release up to
! 	 * this location.
  	 */
! 	if (walsndctl->lsn[SYNC_REP_WAIT_WRITE] < MyWalSnd->write)
  	{
! 		walsndctl->lsn[SYNC_REP_WAIT_WRITE] = MyWalSnd->write;
  		numwrite = SyncRepWakeQueue(false, SYNC_REP_WAIT_WRITE);
  	}
 
! 	if (walsndctl->lsn[SYNC_REP_WAIT_FLUSH] < MyWalSnd->flush)
  	{
! 		walsndctl->lsn[SYNC_REP_WAIT_FLUSH] = MyWalSnd->flush;
  		numflush = SyncRepWakeQueue(false, SYNC_REP_WAIT_FLUSH);
  	}
 
  	LWLockRelease(SyncRepLock);
 
  	elog(DEBUG3, "released %d procs up to write %X/%X, %d procs up to flush %X/%X",
! 		 numwrite, (uint32) (MyWalSnd->write >> 32), (uint32) MyWalSnd->write,
! 		 numflush, (uint32) (MyWalSnd->flush >> 32), (uint32) MyWalSnd->flush);
 
  	/*
  	 * If we are managing the highest priority standby, though we weren't
! 	 * prior to this, then announce we are now the sync standby.
  	 */
  	if (announce_next_takeover)
  	{
--- 509,607 ----
  	/*
  	 * We're a potential sync standby. Release waiters if we are the highest
  	 * priority standby. If there are multiple standbys with same priorities
! 	 * then we use the first mentioned standbys.
  	 */
  	LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
+ 	sync_standbys = SyncRepGetSynchronousNodes(&num_sync);
 
! 	/*
! 	 * We should have found ourselves at least, except if it is not expected
! 	 * to find any synchronous nodes.
! 	 */
! 	Assert(num_sync > 0);
!
! 	/*
! 	 * If we aren't managing one of the standbys with highest priority
! 	 * then just leave.
! 	 */
! 	for (i = 0; i < num_sync; i++)
  	{
! 		volatile WalSnd *walsndloc = &WalSndCtl->walsnds[sync_standbys[i]];
! 		if (walsndloc == MyWalSnd)
  		{
! 			found = true;
! 			break;
  		}
  	}
 
  	/*
! 	 * We are definitely not one of the chosen... But we could be at the
! 	 * next takeover.
  	 */
! 	if (!found)
! 	{
! 		LWLockRelease(SyncRepLock);
! 		pfree(sync_standbys);
! 		announce_next_takeover = true;
! 		return;
! 	}
 
  	/*
! 	 * Even if we are one of the chosen standbys, leave if there
! 	 * are fewer synchronous standbys in waiting state than what is
! 	 * expected by the user.
  	 */
! 	if (num_sync < synchronous_standby_num &&
! 		synchronous_standby_num != -1)
  	{
  		LWLockRelease(SyncRepLock);
! 		pfree(sync_standbys);
  		return;
  	}
 
  	/*
  	 * Set the lsn first so that when we wake backends they will release up to
! 	 * this location, of course only if all the standbys found as synchronous
! 	 * have already reached that point, so first find what are the oldest
! 	 * write and flush positions of all the standbys considered in sync...
  	 */
! 	min_write_pos = MyWalSnd->write;
! 	min_flush_pos = MyWalSnd->flush;
! 	for (i = 0; i < num_sync; i++)
! 	{
! 		volatile WalSnd *walsndloc = &WalSndCtl->walsnds[sync_standbys[i]];
!
! 		SpinLockAcquire(&walsndloc->mutex);
! 		if (min_write_pos > walsndloc->write)
! 			min_write_pos = walsndloc->write;
! 		if (min_flush_pos > walsndloc->flush)
! 			min_flush_pos = walsndloc->flush;
! 		SpinLockRelease(&walsndloc->mutex);
! 	}
!
! 	/* ... And now update if necessary */
! 	if (walsndctl->lsn[SYNC_REP_WAIT_WRITE] < min_write_pos)
  	{
! 		walsndctl->lsn[SYNC_REP_WAIT_WRITE] = min_write_pos;
  		numwrite = SyncRepWakeQueue(false, SYNC_REP_WAIT_WRITE);
  	}
 
! 	if (walsndctl->lsn[SYNC_REP_WAIT_FLUSH] < min_flush_pos)
  	{
! 		walsndctl->lsn[SYNC_REP_WAIT_FLUSH] = min_flush_pos;
  		numflush = SyncRepWakeQueue(false, SYNC_REP_WAIT_FLUSH);
  	}
 
  	LWLockRelease(SyncRepLock);
 
  	elog(DEBUG3, "released %d procs up to write %X/%X, %d procs up to flush %X/%X",
! 		 numwrite, (uint32) (walsndctl->lsn[SYNC_REP_WAIT_WRITE] >> 32),
! 		 (uint32) walsndctl->lsn[SYNC_REP_WAIT_WRITE],
! 		 numflush, (uint32) (walsndctl->lsn[SYNC_REP_WAIT_FLUSH] >> 32),
! 		 (uint32) walsndctl->lsn[SYNC_REP_WAIT_FLUSH]);
 
  	/*
  	 * If we are managing the highest priority standby, though we weren't
! 	 * prior to this, then announce we are now a sync standby.
  	 */
  	if (announce_next_takeover)
  	{
***************
*** 457,462 **** SyncRepReleaseWaiters(void)
--- 610,618 ----
  				(errmsg("standby \"%s\" is now the synchronous standby with priority %u",
  						application_name, MyWalSnd->sync_standby_priority)));
  	}
+
+ 	/* Clean up */
+ 	pfree(sync_standbys);
  }
 
  /*
***************
*** 483,488 **** SyncRepGetStandbyPriority(void)
--- 639,648 ----
  	if (am_cascading_walsender)
  		return 0;
 
+ 	/* If no synchronous nodes allowed, no cake for this WAL sender */
+ 	if (synchronous_standby_num == 0)
+ 		return 0;
+
  	/* Need a modifiable copy of string */
  	rawstring = pstrdup(SyncRepStandbyNames);
*** a/src/backend/replication/walsender.c
--- b/src/backend/replication/walsender.c
***************
*** 2735,2742 **** pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
  	MemoryContext per_query_ctx;
  	MemoryContext oldcontext;
  	int		   *sync_priority;
! 	int			priority = 0;
! 	int			sync_standby = -1;
  	int			i;
 
  	/* check to see if caller supports us returning a tuplestore */
--- 2735,2742 ----
  	MemoryContext per_query_ctx;
  	MemoryContext oldcontext;
  	int		   *sync_priority;
! 	int		   *sync_standbys;
! 	int			num_sync = 0;
  	int			i;
 
  	/* check to see if caller supports us returning a tuplestore */
***************
*** 2767,2802 **** pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
  	/*
  	 * Get the priorities of sync standbys all in one go, to minimise lock
  	 * acquisitions and to allow us to evaluate who is the current sync
! 	 * standby. This code must match the code in SyncRepReleaseWaiters().
  	 */
  	sync_priority = palloc(sizeof(int) * max_wal_senders);
  	LWLockAcquire(SyncRepLock, LW_SHARED);
  	for (i = 0; i < max_wal_senders; i++)
  	{
  		/* use volatile pointer to prevent code rearrangement */
  		volatile WalSnd *walsnd = &WalSndCtl->walsnds[i];
 
! 		if (walsnd->pid != 0)
! 		{
! 			/*
! 			 * Treat a standby such as a pg_basebackup background process
! 			 * which always returns an invalid flush location, as an
! 			 * asynchronous standby.
! 			 */
! 			sync_priority[i] = XLogRecPtrIsInvalid(walsnd->flush) ?
! 				0 : walsnd->sync_standby_priority;
!
! 			if (walsnd->state == WALSNDSTATE_STREAMING &&
! 				walsnd->sync_standby_priority > 0 &&
! 				(priority == 0 ||
! 				 priority > walsnd->sync_standby_priority) &&
! 				!XLogRecPtrIsInvalid(walsnd->flush))
! 			{
! 				priority = walsnd->sync_standby_priority;
! 				sync_standby = i;
! 			}
! 		}
  	}
  	LWLockRelease(SyncRepLock);
 
  	for (i = 0; i < max_wal_senders; i++)
--- 2767,2789 ----
  	/*
  	 * Get the priorities of sync standbys all in one go, to minimise lock
  	 * acquisitions and to allow us to evaluate who is the current sync
! 	 * standby.
  	 */
  	sync_priority = palloc(sizeof(int) * max_wal_senders);
  	LWLockAcquire(SyncRepLock, LW_SHARED);
+
+ 	/* First get the priority of each standby while we hold the lock */
  	for (i = 0; i < max_wal_senders; i++)
  	{
  		/* use volatile pointer to prevent code rearrangement */
  		volatile WalSnd *walsnd = &WalSndCtl->walsnds[i];
 
! 		sync_priority[i] = XLogRecPtrIsInvalid(walsnd->flush) ?
! 			0 : walsnd->sync_standby_priority;
  	}
+
+ 	/* Obtain list of synchronous standbys */
+ 	sync_standbys = SyncRepGetSynchronousNodes(&num_sync);
  	LWLockRelease(SyncRepLock);
 
  	for (i = 0; i < max_wal_senders; i++)
***************
*** 2858,2872 **** pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
  			 */
  			if (sync_priority[i] == 0)
  				values[7] = CStringGetTextDatum("async");
- 			else if (i == sync_standby)
- 				values[7] = CStringGetTextDatum("sync");
  			else
! 				values[7] = CStringGetTextDatum("potential");
  		}
 
  		tuplestore_putvalues(tupstore, tupdesc, values, nulls);
  	}
 
  	pfree(sync_priority);
 
  	/* clean up and return the tuplestore */
  	tuplestore_donestoring(tupstore);
--- 2845,2876 ----
  			 */
  			if (sync_priority[i] == 0)
  				values[7] = CStringGetTextDatum("async");
  			else
! 			{
! 				int		j;
! 				bool	found = false;
!
! 				for (j = 0; j < num_sync; j++)
! 				{
! 					/* Found that this node is one in sync */
! 					if (i == sync_standbys[j])
! 					{
! 						values[7] = CStringGetTextDatum("sync");
! 						found = true;
! 						break;
! 					}
! 				}
! 				if (!found)
! 					values[7] = CStringGetTextDatum("potential");
! 			}
  		}
 
  		tuplestore_putvalues(tupstore, tupdesc, values, nulls);
  	}
+
+ 	/* Cleanup */
  	pfree(sync_priority);
+ 	pfree(sync_standbys);
 
  	/* clean up and return the tuplestore */
  	tuplestore_donestoring(tupstore);
*** a/src/backend/utils/misc/guc.c
--- b/src/backend/utils/misc/guc.c
***************
*** 2548,2553 **** static struct config_int ConfigureNamesInt[] =
--- 2548,2563 ----
  		NULL, NULL, NULL
  	},
 
+ 	{
+ 		{"synchronous_standby_num", PGC_SIGHUP, REPLICATION_MASTER,
+ 			gettext_noop("Number of potential synchronous standbys."),
+ 			NULL
+ 		},
+ 		&synchronous_standby_num,
+ 		-1, -1, INT_MAX,
+ 		NULL, NULL, NULL
+ 	},
+
  	/* End-of-list marker */
  	{
  		{NULL, 0, 0, NULL, NULL}, NULL, 0, 0, 0, NULL, NULL, NULL
*** a/src/backend/utils/misc/postgresql.conf.sample
--- b/src/backend/utils/misc/postgresql.conf.sample
***************
*** 235,240 ****
--- 235,241 ----
  #synchronous_standby_names = ''	# standby servers that provide sync rep
  				# comma-separated list of application_name
  				# from standby(s); '*' = all
+ #synchronous_standby_num = -1	# number of standby servers using sync rep
  #vacuum_defer_cleanup_age = 0	# number of xacts by which cleanup is delayed
 
  # - Standby Servers -
*** a/src/include/replication/syncrep.h
--- b/src/include/replication/syncrep.h
***************
*** 33,38 ****
--- 33,39 ----
 
  /* user-settable parameters for synchronous replication */
  extern char *SyncRepStandbyNames;
+ extern int	synchronous_standby_num;
 
  /* called by user backend */
  extern void SyncRepWaitForLSN(XLogRecPtr XactCommitLSN);
***************
*** 49,54 **** extern void SyncRepUpdateSyncStandbysDefined(void);
--- 50,56 ----
 
  /* called by various procs */
  extern int	SyncRepWakeQueue(bool all, int mode);
+ extern int *SyncRepGetSynchronousNodes(int *num_sync);
 
  extern bool check_synchronous_standby_names(char **newval, void **extra, GucSource source);
  extern void assign_synchronous_commit(int newval, void *extra);
--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers