On Thu, Dec 8, 2016 at 4:39 PM, Michael Paquier
<michael.paqu...@gmail.com> wrote:
> On Thu, Dec 8, 2016 at 9:07 AM, Robert Haas <robertmh...@gmail.com> wrote:
>> You could do that, but first I would code up the simplest, cleanest
>> algorithm you can think of and see if it even shows up in a 'perf'
>> profile.  Microbenchmarking is probably overkill here unless a problem
>> is visible on macrobenchmarks.
>
> This is what I would go for! The current code is doing a simple thing:
> select the Nth element using qsort() after scanning each WAL sender's
> values. And I think that Sawada-san got it right. Even running on my
> laptop a pgbench run with 10 sync standbys using a data set that fits
> into memory, SyncRepGetOldestSyncRecPtr gets at most 0.04% of overhead
> using perf top on a non-assert, non-debug build. Hash tables and
> allocations get a far larger share. Using the patch,
> SyncRepGetSyncRecPtr is at the same level with a quorum set of 10
> nodes. Let's kick the ball for now. An extra patch could make things
> better later on if that's worth it.

Yeah, since the both K and N could be not large these algorithm takes
almost the same time. And current patch does simple thing. When we
need over 100 or 1000 replication node the optimization could be
required.
Attached latest v9 patch.

Regards,

--
Masahiko Sawada
NIPPON TELEGRAPH AND TELEPHONE CORPORATION
NTT Open Source Software Center
diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml
index 0fc4e57..bc67a99 100644
--- a/doc/src/sgml/config.sgml
+++ b/doc/src/sgml/config.sgml
@@ -3054,42 +3054,75 @@ include_dir 'conf.d'
         transactions waiting for commit will be allowed to proceed after
         these standby servers confirm receipt of their data.
         The synchronous standbys will be those whose names appear
-        earlier in this list, and
+        in this list, and
         that are both currently connected and streaming data in real-time
         (as shown by a state of <literal>streaming</literal> in the
         <link linkend="monitoring-stats-views-table">
-        <literal>pg_stat_replication</></link> view).
-        Other standby servers appearing later in this list represent potential
-        synchronous standbys. If any of the current synchronous
-        standbys disconnects for whatever reason,
-        it will be replaced immediately with the next-highest-priority standby.
-        Specifying more than one standby name can allow very high availability.
+        <literal>pg_stat_replication</></link> view). If the keyword
+        <literal>FIRST</> is specified, other standby servers appearing
+        later in this list represent potential synchronous standbys.
+        If any of the current synchronous standbys disconnects for
+        whatever reason, it will be replaced immediately with the
+        next-highest-priority standby. Specifying more than one standby
+        name can allow very high availability.
        </para>
        <para>
         This parameter specifies a list of standby servers using
         either of the following syntaxes:
 <synopsis>
-<replaceable class="parameter">num_sync</replaceable> ( <replaceable class="parameter">standby_name</replaceable> [, ...] )
+[ANY] <replaceable class="parameter">num_sync</replaceable> ( <replaceable class="parameter">standby_name</replaceable> [, ...] )
+FIRST <replaceable class="parameter">num_sync</replaceable> ( <replaceable class="parameter">standby_name</replaceable> [, ...] )
 <replaceable class="parameter">standby_name</replaceable> [, ...]
 </synopsis>
         where <replaceable class="parameter">num_sync</replaceable> is
         the number of synchronous standbys that transactions need to
         wait for replies from,
         and <replaceable class="parameter">standby_name</replaceable>
-        is the name of a standby server. For example, a setting of
-        <literal>3 (s1, s2, s3, s4)</> makes transaction commits wait
-        until their WAL records are received by three higher-priority standbys
-        chosen from standby servers <literal>s1</>, <literal>s2</>,
-        <literal>s3</> and <literal>s4</>.
+        is the name of a standby server.
+        <literal>FIRST</> and <literal>ANY</> specify the method used by
+        the master to control the standby servres.
         </para>
         <para>
-        The second syntax was used before <productname>PostgreSQL</>
+        The keyword <literal>FIRST</>, coupled with <literal>num_sync</>, makes
+        transaction commit wait until WAL records are received from the
+        <literal>num_sync</> standbys with higher priority number.
+        For example, a setting of <literal>FIRST 3 (s1, s2, s3, s4)</>
+        makes transaction commits wait until their WAL records are received
+        by three higher-priority standbys chosen from standby servers
+        <literal>s1</>, <literal>s2</>, <literal>s3</> and <literal>s4</>.
+        </para>
+        <para>
+        The keyword <literal>ANY</>, coupled with <literal>num_sync</>,
+        makes transaction commits wait until WAL records are received
+        from at least <literal>num_sync</> connected standbys among those
+        defined in the list of <varname>synchronous_standby_names</>. For
+        example, a setting of <literal>ANY 3 (s1, s2, s3, s4)</> makes
+        transaction commits wait until receiving WAL records from at least
+        any three standbys of four listed servers <literal>s1</>,
+        <literal>s2</>, <literal>s3</>, <literal>s4</>.
+        </para>
+        <para>
+        <literal>FIRST</> and <literal>ANY</> are case-insensitive words
+        and the standby name having these words are must be double-quoted.
+        </para>
+        <para>
+        The third syntax was used before <productname>PostgreSQL</>
         version 9.6 and is still supported. It's the same as the first syntax
-        with <replaceable class="parameter">num_sync</replaceable> equal to 1.
-        For example, <literal>1 (s1, s2)</> and
-        <literal>s1, s2</> have the same meaning: either <literal>s1</>
-        or <literal>s2</> is chosen as a synchronous standby.
-       </para>
+        with <literal>FIRST</> and <replaceable class="parameter">num_sync</replaceable>
+        equal to 1. For example, <literal>FIRST 1 (s1, s2)</> and <literal>s1, s2</>
+        have the same meaning: either <literal>s1</> or <literal>s2</> is
+        chosen as a synchronous standby.
+        </para>
+        <note>
+         <para>
+         If <literal>FIRST</> or <literal>ANY</> are not specified, this parameter
+         behaves as if <literal>ANY</> is used. Note that this grammar is incompatible
+         with <productname>PostgresSQL</> 9.6 which is first version supporting multiple
+         standbys with synchronous replication, where no such keyword <literal>FIRST</>
+         or <literal>ANY</> can be used. Note that the grammer behaves as if <literal>FIRST</>
+         is used, which is incompatible with the post-9.6 version behavior.
+        </para>
+       </note>
        <para>
         The name of a standby server for this purpose is the
         <varname>application_name</> setting of the standby, as set in the
diff --git a/doc/src/sgml/high-availability.sgml b/doc/src/sgml/high-availability.sgml
index 6b89507..26e3c4e 100644
--- a/doc/src/sgml/high-availability.sgml
+++ b/doc/src/sgml/high-availability.sgml
@@ -1150,7 +1150,7 @@ primary_slot_name = 'node_a_slot'
     An example of <varname>synchronous_standby_names</> for multiple
     synchronous standbys is:
 <programlisting>
-synchronous_standby_names = '2 (s1, s2, s3)'
+synchronous_standby_names = 'FIRST 2 (s1, s2, s3)'
 </programlisting>
     In this example, if four standby servers <literal>s1</>, <literal>s2</>,
     <literal>s3</> and <literal>s4</> are running, the two standbys
@@ -1165,6 +1165,18 @@ synchronous_standby_names = '2 (s1, s2, s3)'
     The synchronous states of standby servers can be viewed using
     the <structname>pg_stat_replication</structname> view.
    </para>
+   <para>
+    Another example of <varname>synchronous_standby_names</> for multiple
+    synchronous standby is:
+<programlisting>
+ synchronous_standby_names = 'ANY 2 (s1, s2, s3)'
+</programlisting>
+    In this example, if four standby servers <literal>s1</>, <literal>s2</>,
+    <literal>s3</> and <literal>s4</> are running, the three standbys <literal>s1</>,
+    <literal>s2</> and <literal>s3</> will be considered as synchronous standby
+    candidates. The master server will wait for at least 2 replies from them.
+    <literal>s4</> is an asynchronous standby since its name is not in the list.
+   </para>
    </sect3>
 
    <sect3 id="synchronous-replication-performance">
diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml
index 128ee13..771787d 100644
--- a/doc/src/sgml/monitoring.sgml
+++ b/doc/src/sgml/monitoring.sgml
@@ -1437,6 +1437,11 @@ SELECT pid, wait_event_type, wait_event FROM pg_stat_activity WHERE wait_event i
            <literal>sync</>: This standby server is synchronous.
           </para>
          </listitem>
+         <listitem>
+         <para>
+          <literal>quorum</>: This standby is considered as a candidate of quorum commit.
+         </para>
+         </listitem>
        </itemizedlist>
      </entry>
     </row>
diff --git a/src/backend/replication/Makefile b/src/backend/replication/Makefile
index c99717e..da8bcf0 100644
--- a/src/backend/replication/Makefile
+++ b/src/backend/replication/Makefile
@@ -26,7 +26,7 @@ repl_gram.o: repl_scanner.c
 
 # syncrep_scanner is complied as part of syncrep_gram
 syncrep_gram.o: syncrep_scanner.c
-syncrep_scanner.c: FLEXFLAGS = -CF -p
+syncrep_scanner.c: FLEXFLAGS = -CF -p -i
 syncrep_scanner.c: FLEX_NO_BACKUP=yes
 
 # repl_gram.c, repl_scanner.c, syncrep_gram.c and syncrep_scanner.c
diff --git a/src/backend/replication/syncrep.c b/src/backend/replication/syncrep.c
index ac29f56..bcc1317 100644
--- a/src/backend/replication/syncrep.c
+++ b/src/backend/replication/syncrep.c
@@ -31,16 +31,19 @@
  *
  * In 9.5 or before only a single standby could be considered as
  * synchronous. In 9.6 we support multiple synchronous standbys.
- * The number of synchronous standbys that transactions must wait for
- * replies from is specified in synchronous_standby_names.
- * This parameter also specifies a list of standby names,
- * which determines the priority of each standby for being chosen as
- * a synchronous standby. The standbys whose names appear earlier
- * in the list are given higher priority and will be considered as
- * synchronous. Other standby servers appearing later in this list
- * represent potential synchronous standbys. If any of the current
- * synchronous standbys disconnects for whatever reason, it will be
- * replaced immediately with the next-highest-priority standby.
+ * In 10.0 we support two synchronization methods, priority and
+ * quorum. The number of synchronous standbys that transactions
+ * must wait for replies from and synchronization method are specified
+ * in synchronous_standby_names. This parameter also specifies a list
+ * of standby names, which determines the priority of each standby for
+ * being chosen as a synchronous standby. In priority method, the standbys
+ * whose names appear earlier in the list are given higher priority
+ * and will be considered as synchronous. Other standby servers appearing
+ * later in this list represent potential synchronous standbys. If any of
+ * the current synchronous standbys disconnects for whatever reason,
+ * it will be replaced immediately with the next-highest-priority standby.
+ * In quorum method, the all standbys appearing in the list are
+ * considered as a candidate for quorum commit.
  *
  * Before the standbys chosen from synchronous_standby_names can
  * become the synchronous standbys they must have caught up with
@@ -73,24 +76,27 @@
 
 /* User-settable parameters for sync rep */
 char	   *SyncRepStandbyNames;
+SyncRepConfigData *SyncRepConfig = NULL;
 
 #define SyncStandbysDefined() \
 	(SyncRepStandbyNames != NULL && SyncRepStandbyNames[0] != '\0')
 
 static bool announce_next_takeover = true;
 
-static SyncRepConfigData *SyncRepConfig = NULL;
 static int	SyncRepWaitMode = SYNC_REP_NO_WAIT;
 
 static void SyncRepQueueInsert(int mode);
 static void SyncRepCancelWait(void);
 static int	SyncRepWakeQueue(bool all, int mode);
 
-static bool SyncRepGetOldestSyncRecPtr(XLogRecPtr *writePtr,
-						   XLogRecPtr *flushPtr,
-						   XLogRecPtr *applyPtr,
-						   bool *am_sync);
+static bool SyncRepGetSyncRecPtr(XLogRecPtr *writePtr,
+								 XLogRecPtr *flushPtr,
+								 XLogRecPtr *applyPtr,
+								 bool *am_sync);
 static int	SyncRepGetStandbyPriority(void);
+static List *SyncRepGetSyncStandbysPriority(bool *am_sync);
+static List *SyncRepGetSyncStandbysQuorum(bool *am_sync);
+static int	cmp_lsn(const void *a, const void *b);
 
 #ifdef USE_ASSERT_CHECKING
 static bool SyncRepQueueIsOrderedByLSN(int mode);
@@ -386,7 +392,7 @@ SyncRepReleaseWaiters(void)
 	XLogRecPtr	writePtr;
 	XLogRecPtr	flushPtr;
 	XLogRecPtr	applyPtr;
-	bool		got_oldest;
+	bool		got_recptr;
 	bool		am_sync;
 	int			numwrite = 0;
 	int			numflush = 0;
@@ -413,11 +419,10 @@ SyncRepReleaseWaiters(void)
 	LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
 
 	/*
-	 * Check whether we are a sync standby or not, and calculate the oldest
-	 * positions among all sync standbys.
+	 * Check whether we are a sync standby or not, and calculate the synced
+	 * positions among all sync standbys using method.
 	 */
-	got_oldest = SyncRepGetOldestSyncRecPtr(&writePtr, &flushPtr,
-											&applyPtr, &am_sync);
+	got_recptr = SyncRepGetSyncRecPtr(&writePtr, &flushPtr, &applyPtr, &am_sync);
 
 	/*
 	 * If we are managing a sync standby, though we weren't prior to this,
@@ -435,7 +440,7 @@ SyncRepReleaseWaiters(void)
 	 * If the number of sync standbys is less than requested or we aren't
 	 * managing a sync standby then just leave.
 	 */
-	if (!got_oldest || !am_sync)
+	if (!got_recptr || !am_sync)
 	{
 		LWLockRelease(SyncRepLock);
 		announce_next_takeover = !am_sync;
@@ -471,17 +476,50 @@ SyncRepReleaseWaiters(void)
 }
 
 /*
- * Calculate the oldest Write, Flush and Apply positions among sync standbys.
+ * Return the list of sync standbys using according to synchronous method,
+ * or NIL if no sync standby is connected. The caller must hold SyncRepLock.
+ *
+ * On return, *am_sync is set to true if this walsender is connecting to
+ * sync standby. Otherwise it's set to false.
+ */
+List *
+SyncRepGetSyncStandbys(bool	*am_sync)
+{
+	/* Set default result */
+	if (am_sync != NULL)
+		*am_sync = false;
+
+	/* Quick exit if sync replication is not requested */
+	if (SyncRepConfig == NULL)
+		return NIL;
+
+	if (SyncRepConfig->sync_method == SYNC_REP_PRIORITY)
+		return SyncRepGetSyncStandbysPriority(am_sync);
+	else if (SyncRepConfig->sync_method == SYNC_REP_QUORUM)
+		return SyncRepGetSyncStandbysQuorum(am_sync);
+	else
+		ereport(ERROR,
+				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+				"invalid synchronization method is specified \"%d\"",
+				 SyncRepConfig->sync_method));
+}
+
+/*
+ * Calculate the Write, Flush and Apply positions among sync standbys.
  *
  * Return false if the number of sync standbys is less than
  * synchronous_standby_names specifies. Otherwise return true and
- * store the oldest positions into *writePtr, *flushPtr and *applyPtr.
+ * store the positions into *writePtr, *flushPtr and *applyPtr.
+ *
+ * In priority method, we need the oldest these positions among sync
+ * standbys. In quorum method, we need the newest these positions
+ * specified by SyncRepConfig->num_sync.
  *
  * On return, *am_sync is set to true if this walsender is connecting to
  * sync standby. Otherwise it's set to false.
  */
 static bool
-SyncRepGetOldestSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr,
+SyncRepGetSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr,
 						   XLogRecPtr *applyPtr, bool *am_sync)
 {
 	List	   *sync_standbys;
@@ -507,29 +545,74 @@ SyncRepGetOldestSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr,
 		return false;
 	}
 
-	/*
-	 * Scan through all sync standbys and calculate the oldest Write, Flush
-	 * and Apply positions.
-	 */
-	foreach(cell, sync_standbys)
+	if (SyncRepConfig->sync_method == SYNC_REP_PRIORITY)
 	{
-		WalSnd	   *walsnd = &WalSndCtl->walsnds[lfirst_int(cell)];
-		XLogRecPtr	write;
-		XLogRecPtr	flush;
-		XLogRecPtr	apply;
-
-		SpinLockAcquire(&walsnd->mutex);
-		write = walsnd->write;
-		flush = walsnd->flush;
-		apply = walsnd->apply;
-		SpinLockRelease(&walsnd->mutex);
-
-		if (XLogRecPtrIsInvalid(*writePtr) || *writePtr > write)
-			*writePtr = write;
-		if (XLogRecPtrIsInvalid(*flushPtr) || *flushPtr > flush)
-			*flushPtr = flush;
-		if (XLogRecPtrIsInvalid(*applyPtr) || *applyPtr > apply)
-			*applyPtr = apply;
+		/*
+		 * Scan through all sync standbys and calculate the oldest
+		 * Write, Flush and Apply positions.
+		 */
+		foreach (cell, sync_standbys)
+		{
+			WalSnd *walsnd = &WalSndCtl->walsnds[lfirst_int(cell)];
+			XLogRecPtr	write;
+			XLogRecPtr	flush;
+			XLogRecPtr	apply;
+
+			SpinLockAcquire(&walsnd->mutex);
+			write = walsnd->write;
+			flush = walsnd->flush;
+			apply = walsnd->apply;
+			SpinLockRelease(&walsnd->mutex);
+
+			if (XLogRecPtrIsInvalid(*writePtr) || *writePtr > write)
+				*writePtr = write;
+			if (XLogRecPtrIsInvalid(*flushPtr) || *flushPtr > flush)
+				*flushPtr = flush;
+			if (XLogRecPtrIsInvalid(*applyPtr) || *applyPtr > apply)
+				*applyPtr = apply;
+		}
+	}
+	else /* SYNC_REP_QUORUM */
+	{
+		XLogRecPtr	*write_array;
+		XLogRecPtr	*flush_array;
+		XLogRecPtr	*apply_array;
+		int len;
+		int i = 0;
+
+		len = list_length(sync_standbys);
+		write_array = (XLogRecPtr *) palloc(sizeof(XLogRecPtr) * len);
+		flush_array = (XLogRecPtr *) palloc(sizeof(XLogRecPtr) * len);
+		apply_array = (XLogRecPtr *) palloc(sizeof(XLogRecPtr) * len);
+
+		foreach (cell, sync_standbys)
+		{
+			WalSnd *walsnd = &WalSndCtl->walsnds[lfirst_int(cell)];
+
+			SpinLockAcquire(&walsnd->mutex);
+			write_array[i] = walsnd->write;
+			flush_array[i] = walsnd->flush;
+			apply_array[i] = walsnd->apply;
+			SpinLockRelease(&walsnd->mutex);
+
+			i++;
+		}
+
+		qsort(write_array, len, sizeof(XLogRecPtr), cmp_lsn);
+		qsort(flush_array, len, sizeof(XLogRecPtr), cmp_lsn);
+		qsort(apply_array, len, sizeof(XLogRecPtr), cmp_lsn);
+
+		/*
+		 * Get N-th newest Write, Flush, Apply positions
+		 * specified by SyncRepConfig->num_sync.
+		 */
+		*writePtr = write_array[SyncRepConfig->num_sync - 1];
+		*flushPtr = flush_array[SyncRepConfig->num_sync - 1];
+		*applyPtr = apply_array[SyncRepConfig->num_sync - 1];
+
+		pfree(write_array);
+		pfree(flush_array);
+		pfree(apply_array);
 	}
 
 	list_free(sync_standbys);
@@ -537,17 +620,66 @@ SyncRepGetOldestSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr,
 }
 
 /*
- * Return the list of sync standbys, or NIL if no sync standby is connected.
+ * Return the list of sync standbys using quorum method, or
+ * NIL if no sync standby is connected. In quorum method, all standby
+ * priorities are same, that is 1. So this function returns the list of
+ * standbys except for the standbys which are not active, or connected
+ * as async.
  *
- * If there are multiple standbys with the same priority,
+ * On return, *am_sync is set to true if this walsender is connecting to
+ * sync standby. Otherwise it's set to false.
+ */
+static List *
+SyncRepGetSyncStandbysQuorum(bool *am_sync)
+{
+	List	*result = NIL;
+	int i;
+
+	Assert(SyncRepConfig->sync_method == SYNC_REP_QUORUM);
+
+	for (i = 0; i < max_wal_senders; i++)
+	{
+		volatile WalSnd *walsnd = &WalSndCtl->walsnds[i];
+
+		/* Must be active */
+		if (walsnd->pid == 0)
+			continue;
+
+		/* Must be streaming */
+		if (walsnd->state != WALSNDSTATE_STREAMING)
+			continue;
+
+		/* Must be synchronous */
+		if (walsnd->sync_standby_priority == 0)
+			continue;
+
+		/* Must have a valid flush position */
+		if (XLogRecPtrIsInvalid(walsnd->flush))
+			continue;
+
+		/*
+		 * Consider this standby as candidate of sync and append
+		 * it to the result.
+		 */
+		result = lappend_int(result, i);
+		if (am_sync != NULL && walsnd == MyWalSnd)
+			*am_sync = true;
+	}
+
+	return result;
+}
+
+/*
+ * Return the list of sync standbys using priority method, or
+ * NIL if no sync standby is connected. In priority method,
+ * if there are multiple standbys with the same priority,
  * the first one found is selected preferentially.
- * The caller must hold SyncRepLock.
  *
  * On return, *am_sync is set to true if this walsender is connecting to
  * sync standby. Otherwise it's set to false.
  */
-List *
-SyncRepGetSyncStandbys(bool *am_sync)
+static List *
+SyncRepGetSyncStandbysPriority(bool *am_sync)
 {
 	List	   *result = NIL;
 	List	   *pending = NIL;
@@ -560,13 +692,7 @@ SyncRepGetSyncStandbys(bool *am_sync)
 	volatile WalSnd *walsnd;	/* Use volatile pointer to prevent code
 								 * rearrangement */
 
-	/* Set default result */
-	if (am_sync != NULL)
-		*am_sync = false;
-
-	/* Quick exit if sync replication is not requested */
-	if (SyncRepConfig == NULL)
-		return NIL;
+	Assert(SyncRepConfig->sync_method == SYNC_REP_PRIORITY);
 
 	lowest_priority = SyncRepConfig->nmembers;
 	next_highest_priority = lowest_priority + 1;
@@ -892,6 +1018,23 @@ SyncRepQueueIsOrderedByLSN(int mode)
 #endif
 
 /*
+ * Compare lsn in order to sort array in descending order.
+ */
+static int
+cmp_lsn(const void *a, const void *b)
+{
+	XLogRecPtr lsn1 = *((const XLogRecPtr *) a);
+	XLogRecPtr lsn2 = *((const XLogRecPtr *) b);
+
+	if (lsn1 > lsn2)
+		return -1;
+	else if (lsn1 == lsn2)
+		return 0;
+	else
+		return 1;
+}
+
+/*
  * ===========================================================
  * Synchronous Replication functions executed by any process
  * ===========================================================
diff --git a/src/backend/replication/syncrep_gram.y b/src/backend/replication/syncrep_gram.y
index 35c2776..e10be8b 100644
--- a/src/backend/replication/syncrep_gram.y
+++ b/src/backend/replication/syncrep_gram.y
@@ -21,7 +21,7 @@ SyncRepConfigData *syncrep_parse_result;
 char	   *syncrep_parse_error_msg;
 
 static SyncRepConfigData *create_syncrep_config(const char *num_sync,
-					  List *members);
+					List *members, int sync_method);
 
 /*
  * Bison doesn't allocate anything that needs to live across parser calls,
@@ -46,7 +46,7 @@ static SyncRepConfigData *create_syncrep_config(const char *num_sync,
 	SyncRepConfigData *config;
 }
 
-%token <str> NAME NUM JUNK
+%token <str> NAME NUM JUNK ANY FIRST
 
 %type <config> result standby_config
 %type <list> standby_list
@@ -60,8 +60,10 @@ result:
 	;
 
 standby_config:
-		standby_list				{ $$ = create_syncrep_config("1", $1); }
-		| NUM '(' standby_list ')'	{ $$ = create_syncrep_config($1, $3); }
+		standby_list						{ $$ = create_syncrep_config("1", $1, SYNC_REP_PRIORITY); }
+		| NUM '(' standby_list ')'			{ $$ = create_syncrep_config($1, $3, SYNC_REP_QUORUM); }
+		| ANY NUM '(' standby_list ')'		{ $$ = create_syncrep_config($2, $4, SYNC_REP_QUORUM); }
+		| FIRST NUM '(' standby_list ')'	{ $$ = create_syncrep_config($2, $4, SYNC_REP_PRIORITY); }
 	;
 
 standby_list:
@@ -77,7 +79,7 @@ standby_name:
 
 
 static SyncRepConfigData *
-create_syncrep_config(const char *num_sync, List *members)
+create_syncrep_config(const char *num_sync, List *members, int sync_method)
 {
 	SyncRepConfigData *config;
 	int			size;
@@ -98,6 +100,7 @@ create_syncrep_config(const char *num_sync, List *members)
 
 	config->config_size = size;
 	config->num_sync = atoi(num_sync);
+	config->sync_method = sync_method;
 	config->nmembers = list_length(members);
 	ptr = config->member_names;
 	foreach(lc, members)
diff --git a/src/backend/replication/syncrep_scanner.l b/src/backend/replication/syncrep_scanner.l
index d20662e..403fd7d 100644
--- a/src/backend/replication/syncrep_scanner.l
+++ b/src/backend/replication/syncrep_scanner.l
@@ -54,6 +54,8 @@ digit			[0-9]
 ident_start		[A-Za-z\200-\377_]
 ident_cont		[A-Za-z\200-\377_0-9\$]
 identifier		{ident_start}{ident_cont}*
+any_ident		any
+first_ident		first
 
 dquote			\"
 xdstart			{dquote}
@@ -64,6 +66,14 @@ xdinside		[^"]+
 %%
 {space}+	{ /* ignore */ }
 
+{any_ident}	{
+				yylval.str = pstrdup(yytext);
+				return ANY;
+		}
+{first_ident}	{
+				yylval.str = pstrdup(yytext);
+				return FIRST;
+		}
 {xdstart}	{
 				initStringInfo(&xdbuf);
 				BEGIN(xd);
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index aa42d59..28c3eba 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -2862,12 +2862,14 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
 
 			/*
 			 * More easily understood version of standby state. This is purely
-			 * informational, not different from priority.
+			 * informational. In quorum method, since all standbys are considered as
+			 * a candidate of quorum commit standby state is  always 'quorum'.
 			 */
 			if (priority == 0)
 				values[7] = CStringGetTextDatum("async");
 			else if (list_member_int(sync_standbys, i))
-				values[7] = CStringGetTextDatum("sync");
+				values[7] = SyncRepConfig->sync_method == SYNC_REP_PRIORITY ?
+					CStringGetTextDatum("sync") : CStringGetTextDatum("quorum");
 			else
 				values[7] = CStringGetTextDatum("potential");
 		}
diff --git a/src/backend/utils/sort/tuplesort.c b/src/backend/utils/sort/tuplesort.c
index 46eecbf..3ad8186 100644
--- a/src/backend/utils/sort/tuplesort.c
+++ b/src/backend/utils/sort/tuplesort.c
@@ -2637,16 +2637,8 @@ mergeruns(Tuplesortstate *state)
 	}
 
 	/*
-	 * Allocate a new 'memtuples' array, for the heap.  It will hold one tuple
-	 * from each input tape.
-	 */
-	state->memtupsize = numInputTapes;
-	state->memtuples = (SortTuple *) palloc(numInputTapes * sizeof(SortTuple));
-	USEMEM(state, GetMemoryChunkSpace(state->memtuples));
-
-	/*
-	 * Use all the remaining memory we have available for read buffers among
-	 * the input tapes.
+	 * Use all the spare memory we have available for read buffers among the
+	 * input tapes.
 	 *
 	 * We do this only after checking for the case that we produced only one
 	 * initial run, because there is no need to use a large read buffer when
@@ -2669,9 +2661,17 @@ mergeruns(Tuplesortstate *state)
 			 (state->availMem) / 1024, numInputTapes);
 #endif
 
-	state->read_buffer_size = Min(state->availMem / numInputTapes, 0);
+	state->read_buffer_size = state->availMem / numInputTapes;
 	USEMEM(state, state->availMem);
 
+	/*
+	 * Allocate a new 'memtuples' array, for the heap.  It will hold one tuple
+	 * from each input tape.
+	 */
+	state->memtupsize = numInputTapes;
+	state->memtuples = (SortTuple *) palloc(numInputTapes * sizeof(SortTuple));
+	USEMEM(state, GetMemoryChunkSpace(state->memtuples));
+
 	/* End of step D2: rewind all output tapes to prepare for merging */
 	for (tapenum = 0; tapenum < state->tapeRange; tapenum++)
 		LogicalTapeRewindForRead(state->tapeset, tapenum, state->read_buffer_size);
diff --git a/src/include/replication/syncrep.h b/src/include/replication/syncrep.h
index e4e0e27..8dd74a3 100644
--- a/src/include/replication/syncrep.h
+++ b/src/include/replication/syncrep.h
@@ -32,6 +32,10 @@
 #define SYNC_REP_WAITING			1
 #define SYNC_REP_WAIT_COMPLETE		2
 
+/* sync_method of SyncRepConfigData */
+#define SYNC_REP_PRIORITY	0
+#define SYNC_REP_QUORUM		1
+
 /*
  * Struct for the configuration of synchronous replication.
  *
@@ -45,10 +49,13 @@ typedef struct SyncRepConfigData
 	int			num_sync;		/* number of sync standbys that we need to
 								 * wait for */
 	int			nmembers;		/* number of members in the following list */
+	int			sync_method;	/* synchronization method */
 	/* member_names contains nmembers consecutive nul-terminated C strings */
 	char		member_names[FLEXIBLE_ARRAY_MEMBER];
 } SyncRepConfigData;
 
+extern SyncRepConfigData *SyncRepConfig;
+
 /* communication variables for parsing synchronous_standby_names GUC */
 extern SyncRepConfigData *syncrep_parse_result;
 extern char *syncrep_parse_error_msg;
diff --git a/src/test/recovery/t/007_sync_rep.pl b/src/test/recovery/t/007_sync_rep.pl
index 0c87226..c502d20 100644
--- a/src/test/recovery/t/007_sync_rep.pl
+++ b/src/test/recovery/t/007_sync_rep.pl
@@ -3,7 +3,7 @@ use strict;
 use warnings;
 use PostgresNode;
 use TestLib;
-use Test::More tests => 8;
+use Test::More tests => 11;
 
 # Query checking sync_priority and sync_state of each standby
 my $check_sql =
@@ -107,7 +107,7 @@ test_sync_state(
 	$node_master, qq(standby2|2|sync
 standby3|3|sync),
 	'2 synchronous standbys',
-	'2(standby1,standby2,standby3)');
+	'FIRST 2(standby1,standby2,standby3)');
 
 # Start standby1
 $node_standby_1->start;
@@ -138,7 +138,7 @@ standby2|4|sync
 standby3|3|sync
 standby4|1|sync),
 	'num_sync exceeds the num of potential sync standbys',
-	'6(standby4,standby0,standby3,standby2)');
+	'FIRST 6(standby4,standby0,standby3,standby2)');
 
 # The setting that * comes before another standby name is acceptable
 # but does not make sense in most cases. Check that sync_state is
@@ -150,7 +150,7 @@ standby2|2|sync
 standby3|2|potential
 standby4|2|potential),
 	'asterisk comes before another standby name',
-	'2(standby1,*,standby2)');
+	'FIRST 2(standby1,*,standby2)');
 
 # Check that the setting of '2(*)' chooses standby2 and standby3 that are stored
 # earlier in WalSnd array as sync standbys.
@@ -160,7 +160,7 @@ standby2|1|sync
 standby3|1|sync
 standby4|1|potential),
 	'multiple standbys having the same priority are chosen as sync',
-	'2(*)');
+	'FIRST 2(*)');
 
 # Stop Standby3 which is considered in 'sync' state.
 $node_standby_3->stop;
@@ -172,3 +172,34 @@ test_sync_state(
 standby2|1|sync
 standby4|1|potential),
 	'potential standby found earlier in array is promoted to sync');
+
+# Check that the state of standbys listed as a voter are having
+# same priority when synchronous_standby_names uses quorum method.
+test_sync_state(
+$node_master, qq(standby1|1|quorum
+standby2|2|quorum
+standby4|0|async),
+'2 quorum and 1 async',
+'ANY 2(standby1, standby2)');
+
+# Check that state of standbys are not the same as the behaviour of that
+# 'ANY' is specified.
+test_sync_state(
+$node_master, qq(standby1|1|quorum
+standby2|2|quorum
+standby4|0|async),
+'not specify synchronization method',
+'2(standby1, standby2)');
+
+# Start Standby3 which will be considered in 'quorum' state.
+$node_standby_3->start;
+
+# Check that set setting of 'ANY 2(*)' chooses all standbys as
+# voter.
+test_sync_state(
+$node_master, qq(standby1|1|quorum
+standby2|1|quorum
+standby3|1|quorum
+standby4|1|quorum),
+'all standbys are considered as candidates for quorum commit',
+'ANY 2(*)');
-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to