On Fri, Dec 16, 2016 at 5:04 PM, Fujii Masao <[email protected]> wrote:
> On Fri, Dec 16, 2016 at 2:38 PM, Michael Paquier
> <[email protected]> wrote:
>> On Thu, Dec 15, 2016 at 6:08 PM, Masahiko Sawada <[email protected]>
>> wrote:
>>> Attached latest v12 patch.
>>> I changed behavior of "N (standby_list)" to use the priority method
>>> and incorporated some review comments so far. Please review it.
>>
>> Some comments...
>>
>> + Another example of <varname>synchronous_standby_names</> for multiple
>> + synchronous standby is:
>> Here standby takes an 's'.
>>
>> + candidates. The master server will wait for at least 2 replies from them.
>> + <literal>s4</> is an asynchronous standby since its name is not in the list.
>> + </para>
>> "will wait for replies from at least two of them".
>>
>> + * next-highest-priority standby. In quorum method, the all standbys
>> + * appearing in the list are considered as a candidate for quorum commit.
>> "the all" is incorrect. I think you mean "all the" instead.
>>
>> + * NIL if no sync standby is connected. In quorum method, all standby
>> + * priorities are same, that is 1. So this function returns the list of
>> This is not true. Standbys have a priority number assigned. Though it does
>> not matter much for quorum groups, it gives an indication of their position
>> in the defined list.
>>
>> #synchronous_standby_names = '' # standby servers that provide sync rep
>> - # number of sync standbys and comma-separated list of application_name
>> + # synchronization method, number of sync standbys
>> + # and comma-separated list of application_name
>> # from standby(s); '*' = all
>> The formulation is funny here: "sync rep synchronization method".
>>
>> I think that Fujii-san has also some doc changes in his box. For anybody
>> picking up this patch next, it would be good to incorporate the things
>> I have noticed here.
>
> Yes, I will. Thanks!
Attached is the modified version of the patch. Barring objections, I will
commit this version.
Even after committing the patch, there will still be many source comments
and documentation that we need to update, for example,
in high-availability.sgml. We need to check and update them thoroughly later.
Regards,
--
Fujii Masao
*** a/doc/src/sgml/config.sgml
--- b/doc/src/sgml/config.sgml
***************
*** 3054,3094 **** include_dir 'conf.d'
transactions waiting for commit will be allowed to proceed after
these standby servers confirm receipt of their data.
The synchronous standbys will be those whose names appear
! earlier in this list, and
that are both currently connected and streaming data in real-time
(as shown by a state of <literal>streaming</literal> in the
<link linkend="monitoring-stats-views-table">
<literal>pg_stat_replication</></link> view).
! Other standby servers appearing later in this list represent potential
! synchronous standbys. If any of the current synchronous
! standbys disconnects for whatever reason,
! it will be replaced immediately with the next-highest-priority standby.
! Specifying more than one standby name can allow very high availability.
</para>
<para>
This parameter specifies a list of standby servers using
either of the following syntaxes:
<synopsis>
! <replaceable class="parameter">num_sync</replaceable> ( <replaceable class="parameter">standby_name</replaceable> [, ...] )
<replaceable class="parameter">standby_name</replaceable> [, ...]
</synopsis>
where <replaceable class="parameter">num_sync</replaceable> is
the number of synchronous standbys that transactions need to
wait for replies from,
and <replaceable class="parameter">standby_name</replaceable>
! is the name of a standby server. For example, a setting of
! <literal>3 (s1, s2, s3, s4)</> makes transaction commits wait
! until their WAL records are received by three higher-priority standbys
! chosen from standby servers <literal>s1</>, <literal>s2</>,
! <literal>s3</> and <literal>s4</>.
! </para>
! <para>
! The second syntax was used before <productname>PostgreSQL</>
version 9.6 and is still supported. It's the same as the first syntax
! with <replaceable class="parameter">num_sync</replaceable> equal to 1.
! For example, <literal>1 (s1, s2)</> and
! <literal>s1, s2</> have the same meaning: either <literal>s1</>
! or <literal>s2</> is chosen as a synchronous standby.
</para>
<para>
The name of a standby server for this purpose is the
--- 3054,3124 ----
transactions waiting for commit will be allowed to proceed after
these standby servers confirm receipt of their data.
The synchronous standbys will be those whose names appear
! in this list, and
that are both currently connected and streaming data in real-time
(as shown by a state of <literal>streaming</literal> in the
<link linkend="monitoring-stats-views-table">
<literal>pg_stat_replication</></link> view).
! Specifying more than one standby name can allow very high availability.
</para>
<para>
This parameter specifies a list of standby servers using
either of the following syntaxes:
<synopsis>
! [FIRST] <replaceable class="parameter">num_sync</replaceable> ( <replaceable class="parameter">standby_name</replaceable> [, ...] )
! ANY <replaceable class="parameter">num_sync</replaceable> ( <replaceable class="parameter">standby_name</replaceable> [, ...] )
<replaceable class="parameter">standby_name</replaceable> [, ...]
</synopsis>
where <replaceable class="parameter">num_sync</replaceable> is
the number of synchronous standbys that transactions need to
wait for replies from,
and <replaceable class="parameter">standby_name</replaceable>
! is the name of a standby server.
! <literal>FIRST</> and <literal>ANY</> specify the method to choose
! synchronous standbys from the listed servers.
! </para>
! <para>
! The keyword <literal>FIRST</>, coupled with
! <replaceable class="parameter">num_sync</replaceable>, specifies a
! priority-based synchronous replication and makes transaction commits
! wait until their WAL records are replicated to
! <replaceable class="parameter">num_sync</replaceable> synchronous
! standbys chosen based on their priorities. For example, a setting of
! <literal>FIRST 3 (s1, s2, s3, s4)</> will cause each commit to wait for
! replies from three higher-priority standbys chosen from standby servers
! <literal>s1</>, <literal>s2</>, <literal>s3</> and <literal>s4</>.
! The standbys whose names appear earlier in the list are given higher
! priority and will be considered as synchronous. Other standby servers
! appearing later in this list represent potential synchronous standbys.
! If any of the current synchronous standbys disconnects for whatever
! reason, it will be replaced immediately with the next-highest-priority
! standby. The keyword <literal>FIRST</> is optional.
! </para>
! <para>
! The keyword <literal>ANY</>, coupled with
! <replaceable class="parameter">num_sync</replaceable>, specifies a
! quorum-based synchronous replication and makes transaction commits
! wait until their WAL records are replicated to <emphasis>at least</>
! <replaceable class="parameter">num_sync</replaceable> listed standbys.
! For example, a setting of <literal>ANY 3 (s1, s2, s3, s4)</> will cause
! each commit to proceed as soon as at least three of <literal>s1</>,
! <literal>s2</>, <literal>s3</> and <literal>s4</> reply.
! </para>
! <para>
! <literal>FIRST</> and <literal>ANY</> are case-insensitive. If these
! keywords are used as the name of a standby server,
! its <replaceable class="parameter">standby_name</replaceable> must
! be double-quoted.
! </para>
! <para>
! The third syntax was used before <productname>PostgreSQL</>
version 9.6 and is still supported. It's the same as the first syntax
! with <literal>FIRST</> and
! <replaceable class="parameter">num_sync</replaceable> equal to 1.
! For example, <literal>FIRST 1 (s1, s2)</> and <literal>s1, s2</> have
! the same meaning: either <literal>s1</> or <literal>s2</> is chosen
! as a synchronous standby.
</para>
<para>
The name of a standby server for this purpose is the
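To make the difference between FIRST and ANY concrete, here is a minimal C sketch of when a commit at a given LSN may be released under each method, following the config.sgml text above. The names Lsn, StandbyState, released_first() and released_any() are hypothetical and not part of the patch; the sketch tracks only the flush position, treats "connected" as connected and streaming, and ignores other checks the real code performs (such as requiring a valid flush position).

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

typedef uint64_t Lsn;           /* hypothetical stand-in for XLogRecPtr */

/* One entry per listed standby, in synchronous_standby_names order */
typedef struct
{
    bool        connected;      /* connected and streaming */
    Lsn         flush;          /* last flush position it reported */
} StandbyState;

/*
 * FIRST num_sync: the num_sync earliest-listed connected standbys are the
 * synchronous ones, and every one of them must have flushed commit_lsn.
 */
static bool
released_first(const StandbyState *s, int n, int num_sync, Lsn commit_lsn)
{
    int         chosen = 0;

    assert(num_sync >= 1);
    for (int i = 0; i < n && chosen < num_sync; i++)
    {
        if (!s[i].connected)
            continue;           /* the next-highest-priority one steps in */
        if (s[i].flush < commit_lsn)
            return false;       /* a chosen sync standby is still behind */
        chosen++;
    }
    return chosen == num_sync;  /* too few connected standbys: keep waiting */
}

/* ANY num_sync: replies from any num_sync connected listed standbys suffice */
static bool
released_any(const StandbyState *s, int n, int num_sync, Lsn commit_lsn)
{
    int         acks = 0;

    assert(num_sync >= 1);
    for (int i = 0; i < n; i++)
        if (s[i].connected && s[i].flush >= commit_lsn)
            acks++;
    return acks >= num_sync;
}
```

With flush positions {100, 50, 100, 100} for s1..s4 and a commit at LSN 100, FIRST 3 keeps waiting because s2 is one of the three highest-priority standbys and is behind, while ANY 3 is satisfied by s1, s3 and s4. If s2 disconnects, s4 takes its place under FIRST 3 and the commit is released.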
*** a/doc/src/sgml/high-availability.sgml
--- b/doc/src/sgml/high-availability.sgml
***************
*** 1138,1156 **** primary_slot_name = 'node_a_slot'
as synchronous confirm receipt of their data. The number of synchronous
standbys that transactions must wait for replies from is specified in
<varname>synchronous_standby_names</>. This parameter also specifies
! a list of standby names, which determines the priority of each standby
! for being chosen as a synchronous standby. The standbys whose names
! appear earlier in the list are given higher priority and will be considered
! as synchronous. Other standby servers appearing later in this list
! represent potential synchronous standbys. If any of the current
! synchronous standbys disconnects for whatever reason, it will be replaced
! immediately with the next-highest-priority standby.
</para>
<para>
! An example of <varname>synchronous_standby_names</> for multiple
! synchronous standbys is:
<programlisting>
! synchronous_standby_names = '2 (s1, s2, s3)'
</programlisting>
In this example, if four standby servers <literal>s1</>, <literal>s2</>,
<literal>s3</> and <literal>s4</> are running, the two standbys
--- 1138,1162 ----
as synchronous confirm receipt of their data. The number of synchronous
standbys that transactions must wait for replies from is specified in
<varname>synchronous_standby_names</>. This parameter also specifies
! a list of standby names and the method (<literal>FIRST</> or
! <literal>ANY</>) used to choose synchronous standbys from the listed ones.
</para>
<para>
! The method <literal>FIRST</> specifies a priority-based synchronous
! replication and makes transaction commits wait until their WAL records are
! replicated to the requested number of synchronous standbys chosen based on
! their priorities. The standbys whose names appear earlier in the list are
! given higher priority and will be considered as synchronous. Other standby
! servers appearing later in this list represent potential synchronous
! standbys. If any of the current synchronous standbys disconnects for
! whatever reason, it will be replaced immediately with the
! next-highest-priority standby.
! </para>
! <para>
! An example of <varname>synchronous_standby_names</> for
! priority-based synchronous replication with multiple synchronous standbys is:
<programlisting>
! synchronous_standby_names = 'FIRST 2 (s1, s2, s3)'
</programlisting>
In this example, if four standby servers <literal>s1</>, <literal>s2</>,
<literal>s3</> and <literal>s4</> are running, the two standbys
***************
*** 1162,1167 **** synchronous_standby_names = '2 (s1, s2, s3)'
--- 1168,1191 ----
its name is not in the list.
</para>
<para>
+ The method <literal>ANY</> specifies a quorum-based synchronous
+ replication and makes transaction commits wait until their WAL records
+ are replicated to <emphasis>at least</> the requested number of
+ synchronous standbys in the list.
+ </para>
+ <para>
+ An example of <varname>synchronous_standby_names</> for
+ quorum-based synchronous replication with multiple synchronous standbys is:
+ <programlisting>
+ synchronous_standby_names = 'ANY 2 (s1, s2, s3)'
+ </programlisting>
+ In this example, if four standby servers <literal>s1</>, <literal>s2</>,
+ <literal>s3</> and <literal>s4</> are running, transaction commits will
+ wait for replies from at least two of <literal>s1</>,
+ <literal>s2</> and <literal>s3</>. <literal>s4</> is an asynchronous
+ standby since its name is not in the list.
+ </para>
+ <para>
The synchronous states of standby servers can be viewed using
the <structname>pg_stat_replication</structname> view.
</para>
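In both examples above, a standby's position in the list determines its priority, and a standby whose name is not listed (s4) gets priority 0 and stays asynchronous. A minimal sketch of that lookup follows; standby_priority() and name_eq_ci() are hypothetical helpers, and the case-insensitive comparison and the '*' wildcard (described as "'*' = all" in postgresql.conf.sample) are assumptions about how the server matches application_name, not code taken from the patch.

```c
#include <assert.h>
#include <ctype.h>
#include <stdbool.h>
#include <string.h>

/* ASCII case-insensitive string equality (assumed matching rule) */
static bool
name_eq_ci(const char *a, const char *b)
{
    for (; *a && *b; a++, b++)
        if (tolower((unsigned char) *a) != tolower((unsigned char) *b))
            return false;
    return *a == *b;            /* equal only if both ended together */
}

/*
 * Return the 1-based position of application_name in the member list,
 * or 0 if it is not listed (an asynchronous standby).  '*' matches any name.
 */
static int
standby_priority(const char *const *members, int nmembers,
                 const char *application_name)
{
    assert(members != NULL || nmembers == 0);
    for (int i = 0; i < nmembers; i++)
        if (strcmp(members[i], "*") == 0 ||
            name_eq_ci(members[i], application_name))
            return i + 1;
    return 0;
}
```

For the list (s1, s2, s3), s1 gets priority 1, s3 gets 3 and s4 gets 0. Under ANY, a non-zero value only marks the standby as a quorum candidate; its magnitude is not used to choose among candidates, though it still records the standby's position in the list.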
*** a/doc/src/sgml/monitoring.sgml
--- b/doc/src/sgml/monitoring.sgml
***************
*** 1412,1418 **** SELECT pid, wait_event_type, wait_event FROM pg_stat_activity WHERE wait_event i
<entry><structfield>sync_priority</></entry>
<entry><type>integer</></entry>
<entry>Priority of this standby server for being chosen as the
! synchronous standby</entry>
</row>
<row>
<entry><structfield>sync_state</></entry>
--- 1412,1419 ----
<entry><structfield>sync_priority</></entry>
<entry><type>integer</></entry>
<entry>Priority of this standby server for being chosen as the
! synchronous standby in priority-based synchronous replication.
! This has no effect in quorum-based synchronous replication.</entry>
</row>
<row>
<entry><structfield>sync_state</></entry>
***************
*** 1437,1442 **** SELECT pid, wait_event_type, wait_event FROM pg_stat_activity WHERE wait_event i
--- 1438,1449 ----
<literal>sync</>: This standby server is synchronous.
</para>
</listitem>
+ <listitem>
+ <para>
+ <literal>quorum</>: This standby server is considered a candidate
+ for quorum standbys.
+ </para>
+ </listitem>
</itemizedlist>
</entry>
</row>
*** a/src/backend/replication/Makefile
--- b/src/backend/replication/Makefile
***************
*** 26,32 **** repl_gram.o: repl_scanner.c
# syncrep_scanner is compiled as part of syncrep_gram
syncrep_gram.o: syncrep_scanner.c
! syncrep_scanner.c: FLEXFLAGS = -CF -p
syncrep_scanner.c: FLEX_NO_BACKUP=yes
# repl_gram.c, repl_scanner.c, syncrep_gram.c and syncrep_scanner.c
--- 26,32 ----
# syncrep_scanner is compiled as part of syncrep_gram
syncrep_gram.o: syncrep_scanner.c
! syncrep_scanner.c: FLEXFLAGS = -CF -p -i
syncrep_scanner.c: FLEX_NO_BACKUP=yes
# repl_gram.c, repl_scanner.c, syncrep_gram.c and syncrep_scanner.c
*** a/src/backend/replication/syncrep.c
--- b/src/backend/replication/syncrep.c
***************
*** 30,52 ****
* searching through all waiters each time we receive a reply.
*
* In 9.5 or before only a single standby could be considered as
! * synchronous. In 9.6 we support multiple synchronous standbys.
! * The number of synchronous standbys that transactions must wait for
! * replies from is specified in synchronous_standby_names.
! * This parameter also specifies a list of standby names,
! * which determines the priority of each standby for being chosen as
! * a synchronous standby. The standbys whose names appear earlier
! * in the list are given higher priority and will be considered as
! * synchronous. Other standby servers appearing later in this list
! * represent potential synchronous standbys. If any of the current
! * synchronous standbys disconnects for whatever reason, it will be
! * replaced immediately with the next-highest-priority standby.
*
* Before the standbys chosen from synchronous_standby_names can
* become the synchronous standbys they must have caught up with
* the primary; that may take some time. Once caught up,
! * the current higher priority standbys which are considered as
! * synchronous at that moment will release waiters from the queue.
*
* Portions Copyright (c) 2010-2016, PostgreSQL Global Development Group
*
--- 30,63 ----
* searching through all waiters each time we receive a reply.
*
* In 9.5 or before only a single standby could be considered as
! * synchronous. In 9.6 we support multiple synchronous standbys chosen by
! * priority. In 10.0 we also support choosing them by quorum. The number
! * of synchronous standbys that transactions must wait for replies from is
! * specified in synchronous_standby_names. This parameter also specifies a
! * list of standby names and the method (FIRST or ANY) used to choose
! * synchronous standbys from the listed ones.
! *
! * The method FIRST specifies a priority-based synchronous replication
! * and makes transaction commits wait until their WAL records are
! * replicated to the requested number of synchronous standbys chosen based
! * on their priorities. The standbys whose names appear earlier in the list
! * are given higher priority and will be considered as synchronous.
! * Other standby servers appearing later in this list represent potential
! * synchronous standbys. If any of the current synchronous standbys
! * disconnects for whatever reason, it will be replaced immediately with
! * the next-highest-priority standby.
! *
! * The method ANY specifies a quorum-based synchronous replication
! * and makes transaction commits wait until their WAL records are
! * replicated to at least the requested number of synchronous standbys
! * in the list. All the standbys appearing in the list are considered as
! * candidates for quorum synchronous standbys.
*
* Before the standbys chosen from synchronous_standby_names can
* become the synchronous standbys they must have caught up with
* the primary; that may take some time. Once caught up,
! * the standbys which are considered as synchronous at that moment
! * will release waiters from the queue.
*
* Portions Copyright (c) 2010-2016, PostgreSQL Global Development Group
*
***************
*** 79,96 **** char *SyncRepStandbyNames;
static bool announce_next_takeover = true;
! static SyncRepConfigData *SyncRepConfig = NULL;
static int SyncRepWaitMode = SYNC_REP_NO_WAIT;
static void SyncRepQueueInsert(int mode);
static void SyncRepCancelWait(void);
static int SyncRepWakeQueue(bool all, int mode);
! static bool SyncRepGetOldestSyncRecPtr(XLogRecPtr *writePtr,
! XLogRecPtr *flushPtr,
! XLogRecPtr *applyPtr,
! bool *am_sync);
static int SyncRepGetStandbyPriority(void);
#ifdef USE_ASSERT_CHECKING
static bool SyncRepQueueIsOrderedByLSN(int mode);
--- 90,118 ----
static bool announce_next_takeover = true;
! SyncRepConfigData *SyncRepConfig = NULL;
static int SyncRepWaitMode = SYNC_REP_NO_WAIT;
static void SyncRepQueueInsert(int mode);
static void SyncRepCancelWait(void);
static int SyncRepWakeQueue(bool all, int mode);
! static bool SyncRepGetSyncRecPtr(XLogRecPtr *writePtr,
! XLogRecPtr *flushPtr,
! XLogRecPtr *applyPtr,
! bool *am_sync);
! static void SyncRepGetOldestSyncRecPtr(XLogRecPtr *writePtr,
! XLogRecPtr *flushPtr,
! XLogRecPtr *applyPtr,
! List *sync_standbys);
! static void SyncRepGetNthLatestSyncRecPtr(XLogRecPtr *writePtr,
! XLogRecPtr *flushPtr,
! XLogRecPtr *applyPtr,
! List *sync_standbys, uint8 nth);
static int SyncRepGetStandbyPriority(void);
+ static List *SyncRepGetSyncStandbysPriority(bool *am_sync);
+ static List *SyncRepGetSyncStandbysQuorum(bool *am_sync);
+ static int cmp_lsn(const void *a, const void *b);
#ifdef USE_ASSERT_CHECKING
static bool SyncRepQueueIsOrderedByLSN(int mode);
***************
*** 386,392 **** SyncRepReleaseWaiters(void)
XLogRecPtr writePtr;
XLogRecPtr flushPtr;
XLogRecPtr applyPtr;
! bool got_oldest;
bool am_sync;
int numwrite = 0;
int numflush = 0;
--- 408,414 ----
XLogRecPtr writePtr;
XLogRecPtr flushPtr;
XLogRecPtr applyPtr;
! bool got_recptr;
bool am_sync;
int numwrite = 0;
int numflush = 0;
***************
*** 413,423 **** SyncRepReleaseWaiters(void)
LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
/*
! * Check whether we are a sync standby or not, and calculate the oldest
* positions among all sync standbys.
*/
! got_oldest = SyncRepGetOldestSyncRecPtr(&writePtr, &flushPtr,
! &applyPtr, &am_sync);
/*
* If we are managing a sync standby, though we weren't prior to this,
--- 435,444 ----
LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
/*
! * Check whether we are a sync standby or not, and calculate the synced
* positions among all sync standbys.
*/
! got_recptr = SyncRepGetSyncRecPtr(&writePtr, &flushPtr, &applyPtr, &am_sync);
/*
* If we are managing a sync standby, though we weren't prior to this,
***************
*** 426,441 **** SyncRepReleaseWaiters(void)
if (announce_next_takeover && am_sync)
{
announce_next_takeover = false;
! ereport(LOG,
! (errmsg("standby \"%s\" is now a synchronous standby with priority %u",
! application_name, MyWalSnd->sync_standby_priority)));
}
/*
* If the number of sync standbys is less than requested or we aren't
* managing a sync standby then just leave.
*/
! if (!got_oldest || !am_sync)
{
LWLockRelease(SyncRepLock);
announce_next_takeover = !am_sync;
--- 447,468 ----
if (announce_next_takeover && am_sync)
{
announce_next_takeover = false;
!
! if (SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY)
! ereport(LOG,
! (errmsg("standby \"%s\" is now a synchronous standby with priority %u",
! application_name, MyWalSnd->sync_standby_priority)));
! else
! ereport(LOG,
! (errmsg("standby \"%s\" is now a candidate for quorum synchronous standbys",
! application_name)));
}
/*
* If the number of sync standbys is less than requested or we aren't
* managing a sync standby then just leave.
*/
! if (!got_recptr || !am_sync)
{
LWLockRelease(SyncRepLock);
announce_next_takeover = !am_sync;
***************
*** 471,491 **** SyncRepReleaseWaiters(void)
}
/*
! * Calculate the oldest Write, Flush and Apply positions among sync standbys.
*
* Return false if the number of sync standbys is less than
* synchronous_standby_names specifies. Otherwise return true and
! * store the oldest positions into *writePtr, *flushPtr and *applyPtr.
*
* On return, *am_sync is set to true if this walsender is connecting to
* sync standby. Otherwise it's set to false.
*/
static bool
! SyncRepGetOldestSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr,
XLogRecPtr *applyPtr, bool *am_sync)
{
List *sync_standbys;
- ListCell *cell;
*writePtr = InvalidXLogRecPtr;
*flushPtr = InvalidXLogRecPtr;
--- 498,517 ----
}
/*
! * Calculate the synced Write, Flush and Apply positions among sync standbys.
*
* Return false if the number of sync standbys is less than
* synchronous_standby_names specifies. Otherwise return true and
! * store the positions into *writePtr, *flushPtr and *applyPtr.
*
* On return, *am_sync is set to true if this walsender is connecting to
* sync standby. Otherwise it's set to false.
*/
static bool
! SyncRepGetSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr,
XLogRecPtr *applyPtr, bool *am_sync)
{
List *sync_standbys;
*writePtr = InvalidXLogRecPtr;
*flushPtr = InvalidXLogRecPtr;
***************
*** 508,519 **** SyncRepGetOldestSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr,
}
/*
! * Scan through all sync standbys and calculate the oldest Write, Flush
! * and Apply positions.
*/
! foreach(cell, sync_standbys)
{
! WalSnd *walsnd = &WalSndCtl->walsnds[lfirst_int(cell)];
XLogRecPtr write;
XLogRecPtr flush;
XLogRecPtr apply;
--- 534,582 ----
}
/*
! * In priority-based sync replication, the synced positions are the
! * oldest ones among sync standbys. In quorum-based sync replication,
! * they are the Nth latest ones.
! *
! * SyncRepGetNthLatestSyncRecPtr() can also calculate the oldest positions.
! * But we use SyncRepGetOldestSyncRecPtr() for that calculation because
! * it's a bit more efficient.
! *
! * XXX If the numbers of current and requested sync standbys are the same,
! * we can use SyncRepGetOldestSyncRecPtr() to calculate the synced
! * positions even in a quorum-based sync replication.
! */
! if (SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY)
! {
! SyncRepGetOldestSyncRecPtr(writePtr, flushPtr, applyPtr,
! sync_standbys);
! }
! else
! {
! SyncRepGetNthLatestSyncRecPtr(writePtr, flushPtr, applyPtr,
! sync_standbys, SyncRepConfig->num_sync);
! }
!
! list_free(sync_standbys);
! return true;
! }
!
! /*
! * Calculate the oldest Write, Flush and Apply positions among sync standbys.
! */
! static void
! SyncRepGetOldestSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr,
! XLogRecPtr *applyPtr, List *sync_standbys)
! {
! ListCell *cell;
!
! /*
! * Scan through all sync standbys and calculate the oldest
! * Write, Flush and Apply positions.
*/
! foreach(cell, sync_standbys)
{
! WalSnd *walsnd = &WalSndCtl->walsnds[lfirst_int(cell)];
XLogRecPtr write;
XLogRecPtr flush;
XLogRecPtr apply;
***************
*** 531,553 **** SyncRepGetOldestSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr,
if (XLogRecPtrIsInvalid(*applyPtr) || *applyPtr > apply)
*applyPtr = apply;
}
! list_free(sync_standbys);
! return true;
}
/*
* Return the list of sync standbys, or NIL if no sync standby is connected.
*
- * If there are multiple standbys with the same priority,
- * the first one found is selected preferentially.
* The caller must hold SyncRepLock.
*
* On return, *am_sync is set to true if this walsender is connecting to
* sync standby. Otherwise it's set to false.
*/
List *
! SyncRepGetSyncStandbys(bool *am_sync)
{
List *result = NIL;
List *pending = NIL;
--- 594,756 ----
if (XLogRecPtrIsInvalid(*applyPtr) || *applyPtr > apply)
*applyPtr = apply;
}
+ }
! /*
! * Calculate the Nth latest Write, Flush and Apply positions among sync
! * standbys.
! */
! static void
! SyncRepGetNthLatestSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr,
! XLogRecPtr *applyPtr, List *sync_standbys, uint8 nth)
! {
! ListCell *cell;
! XLogRecPtr *write_array;
! XLogRecPtr *flush_array;
! XLogRecPtr *apply_array;
! int len;
! int i = 0;
!
! len = list_length(sync_standbys);
! write_array = (XLogRecPtr *) palloc(sizeof(XLogRecPtr) * len);
! flush_array = (XLogRecPtr *) palloc(sizeof(XLogRecPtr) * len);
! apply_array = (XLogRecPtr *) palloc(sizeof(XLogRecPtr) * len);
!
! foreach(cell, sync_standbys)
! {
! WalSnd *walsnd = &WalSndCtl->walsnds[lfirst_int(cell)];
!
! SpinLockAcquire(&walsnd->mutex);
! write_array[i] = walsnd->write;
! flush_array[i] = walsnd->flush;
! apply_array[i] = walsnd->apply;
! SpinLockRelease(&walsnd->mutex);
!
! i++;
! }
!
! qsort(write_array, len, sizeof(XLogRecPtr), cmp_lsn);
! qsort(flush_array, len, sizeof(XLogRecPtr), cmp_lsn);
! qsort(apply_array, len, sizeof(XLogRecPtr), cmp_lsn);
!
! /* Get Nth latest Write, Flush, Apply positions */
! *writePtr = write_array[nth - 1];
! *flushPtr = flush_array[nth - 1];
! *applyPtr = apply_array[nth - 1];
!
! pfree(write_array);
! pfree(flush_array);
! pfree(apply_array);
! }
!
! /*
! * Compare LSNs in order to sort an array in descending order.
! */
! static int
! cmp_lsn(const void *a, const void *b)
! {
! XLogRecPtr lsn1 = *((const XLogRecPtr *) a);
! XLogRecPtr lsn2 = *((const XLogRecPtr *) b);
!
! if (lsn1 > lsn2)
! return -1;
! else if (lsn1 == lsn2)
! return 0;
! else
! return 1;
}
/*
* Return the list of sync standbys, or NIL if no sync standby is connected.
*
* The caller must hold SyncRepLock.
*
* On return, *am_sync is set to true if this walsender is connecting to
* sync standby. Otherwise it's set to false.
*/
List *
! SyncRepGetSyncStandbys(bool *am_sync)
! {
! /* Set default result */
! if (am_sync != NULL)
! *am_sync = false;
!
! /* Quick exit if sync replication is not requested */
! if (SyncRepConfig == NULL)
! return NIL;
!
! return (SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY) ?
! SyncRepGetSyncStandbysPriority(am_sync) :
! SyncRepGetSyncStandbysQuorum(am_sync);
! }
!
! /*
! * Return the list of all the candidates for quorum sync standbys,
! * or NIL if no such standby is connected.
! *
! * The caller must hold SyncRepLock. This function must be called only in
! * a quorum-based sync replication.
! *
! * On return, *am_sync is set to true if this walsender is connecting to
! * sync standby. Otherwise it's set to false.
! */
! static List *
! SyncRepGetSyncStandbysQuorum(bool *am_sync)
! {
! List *result = NIL;
! int i;
! volatile WalSnd *walsnd; /* Use volatile pointer to prevent code
! * rearrangement */
!
! Assert(SyncRepConfig->syncrep_method == SYNC_REP_QUORUM);
!
! for (i = 0; i < max_wal_senders; i++)
! {
! walsnd = &WalSndCtl->walsnds[i];
!
! /* Must be active */
! if (walsnd->pid == 0)
! continue;
!
! /* Must be streaming */
! if (walsnd->state != WALSNDSTATE_STREAMING)
! continue;
!
! /* Must be synchronous */
! if (walsnd->sync_standby_priority == 0)
! continue;
!
! /* Must have a valid flush position */
! if (XLogRecPtrIsInvalid(walsnd->flush))
! continue;
!
! /*
! * Consider this standby as a candidate for quorum sync standbys
! * and append it to the result.
! */
! result = lappend_int(result, i);
! if (am_sync != NULL && walsnd == MyWalSnd)
! *am_sync = true;
! }
!
! return result;
! }
!
! /*
! * Return the list of sync standbys chosen based on their priorities,
! * or NIL if no sync standby is connected.
! *
! * If there are multiple standbys with the same priority,
! * the first one found is selected preferentially.
! *
! * The caller must hold SyncRepLock. This function must be called only in
! * a priority-based sync replication.
! *
! * On return, *am_sync is set to true if this walsender is connecting to
! * sync standby. Otherwise it's set to false.
! */
! static List *
! SyncRepGetSyncStandbysPriority(bool *am_sync)
{
List *result = NIL;
List *pending = NIL;
***************
*** 560,572 **** SyncRepGetSyncStandbys(bool *am_sync)
volatile WalSnd *walsnd; /* Use volatile pointer to prevent code
* rearrangement */
! /* Set default result */
! if (am_sync != NULL)
! *am_sync = false;
!
! /* Quick exit if sync replication is not requested */
! if (SyncRepConfig == NULL)
! return NIL;
lowest_priority = SyncRepConfig->nmembers;
next_highest_priority = lowest_priority + 1;
--- 763,769 ----
volatile WalSnd *walsnd; /* Use volatile pointer to prevent code
* rearrangement */
! Assert(SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY);
lowest_priority = SyncRepConfig->nmembers;
next_highest_priority = lowest_priority + 1;
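The quorum path relies on the observation that, with ANY n, a position has been reached by at least n standbys exactly when it is no later than the n-th largest reported position, so the n-th latest value is the furthest point that can be released. A standalone sketch of the approach taken by SyncRepGetNthLatestSyncRecPtr() (copy, sort descending, index n-1) follows, together with the oldest-position computation used for FIRST. Lsn, nth_latest() and oldest() are hypothetical stand-ins, and malloc/free replace palloc/pfree so that it compiles outside the backend.

```c
#include <assert.h>
#include <stdint.h>
#include <stdlib.h>
#include <string.h>

typedef uint64_t Lsn;           /* hypothetical stand-in for XLogRecPtr */

/* Descending order, like cmp_lsn() in the patch */
static int
cmp_lsn_desc(const void *a, const void *b)
{
    Lsn         l1 = *(const Lsn *) a;
    Lsn         l2 = *(const Lsn *) b;

    if (l1 > l2)
        return -1;
    if (l1 == l2)
        return 0;
    return 1;
}

/* N-th latest of len reported positions (1 <= nth <= len); input untouched */
static Lsn
nth_latest(const Lsn *pos, int len, int nth)
{
    Lsn        *tmp;
    Lsn         result;

    assert(nth >= 1 && nth <= len);
    tmp = malloc(sizeof(Lsn) * len);
    if (tmp == NULL)
        abort();
    memcpy(tmp, pos, sizeof(Lsn) * len);
    qsort(tmp, len, sizeof(Lsn), cmp_lsn_desc);
    result = tmp[nth - 1];
    free(tmp);
    return result;
}

/* Oldest position: what the priority-based path computes */
static Lsn
oldest(const Lsn *pos, int len)
{
    Lsn         result;

    assert(len >= 1);
    result = pos[0];
    for (int i = 1; i < len; i++)
        if (pos[i] < result)
            result = pos[i];
    return result;
}
```

As the XXX comment in SyncRepGetSyncRecPtr() notes, when n equals the number of sync standbys the n-th latest position is simply the oldest one. Note also that the patch sorts the write, flush and apply arrays independently, so the three resulting positions may come from different standbys; each of them is separately reached by at least n standbys.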
*** a/src/backend/replication/syncrep_gram.y
--- b/src/backend/replication/syncrep_gram.y
***************
*** 21,27 **** SyncRepConfigData *syncrep_parse_result;
char *syncrep_parse_error_msg;
static SyncRepConfigData *create_syncrep_config(const char *num_sync,
! List *members);
/*
* Bison doesn't allocate anything that needs to live across parser calls,
--- 21,27 ----
char *syncrep_parse_error_msg;
static SyncRepConfigData *create_syncrep_config(const char *num_sync,
! List *members, uint8 syncrep_method);
/*
* Bison doesn't allocate anything that needs to live across parser calls,
***************
*** 46,52 **** static SyncRepConfigData *create_syncrep_config(const char *num_sync,
SyncRepConfigData *config;
}
! %token <str> NAME NUM JUNK
%type <config> result standby_config
%type <list> standby_list
--- 46,52 ----
SyncRepConfigData *config;
}
! %token <str> NAME NUM JUNK ANY FIRST
%type <config> result standby_config
%type <list> standby_list
***************
*** 60,67 **** result:
;
standby_config:
! standby_list { $$ = create_syncrep_config("1", $1); }
! | NUM '(' standby_list ')' { $$ = create_syncrep_config($1, $3); }
;
standby_list:
--- 60,69 ----
;
standby_config:
! standby_list { $$ = create_syncrep_config("1", $1, SYNC_REP_PRIORITY); }
! | NUM '(' standby_list ')' { $$ = create_syncrep_config($1, $3, SYNC_REP_PRIORITY); }
! | ANY NUM '(' standby_list ')' { $$ = create_syncrep_config($2, $4, SYNC_REP_QUORUM); }
! | FIRST NUM '(' standby_list ')' { $$ = create_syncrep_config($2, $4, SYNC_REP_PRIORITY); }
;
standby_list:
***************
*** 75,83 **** standby_name:
;
%%
-
static SyncRepConfigData *
! create_syncrep_config(const char *num_sync, List *members)
{
SyncRepConfigData *config;
int size;
--- 77,84 ----
;
%%
static SyncRepConfigData *
! create_syncrep_config(const char *num_sync, List *members, uint8 syncrep_method)
{
SyncRepConfigData *config;
int size;
***************
*** 98,103 **** create_syncrep_config(const char *num_sync, List *members)
--- 99,105 ----
config->config_size = size;
config->num_sync = atoi(num_sync);
+ config->syncrep_method = syncrep_method;
config->nmembers = list_length(members);
ptr = config->member_names;
foreach(lc, members)
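The four standby_config alternatives in the grammar can be mirrored by a small hand-written parser, which may help when checking which method and num_sync each form yields. This is a hypothetical sketch (SketchConfig, parse_config() and its helpers are not in the patch): it accepts only unquoted names made of letters, digits, '_' and '*', matches ANY and FIRST case-insensitively as the -i flex flag makes the real scanner do, and does not model double-quoted names or the real scanner's token rules.

```c
#include <assert.h>
#include <ctype.h>
#include <stdbool.h>
#include <stdlib.h>
#include <string.h>

/* Same values as in syncrep.h in the patch */
#define SYNC_REP_PRIORITY 0
#define SYNC_REP_QUORUM   1

typedef struct
{
    int         method;         /* SYNC_REP_PRIORITY or SYNC_REP_QUORUM */
    int         num_sync;       /* number of sync standbys to wait for */
    int         nmembers;       /* number of listed names */
} SketchConfig;

static void
skip_spaces(const char **s)
{
    while (isspace((unsigned char) **s))
        (*s)++;
}

/* Consume uppercase keyword kw if *s starts with it, ignoring case */
static bool
match_keyword(const char **s, const char *kw)
{
    const char *p = *s;
    size_t      n = strlen(kw);

    for (size_t i = 0; i < n; i++)
        if (toupper((unsigned char) p[i]) != kw[i])
            return false;
    if (isalnum((unsigned char) p[n]) || p[n] == '_')
        return false;           /* "anything" is a name, not ANY */
    *s = p + n;
    return true;
}

/* Parse "name [, name ...]"; return the number of names, or -1 on error */
static int
parse_list(const char **s)
{
    int         count = 0;

    for (;;)
    {
        const char *start;

        skip_spaces(s);
        start = *s;
        while (isalnum((unsigned char) **s) || **s == '_' || **s == '*')
            (*s)++;
        if (*s == start)
            return -1;
        count++;
        skip_spaces(s);
        if (**s != ',')
            return count;
        (*s)++;
    }
}

static bool
parse_config(const char *s, SketchConfig *out)
{
    int         method = SYNC_REP_PRIORITY;
    bool        has_keyword = false;
    long        num_sync = 1;
    int         count;

    assert(s != NULL && out != NULL);
    skip_spaces(&s);
    if (match_keyword(&s, "ANY"))
    {
        method = SYNC_REP_QUORUM;
        has_keyword = true;
    }
    else if (match_keyword(&s, "FIRST"))
        has_keyword = true;
    skip_spaces(&s);
    if (isdigit((unsigned char) *s))
    {
        char       *end;        /* [ANY|FIRST] NUM '(' standby_list ')' */

        num_sync = strtol(s, &end, 10);
        s = end;
        skip_spaces(&s);
        if (num_sync < 1 || *s != '(')
            return false;
        s++;
        count = parse_list(&s);
        if (count < 0 || *s != ')')
            return false;
        s++;
    }
    else if (has_keyword || (count = parse_list(&s)) < 0)
        return false;           /* ANY/FIRST need num_sync; bad bare list */
    skip_spaces(&s);
    if (*s != '\0')
        return false;
    out->method = method;
    out->num_sync = (int) num_sync;
    out->nmembers = count;
    return true;
}
```

So "any 2 (s1, s2, s3)" yields the quorum method with num_sync 2, while "2 (s1, s2)" and the legacy "s1, s2" both yield the priority method, the latter with num_sync 1. A bare "anything" is an ordinary standby name because a keyword must end at a non-name character.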
*** a/src/backend/replication/syncrep_scanner.l
--- b/src/backend/replication/syncrep_scanner.l
***************
*** 64,69 **** xdinside [^"]+
--- 64,72 ----
%%
{space}+ { /* ignore */ }
+ ANY { return ANY; }
+ FIRST { return FIRST; }
+
{xdstart} {
initStringInfo(&xdbuf);
BEGIN(xd);
*** a/src/backend/replication/walsender.c
--- b/src/backend/replication/walsender.c
***************
*** 2868,2879 **** pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
/*
* More easily understood version of standby state. This is purely
! * informational, not different from priority.
*/
if (priority == 0)
values[7] = CStringGetTextDatum("async");
else if (list_member_int(sync_standbys, i))
! values[7] = CStringGetTextDatum("sync");
else
values[7] = CStringGetTextDatum("potential");
}
--- 2868,2887 ----
/*
* More easily understood version of standby state. This is purely
! * informational.
! *
! * In quorum-based sync replication, the role of each standby
! * listed in synchronous_standby_names can change very
! * frequently. Any standby considered "sync" at one moment can
! * be switched to "potential" at the next moment. So it's
! * basically useless to report "sync" or "potential" as their sync
! * states. We report just "quorum" for them.
*/
if (priority == 0)
values[7] = CStringGetTextDatum("async");
else if (list_member_int(sync_standbys, i))
! values[7] = SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY ?
! CStringGetTextDatum("sync") : CStringGetTextDatum("quorum");
else
values[7] = CStringGetTextDatum("potential");
}
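The sync_state reporting in the walsender hunk reduces to a small decision. Here it is restated as a stand-alone C sketch. `sync_state_label` is a hypothetical name, and membership in the current set of sync standbys is passed in as a flag rather than looked up in a List as the real code does.

```c
#include <assert.h>
#include <stdbool.h>
#include <string.h>

/* Same values as the SYNC_REP_* defines the patch adds to syncrep.h. */
#define SYNC_REP_PRIORITY 0
#define SYNC_REP_QUORUM   1

/*
 * Hypothetical restatement of the sync_state branch in
 * pg_stat_get_wal_senders(): priority 0 means the standby is not listed
 * in synchronous_standby_names.
 */
static const char *
sync_state_label(int priority, bool in_sync_set, int syncrep_method)
{
    assert(priority >= 0);
    if (priority == 0)
        return "async";
    if (in_sync_set)
        return syncrep_method == SYNC_REP_PRIORITY ? "sync" : "quorum";
    return "potential";
}
```

Under ANY, a listed standby that is in the sync set is always reported as "quorum", never "sync". Per the comment above, that membership can flip too often for "sync"/"potential" to be meaningful.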
*** a/src/backend/utils/misc/postgresql.conf.sample
--- b/src/backend/utils/misc/postgresql.conf.sample
***************
*** 245,251 ****
# These settings are ignored on a standby server.
#synchronous_standby_names = '' # standby servers that provide sync rep
! # number of sync standbys and comma-separated list of application_name
# from standby(s); '*' = all
#vacuum_defer_cleanup_age = 0 # number of xacts by which cleanup is delayed
--- 245,252 ----
# These settings are ignored on a standby server.
#synchronous_standby_names = '' # standby servers that provide sync rep
! # method to choose sync standbys, number of sync standbys
! # and comma-separated list of application_name
# from standby(s); '*' = all
#vacuum_defer_cleanup_age = 0 # number of xacts by which cleanup is delayed
*** a/src/include/replication/syncrep.h
--- b/src/include/replication/syncrep.h
***************
*** 32,37 ****
--- 32,41 ----
#define SYNC_REP_WAITING 1
#define SYNC_REP_WAIT_COMPLETE 2
+ /* syncrep_method of SyncRepConfigData */
+ #define SYNC_REP_PRIORITY 0
+ #define SYNC_REP_QUORUM 1
+
/*
* Struct for the configuration of synchronous replication.
*
***************
*** 44,54 **** typedef struct SyncRepConfigData
--- 48,61 ----
int config_size; /* total size of this struct, in bytes */
int num_sync; /* number of sync standbys that we need to
* wait for */
+ uint8 syncrep_method; /* method to choose sync standbys */
int nmembers; /* number of members in the following list */
/* member_names contains nmembers consecutive nul-terminated C strings */
char member_names[FLEXIBLE_ARRAY_MEMBER];
} SyncRepConfigData;
+ extern SyncRepConfigData *SyncRepConfig;
+
/* communication variables for parsing synchronous_standby_names GUC */
extern SyncRepConfigData *syncrep_parse_result;
extern char *syncrep_parse_error_msg;
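The syncrep.h hunk only adds the syncrep_method field. The wait logic in syncrep.c is not part of this excerpt. The following is a hedged C sketch, our own simplification rather than the patch's code, of what the two methods imply for acknowledging a commit. XLogRecPtr is modeled as uint64_t, 0 is used as a hypothetical "cannot release yet" marker, and `priority_safe_lsn` and `quorum_safe_lsn` are invented names.

Under FIRST, the chosen standbys must all have flushed, so the safe position is their minimum. Under ANY, any num_sync replies suffice, so the safe position is the num_sync-th largest flush position among all candidates.

```c
#include <assert.h>
#include <stdint.h>
#include <stdlib.h>

typedef uint64_t XLogRecPtr;    /* stand-in for PostgreSQL's WAL position type */

/* qsort comparator: larger LSNs sort first. */
static int
cmp_lsn_desc(const void *a, const void *b)
{
    XLogRecPtr x = *(const XLogRecPtr *) a;
    XLogRecPtr y = *(const XLogRecPtr *) b;

    return (x < y) - (x > y);
}

/*
 * Priority method: lsns[] holds the flush positions of the num_sync
 * highest-priority connected standbys; every one of them must have
 * flushed, so the safe position is the minimum.
 */
static XLogRecPtr
priority_safe_lsn(const XLogRecPtr *lsns, int n)
{
    XLogRecPtr min;

    assert(n > 0);
    min = lsns[0];
    for (int i = 1; i < n; i++)
        if (lsns[i] < min)
            min = lsns[i];
    return min;
}

/*
 * Quorum method: lsns[] holds the flush positions of all connected
 * candidates; any num_sync of them suffice, so the safe position is the
 * num_sync-th largest value.  Sorts lsns[] in place.  Returns 0 if fewer
 * than num_sync candidates are connected.
 */
static XLogRecPtr
quorum_safe_lsn(XLogRecPtr *lsns, int n, int num_sync)
{
    assert(num_sync > 0);
    if (n < num_sync)
        return 0;
    qsort(lsns, (size_t) n, sizeof(XLogRecPtr), cmp_lsn_desc);
    return lsns[num_sync - 1];
}
```

Under ANY 2 with flush positions {100, 300, 200}, the two standbys at 300 and 200 are the ones that "count", and the result is 200. A moment later a different pair may be ahead.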
*** a/src/test/recovery/t/007_sync_rep.pl
--- b/src/test/recovery/t/007_sync_rep.pl
***************
*** 3,9 **** use strict;
use warnings;
use PostgresNode;
use TestLib;
! use Test::More tests => 8;
# Query checking sync_priority and sync_state of each standby
my $check_sql =
--- 3,9 ----
use warnings;
use PostgresNode;
use TestLib;
! use Test::More tests => 11;
# Query checking sync_priority and sync_state of each standby
my $check_sql =
***************
*** 172,174 **** test_sync_state(
--- 172,205 ----
standby2|1|sync
standby4|1|potential),
'potential standby found earlier in array is promoted to sync');
+
+ # Check that standby1 and standby2 are chosen as sync standbys
+ # based on their priorities.
+ test_sync_state(
+ $node_master, qq(standby1|1|sync
+ standby2|2|sync
+ standby4|0|async),
+ 'priority-based sync replication specified by FIRST keyword',
+ 'FIRST 2(standby1, standby2)');
+
+ # Check that all the listed standbys are considered as candidates
+ # for sync standbys in quorum-based sync replication.
+ test_sync_state(
+ $node_master, qq(standby1|1|quorum
+ standby2|2|quorum
+ standby4|0|async),
+ '2 quorum and 1 async',
+ 'ANY 2(standby1, standby2)');
+
+ # Start standby3, which will be considered to be in 'quorum' state.
+ $node_standby_3->start;
+
+ # Check that the setting of 'ANY 2(*)' chooses all standbys as
+ # candidates for quorum sync standbys.
+ test_sync_state(
+ $node_master, qq(standby1|1|quorum
+ standby2|1|quorum
+ standby3|1|quorum
+ standby4|1|quorum),
+ 'all standbys are considered as candidates for quorum sync standbys',
+ 'ANY 2(*)');
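The expected rows in these tests follow a simple rule. An unlisted standby gets priority 0 (async), a listed one gets its 1-based position in the list, and '*' matches every name, which is why 'ANY 2(*)' reports priority 1 for all four standbys. The following hypothetical C sketch encodes that rule as inferred from the test expectations. `standby_priority` is an invented name, and names are compared exactly here; whether the server folds case is not shown in this excerpt.

```c
#include <assert.h>
#include <string.h>

/*
 * Hypothetical sketch: priority is the 1-based position of the first
 * list entry matching app_name, where "*" matches any name; 0 means
 * "not listed" (reported as async).
 */
static int
standby_priority(const char *const *members, int nmembers, const char *app_name)
{
    assert(members != NULL && app_name != NULL);
    for (int i = 0; i < nmembers; i++)
    {
        if (strcmp(members[i], "*") == 0 || strcmp(members[i], app_name) == 0)
            return i + 1;
    }
    return 0;
}
```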
--
Sent via pgsql-hackers mailing list ([email protected])
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers