On Tue, Feb 9, 2016 at 10:32 PM, Michael Paquier
<michael.paqu...@gmail.com> wrote:
> On Wed, Feb 3, 2016 at 7:33 AM, Robert Haas wrote:
>> Also, to be frank, I think we ought to be putting more effort into
>> another patch in this same area, specifically Thomas Munro's causal
>> reads patch.  I think a lot of people today are trying to use
>> synchronous replication to build load-balancing clusters and avoid the
>> problem where you write some data and then read back stale data from a
>> standby server.  Of course, our current synchronous replication
>> facilities make no such guarantees - his patch does, and I think
>> that's pretty important.  I'm not saying that we shouldn't do this
>> too, of course.
>
> Yeah, sure. Each one of those patches is trying to solve a different
> problem where Postgres is deficient: here we'd like to be sure that a
> commit WAL record is correctly flushed on multiple standbys, while
> Thomas's patch is trying to ensure that there is no need to scan for
> the replay position of a standby, using some GUC parameters and a
> validation/sanity layer in syncrep.c to do that. Surely the patch of
> this thread has got more attention than Thomas's, but both of them have
> merits and try to address real problems. FWIW, Thomas's patch is
> a topic that I find rather interesting, and I am planning to look at
> it as well, perhaps for next CF or even before that. We'll see how
> other things move on.

Attached is the first version of the dedicated-language patch (the documentation patch is not ready yet).

This patch supports only the priority method with a single level of
nesting, but the feature is meant to be extended later with a quorum
method and with more than one level of nesting, so it has been
implemented with that extensibility in mind.
I have also implemented the new system view we discussed on this
thread, but it is not included in this patch because it is not needed
yet.

== Syntax ==
s_s_names (synchronous_standby_names) accepts two forms of syntax:

1. s_s_names = 'node1, node2, node3'
2. s_s_names = '2[node1, node2, node3]'

Syntax #1 is kept for backward compatibility; it means the master
server waits for only one standby.
Syntax #2 is the new syntax using the dedicated language.

In the #2 setting above, node1 has the lowest priority value (it is the
most preferred standby) and node3 has the highest.
The master server will wait at COMMIT until at least the 2 standbys with
the lowest priority values among those connected have sent an ACK.
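To make the two forms concrete, here is a minimal hand-written C sketch of how a setting could be split into a wait count and an ordered list of names. It is not the patch's bison/flex parser: `parse_sync_names`, `SKETCH_MAX_NAMES` and `SKETCH_NAME_LEN` are hypothetical, and the sketch only roughly follows the scanner's rules (names are `[a-zA-Z][a-zA-Z0-9]*`, counts are `[1-9][0-9]*`, whitespace is ignored).

```c
#include <assert.h>
#include <ctype.h>
#include <stddef.h>
#include <string.h>

#define SKETCH_MAX_NAMES 16 /* hypothetical limit for this sketch */
#define SKETCH_NAME_LEN  64 /* hypothetical limit for this sketch */

/* Skip the whitespace that the scanner ignores. */
static const char *
skip_ws(const char *p)
{
    while (*p != '\0' && isspace((unsigned char) *p))
        p++;
    return p;
}

/*
 * Parse 'node1, node2' (syntax #1, wait for one standby) or
 * '2[node1, node2, node3]' (syntax #2).  Names are stored in definition
 * order, which is also their priority order.  Returns the number of
 * names, or -1 on a syntax error.
 */
static int
parse_sync_names(const char *s, int *wait_num,
                 char names[][SKETCH_NAME_LEN])
{
    const char *p;
    int         n = 0;
    int         bracketed = 0;

    assert(s != NULL && wait_num != NULL);
    p = skip_ws(s);
    *wait_num = 1;                      /* syntax #1 default */
    if (*p >= '1' && *p <= '9')
    {
        int v = 0;

        while (isdigit((unsigned char) *p))
            v = v * 10 + (*p++ - '0');
        p = skip_ws(p);
        if (*p != '[')
            return -1;
        *wait_num = v;
        bracketed = 1;
        p++;
    }

    for (;;)
    {
        size_t len = 0;

        p = skip_ws(p);
        if (!isalpha((unsigned char) *p) || n == SKETCH_MAX_NAMES)
            return -1;
        while (isalnum((unsigned char) p[len]))
            len++;
        if (len >= SKETCH_NAME_LEN)
            return -1;
        memcpy(names[n], p, len);
        names[n][len] = '\0';
        n++;
        p = skip_ws(p + len);
        if (*p != ',')
            break;
        p++;                            /* consume ',' */
    }

    if (bracketed)
    {
        if (*p != ']')
            return -1;
        p = skip_ws(p + 1);
    }
    return (*p == '\0') ? n : -1;
}
```

With '2[node1, node2, node3]' the sketch returns 3 names with a wait count of 2. Like the grammar in the patch, it does not check that the wait count is no larger than the number of names.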

== Memory Structure ==
Previously, master server has value of s_s_names as string, and used
it when master server determine standby priority.
This patch changed it so that master server has new memory structure
(called SyncGroupNode) in order to be able to handle multiple (and
nested in the future) standby nodes flexibly.
All information of SyncGroupNode are set during parsing s_s_names.
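Because the list is built once, computing a walsender's priority no longer needs a string copy and SplitIdentifierString(); it becomes a walk over the members. A minimal sketch of what the patched SyncRepGetStandbyPriority() does, assuming a simplified hypothetical `NameNode` type in place of SyncGroupNode and a local `names_equal()` in place of pg_strcasecmp():

```c
#include <assert.h>
#include <ctype.h>
#include <stddef.h>
#include <string.h>

/* Minimal stand-in for a parsed name node (hypothetical, simplified). */
typedef struct NameNode
{
    const char      *name;
    struct NameNode *next;
} NameNode;

/* ASCII case-insensitive equality, standing in for pg_strcasecmp() == 0. */
static int
names_equal(const char *a, const char *b)
{
    while (*a && *b &&
           tolower((unsigned char) *a) == tolower((unsigned char) *b))
    {
        a++;
        b++;
    }
    return *a == '\0' && *b == '\0';
}

/*
 * Return the 1-based position of the first member that matches app_name
 * (case-insensitively) or is "*", or 0 if this standby is not a potential
 * synchronous standby.
 */
static int
standby_priority(const NameNode *members, const char *app_name)
{
    int priority = 0;

    assert(app_name != NULL);
    for (const NameNode *n = members; n != NULL; n = n->next)
    {
        priority++;
        if (names_equal(n->name, app_name) || strcmp(n->name, "*") == 0)
            return priority;
    }
    return 0;
}
```

For the member list node1 -> node2 -> node3, a standby whose application_name is "NODE3" gets priority 3, and an unlisted one gets 0.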

The memory structure is:

typedef struct SyncGroupNode SyncGroupNode;

struct SyncGroupNode
{
   /* Common information */
   int            type;
   char          *name;
   SyncGroupNode *next;   /* next name node in the same group */

   /* For group node */
   int            sync_method; /* priority */
   int            wait_num;
   SyncGroupNode *member; /* member of its group */
   bool (*SyncRepGetSyncedLsnsFn) (SyncGroupNode *group, XLogRecPtr *write_pos,
                                   XLogRecPtr *flush_pos);
   int (*SyncRepGetSyncStandbysFn) (SyncGroupNode *group, int *list);
};

A SyncGroupNode is one of two types, a name node or a group node. Every
node has a pointer to the next name/group node in the same group, and a
group node also has a list of its members.
A name node represents a synchronous standby.
A group node represents a group of name nodes; it holds the list of
group members, the synchronization method, and the number of standbys
to wait for.
The members are kept in a singly linked list, in the order in which
they appear in s_s_names.
For example, with the #2 setting above, the member list would be:

"main".member -> "node1".next -> "node2".next -> "node3".next -> NULL

The top-level node is always the "main" group node; i.e., in this
version of the patch only one group (the "main" group) is created, and
it contains only name nodes (no nested group nodes).
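The construction done by the grammar actions (create_name_node(), add_node() and create_group_node()) can be sketched in standalone C as follows. This is a simplified, hypothetical version: the struct omits sync_method and the two function pointers, the helper names are invented for the sketch, and the "main" name is heap-allocated so that free_group() can release every node and name uniformly.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

#define SYNC_REP_GROUP_MAIN  0x01
#define SYNC_REP_GROUP_NAME  0x02
#define SYNC_REP_GROUP_GROUP 0x04

typedef struct SyncGroupNode SyncGroupNode;

/* Simplified: sync_method and the two function pointers are omitted. */
struct SyncGroupNode
{
    int            type;
    char          *name;
    SyncGroupNode *next;     /* next node in the same group */
    int            wait_num; /* group nodes only */
    SyncGroupNode *member;   /* group nodes only: first member */
};

static char *
copy_string(const char *s)
{
    size_t len = strlen(s) + 1;
    char  *copy = malloc(len);

    assert(copy != NULL);
    memcpy(copy, s, len);
    return copy;
}

static SyncGroupNode *
make_name_node(const char *name)
{
    SyncGroupNode *n = calloc(1, sizeof(SyncGroupNode));

    assert(n != NULL);
    n->type = SYNC_REP_GROUP_NAME;
    n->name = copy_string(name);
    return n;
}

/* Append at the tail so members stay in s_s_names order. */
static SyncGroupNode *
append_node(SyncGroupNode *list, SyncGroupNode *node)
{
    SyncGroupNode *tail = list;

    if (list == NULL)
        return node;
    while (tail->next != NULL)
        tail = tail->next;
    tail->next = node;
    return list;
}

static SyncGroupNode *
make_main_group(int wait_num, SyncGroupNode *members)
{
    SyncGroupNode *g = calloc(1, sizeof(SyncGroupNode));

    assert(g != NULL);
    g->type = SYNC_REP_GROUP_GROUP | SYNC_REP_GROUP_MAIN;
    g->name = copy_string("main");
    g->wait_num = wait_num;
    g->member = members;
    return g;
}

/* Release the members, their names, and the group node itself. */
static void
free_group(SyncGroupNode *g)
{
    SyncGroupNode *n = g->member;

    while (n != NULL)
    {
        SyncGroupNode *next = n->next;

        free(n->name);
        free(n);
        n = next;
    }
    free(g->name);
    free(g);
}
```

Appending node1, node2 and node3 and wrapping them with make_main_group(2, list) yields the "main".member -> "node1" -> "node2" -> "node3" -> NULL chain described above.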
A group node also has two function pointers:

* SyncRepGetSyncedLsnsFn
This function determines the group's write/flush LSNs at that moment.
For example, with the priority method, the lowest LSNs among the
standbys that are considered synchronous are selected.
If there are not enough synchronous standbys to determine the LSNs,
this function returns false.

* SyncRepGetSyncStandbysFn
This function obtains an array of the walsnd positions of the group's
standby members that are considered synchronous.
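The two priority-method callbacks can be sketched as follows, assuming a hypothetical `FakeWalSnd` array in place of WalSndCtl->walsnds (no locking), a 64-bit integer for XLogRecPtr, and exact rather than case-insensitive name matching. Unlike the patch, which walks the members separately in each function, this sketch computes the LSNs from the result of the standby-selection callback; the selection rule (the first wait_num connected members in s_s_names order, and the minimum LSNs among them) is intended to be the same.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>
#include <string.h>

typedef uint64_t XLogRecPtr;            /* simplified stand-in */
#define InvalidXLogRecPtr ((XLogRecPtr) 0)

/* Hypothetical, simplified view of one walsender slot. */
typedef struct
{
    const char *name;    /* standby's application_name */
    bool        active;  /* connected with a valid flush position */
    XLogRecPtr  write;
    XLogRecPtr  flush;
} FakeWalSnd;

/* Hypothetical group: members in s_s_names order plus the two callbacks. */
typedef struct SketchGroup SketchGroup;
struct SketchGroup
{
    const char *const *members;
    int         nmembers;
    int         wait_num;
    bool      (*get_synced_lsns) (const SketchGroup *g, const FakeWalSnd *snds,
                                  int nsnds, XLogRecPtr *write_pos,
                                  XLogRecPtr *flush_pos);
    int       (*get_sync_standbys) (const SketchGroup *g,
                                    const FakeWalSnd *snds, int nsnds,
                                    int *sync_list);
};

/* Slot index of the active walsender called 'name', or -1. */
static int
find_walsender(const FakeWalSnd *snds, int nsnds, const char *name)
{
    for (int i = 0; i < nsnds; i++)
        if (snds[i].active && strcmp(snds[i].name, name) == 0)
            return i;
    return -1;
}

/* Slots of the first wait_num connected members; caller sizes sync_list. */
static int
sync_standbys_priority(const SketchGroup *g, const FakeWalSnd *snds,
                       int nsnds, int *sync_list)
{
    int num = 0;

    for (int m = 0; m < g->nmembers && num < g->wait_num; m++)
    {
        int pos = find_walsender(snds, nsnds, g->members[m]);

        if (pos >= 0)           /* not connected: try the next member */
            sync_list[num++] = pos;
    }
    return num;
}

/* Minimum write/flush LSNs over those standbys; false if too few. */
static bool
synced_lsns_priority(const SketchGroup *g, const FakeWalSnd *snds, int nsnds,
                     XLogRecPtr *write_pos, XLogRecPtr *flush_pos)
{
    int sync_list[64];          /* hypothetical upper bound for the sketch */
    int num;

    assert(g->wait_num > 0 && g->wait_num <= 64);
    num = sync_standbys_priority(g, snds, nsnds, sync_list);
    *write_pos = InvalidXLogRecPtr;
    *flush_pos = InvalidXLogRecPtr;
    if (num < g->wait_num)
        return false;
    for (int j = 0; j < num; j++)
    {
        const FakeWalSnd *w = &snds[sync_list[j]];

        if (j == 0 || w->write < *write_pos)
            *write_pos = w->write;
        if (j == 0 || w->flush < *flush_pos)
            *flush_pos = w->flush;
    }
    return true;
}
```

Keeping the callbacks in the group node means that a different method (for example the quorum method mentioned above) could presumably be plugged in by supplying other functions, without changing callers such as SyncRepReleaseWaiters().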

This implementation may not be ideal in some respects, so please give
me feedback. I will create a new commitfest entry for this patch in CF5.

Regards,

--
Masahiko Sawada
diff --git a/src/backend/Makefile b/src/backend/Makefile
index b3d5e2e..3e36686 100644
--- a/src/backend/Makefile
+++ b/src/backend/Makefile
@@ -203,7 +203,7 @@ distprep:
 	$(MAKE) -C parser	gram.c gram.h scan.c
 	$(MAKE) -C bootstrap	bootparse.c bootscanner.c
 	$(MAKE) -C catalog	schemapg.h postgres.bki postgres.description postgres.shdescription
-	$(MAKE) -C replication	repl_gram.c repl_scanner.c
+	$(MAKE) -C replication	syncgroup_gram.c syncgroup_scanner.c repl_gram.c repl_scanner.c
 	$(MAKE) -C storage/lmgr	lwlocknames.h
 	$(MAKE) -C utils	fmgrtab.c fmgroids.h errcodes.h
 	$(MAKE) -C utils/misc	guc-file.c
@@ -319,6 +319,8 @@ maintainer-clean: distclean
 	      catalog/postgres.bki \
 	      catalog/postgres.description \
 	      catalog/postgres.shdescription \
+	      replication/syncgroup_gram.c \
+	      replication/syncgroup_scanner.c \
 	      replication/repl_gram.c \
 	      replication/repl_scanner.c \
 	      storage/lmgr/lwlocknames.c \
diff --git a/src/backend/postmaster/postmaster.c b/src/backend/postmaster/postmaster.c
index 1337eab..87a1145 100644
--- a/src/backend/postmaster/postmaster.c
+++ b/src/backend/postmaster/postmaster.c
@@ -114,6 +114,7 @@
 #include "postmaster/postmaster.h"
 #include "postmaster/syslogger.h"
 #include "replication/walsender.h"
+#include "replication/syncrep.h"
 #include "storage/fd.h"
 #include "storage/ipc.h"
 #include "storage/pg_shmem.h"
diff --git a/src/backend/replication/.gitignore b/src/backend/replication/.gitignore
index 2a0491d..00eb556 100644
--- a/src/backend/replication/.gitignore
+++ b/src/backend/replication/.gitignore
@@ -1,2 +1,4 @@
 /repl_gram.c
 /repl_scanner.c
+/syncgroup_gram.c
+/syncgroup_scanner.c
\ No newline at end of file
diff --git a/src/backend/replication/Makefile b/src/backend/replication/Makefile
index b73370e..1198707 100644
--- a/src/backend/replication/Makefile
+++ b/src/backend/replication/Makefile
@@ -15,7 +15,8 @@ include $(top_builddir)/src/Makefile.global
 override CPPFLAGS := -I. -I$(srcdir) $(CPPFLAGS)
 
 OBJS = walsender.o walreceiverfuncs.o walreceiver.o basebackup.o \
-	repl_gram.o slot.o slotfuncs.o syncrep.o
+	syncgroup_gram.o repl_gram.o slot.o \
+	slotfuncs.o syncrep.o
 
 SUBDIRS = logical
 
@@ -23,6 +24,10 @@ include $(top_srcdir)/src/backend/common.mk
 
 # repl_scanner is compiled as part of repl_gram
 repl_gram.o: repl_scanner.c
+# syncgroup_scanner is compiled as part of syncgroup_gram
+syncgroup_gram.o: syncgroup_scanner.c
 
 # repl_gram.c and repl_scanner.c are in the distribution tarball, so
 # they are not cleaned here.
+# syncgroup_gram.c and syncgroup_scanner.c are in the distribution tarball, so
+# they are not cleaned here.
diff --git a/src/backend/replication/syncgroup_gram.y b/src/backend/replication/syncgroup_gram.y
new file mode 100644
index 0000000..a82445c
--- /dev/null
+++ b/src/backend/replication/syncgroup_gram.y
@@ -0,0 +1,129 @@
+%{
+/*-------------------------------------------------------------------------
+ *
+ * syncgroup_gram.y				- Parser for synchronous replication group
+ *
+ * Portions Copyright (c) 1996-2016, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ *
+ * IDENTIFICATION
+ *	  src/backend/replication/syncgroup_gram.y
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "replication/syncrep.h"
+
+static SyncGroupNode *create_name_node(char *name);
+static SyncGroupNode *add_node(SyncGroupNode *node_list, SyncGroupNode *node);
+static SyncGroupNode *create_group_node(int wait_num, SyncGroupNode *node_list);
+
+/*
+ * Bison doesn't allocate anything that needs to live across parser calls,
+ * so we can easily have it use palloc instead of malloc.  This prevents
+ * memory leaks if we error out during parsing.  Note this only works with
+ * bison >= 2.0.  However, in bison 1.875 the default is to use alloca()
+ * if possible, so there's not really much problem anyhow, at least if
+ * you're building with gcc.
+ */
+#define YYMALLOC palloc
+#define YYFREE   pfree
+
+%}
+
+%expect 0
+%name-prefix="syncgroup_yy"
+
+%union
+{
+	int32		val;
+	char	   *str;
+	SyncGroupNode  *expr;
+}
+
+%token <str> NAME
+%token <val> INT
+
+%type <expr> result sync_list sync_element sync_node_group
+
+%start result
+
+%%
+
+result:
+	sync_node_group						{ SyncRepStandbyNames = $1; }
+;
+
+sync_list:
+	sync_element 						{ $$ = $1;}
+	| sync_list ',' sync_element		{ $$ = add_node($1, $3);}
+;
+
+sync_node_group:
+	sync_list							{ $$ = create_group_node(1, $1); }
+|	INT '[' sync_list ']' 				{ $$ = create_group_node($1, $3);}
+;
+
+sync_element:
+	NAME	 							{ $$ = create_name_node($1);}
+;
+
+
+%%
+
+static SyncGroupNode *
+create_name_node(char *name)
+{
+	SyncGroupNode *name_node = (SyncGroupNode *)malloc(sizeof(SyncGroupNode));
+
+	/* Common information */
+	name_node->type = SYNC_REP_GROUP_NAME;
+	name_node->name = strdup(name);
+	name_node->next = NULL;
+
+	/* For GROUP node */
+	name_node->sync_method = 0;
+	name_node->wait_num = 0;
+	name_node->member = NULL;
+	name_node->SyncRepGetSyncedLsnsFn = NULL;
+	name_node->SyncRepGetSyncStandbysFn = NULL;
+
+	return name_node;
+}
+
+static SyncGroupNode *
+create_group_node(int wait_num, SyncGroupNode *node_list)
+{
+	SyncGroupNode *group_node = (SyncGroupNode *) malloc(sizeof(SyncGroupNode));
+
+	/* Common information */
+	group_node->type = SYNC_REP_GROUP_GROUP | SYNC_REP_GROUP_MAIN;
+	group_node->name = "main";
+	group_node->next = NULL;
+
+	/* For GROUP node */
+	group_node->sync_method = SYNC_REP_METHOD_PRIORITY;
+	group_node->wait_num = wait_num;
+	group_node->member = node_list;
+	group_node->SyncRepGetSyncedLsnsFn = SyncRepGetSyncedLsnsPriority;
+	group_node->SyncRepGetSyncStandbysFn = SyncRepGetSyncStandbysPriority;
+
+	return group_node;
+}
+
+static SyncGroupNode *
+add_node(SyncGroupNode *node_list, SyncGroupNode *node)
+{
+	SyncGroupNode *tmp = node_list;
+
+	/* Add node to tailing of node_list */
+	while(tmp->next != NULL) tmp = tmp->next;
+
+	tmp->next = node;
+	return node_list;
+}
+
+#include "syncgroup_scanner.c"
diff --git a/src/backend/replication/syncgroup_scanner.l b/src/backend/replication/syncgroup_scanner.l
new file mode 100644
index 0000000..7779f3d
--- /dev/null
+++ b/src/backend/replication/syncgroup_scanner.l
@@ -0,0 +1,89 @@
+%{
+#include "postgres.h"
+
+/* No reason to constrain amount of data slurped */
+#define YY_READ_BUF_SIZE 16777216
+
+/* Handles to the buffer that the lexer uses internally */
+static YY_BUFFER_STATE scanbufhandle;
+
+%}
+%option 8bit
+%option never-interactive
+%option nodefault
+%option noinput
+%option nounput
+%option noyywrap
+%option warn
+%option prefix="syncgroup_yy"
+%option yylineno
+
+space		[ \t\n\r\f]
+non_newline	[^\n\r]
+whitespace	({space}+)
+
+self		[\[\]]
+
+%%
+
+{whitespace}	{ /* ignore */ }
+
+[a-zA-Z][a-zA-Z0-9]*	{
+				yylval.str = strdup(yytext);
+				return NAME;
+				}
+
+[1-9][0-9]*  {
+				yylval.val = atoi(yytext);   return INT;
+		}
+
+[,()\n]	{
+				return yytext[0];
+		}
+
+{self}	{
+				return yytext[0];
+		}
+
+.				{
+					ereport(ERROR,
+							(errcode(ERRCODE_SYNTAX_ERROR),
+							 errmsg("syntax error: unexpected character \"%s\"", yytext)));
+				}
+%%
+
+void
+yyerror(const char *message)
+{
+    ereport(ERROR,
+	    (errcode(ERRCODE_SYNTAX_ERROR),
+		    errmsg_internal("%s", message)));
+}
+
+void
+syncgroup_scanner_init(const char *str)
+{
+	Size		slen = strlen(str);
+	char		*scanbuf;
+
+	/*
+	 * Might be left over after ereport()
+	 */
+	if (YY_CURRENT_BUFFER)
+		yy_delete_buffer(YY_CURRENT_BUFFER);
+
+	/*
+	 * Make a scan buffer with special termination needed by flex.
+	 */
+	scanbuf = (char *) palloc(slen + 2);
+	memcpy(scanbuf, str, slen);
+	scanbuf[slen] = scanbuf[slen + 1] = YY_END_OF_BUFFER_CHAR;
+	scanbufhandle = yy_scan_buffer(scanbuf, slen + 2);
+}
+
+void
+syncgroup_scanner_finish(void)
+{
+	yy_delete_buffer(scanbufhandle);
+	scanbufhandle = NULL;
+}
diff --git a/src/backend/replication/syncrep.c b/src/backend/replication/syncrep.c
index 7f85b88..b02d5a8 100644
--- a/src/backend/replication/syncrep.c
+++ b/src/backend/replication/syncrep.c
@@ -5,7 +5,7 @@
  * Synchronous replication is new as of PostgreSQL 9.1.
  *
  * If requested, transaction commits wait until their commit LSN is
- * acknowledged by the synchronous standby.
+ * acknowledged by the synchronous standbys.
  *
  * This module contains the code for waiting and release of backends.
  * All code in this module executes on the primary. The core streaming
@@ -34,6 +34,13 @@
  * synchronous standby it must have caught up with the primary; that may
  * take some time. Once caught up, the current highest priority standby
  * will release waiters from the queue.
+ * In 9.6 we support the possibility to have multiple synchronous standbys,
+ * as defined in synchronous_standby_names. Before a standby can become a
+ * synchronous standby it must have caught up with the primary;
+ * that may take some time.
+ *
+ * Waiters will be released from the queue once the number of standbys
+ * specified in synchronous_standby_names have caught up.
  *
  * Portions Copyright (c) 2010-2016, PostgreSQL Global Development Group
  *
@@ -47,7 +54,9 @@
 #include <unistd.h>
 
 #include "access/xact.h"
+#include "funcapi.h"
 #include "miscadmin.h"
+#include "nodes/pg_list.h"
 #include "replication/syncrep.h"
 #include "replication/walsender.h"
 #include "replication/walsender_private.h"
@@ -58,10 +67,11 @@
 #include "utils/ps_status.h"
 
 /* User-settable parameters for sync rep */
-char	   *SyncRepStandbyNames;
+SyncGroupNode	   *SyncRepStandbyNames;
+char	   *SyncRepStandbyNamesString;
 
 #define SyncStandbysDefined() \
-	(SyncRepStandbyNames != NULL && SyncRepStandbyNames[0] != '\0')
+	(SyncRepStandbyNamesString != NULL && SyncRepStandbyNamesString[0] != '\0')
 
 static bool announce_next_takeover = true;
 
@@ -72,6 +82,9 @@ static void SyncRepCancelWait(void);
 static int	SyncRepWakeQueue(bool all, int mode);
 
 static int	SyncRepGetStandbyPriority(void);
+static int SyncRepFindWalSenderByName(char *name);
+static void SyncRepClearStandbyGroupList(SyncGroupNode *node);
+static bool SyncRepSyncedLsnAdvancedTo(XLogRecPtr *write_pos, XLogRecPtr *flush_pos);
 
 #ifdef USE_ASSERT_CHECKING
 static bool SyncRepQueueIsOrderedByLSN(int mode);
@@ -197,7 +210,7 @@ SyncRepWaitForLSN(XLogRecPtr XactCommitLSN)
 			ereport(WARNING,
 					(errcode(ERRCODE_ADMIN_SHUTDOWN),
 					 errmsg("canceling the wait for synchronous replication and terminating connection due to administrator command"),
-					 errdetail("The transaction has already committed locally, but might not have been replicated to the standby.")));
+					 errdetail("The transaction has already committed locally, but might not have been replicated to the standby(s).")));
 			whereToSendOutput = DestNone;
 			SyncRepCancelWait();
 			break;
@@ -214,7 +227,7 @@ SyncRepWaitForLSN(XLogRecPtr XactCommitLSN)
 			QueryCancelPending = false;
 			ereport(WARNING,
 					(errmsg("canceling wait for synchronous replication due to user request"),
-					 errdetail("The transaction has already committed locally, but might not have been replicated to the standby.")));
+					 errdetail("The transaction has already committed locally, but might not have been replicated to the standby(s).")));
 			SyncRepCancelWait();
 			break;
 		}
@@ -318,6 +331,24 @@ SyncRepCleanupAtProcExit(void)
 }
 
 /*
+ * Free all member nodes of the given sync group.
+ */
+static void
+SyncRepClearStandbyGroupList(SyncGroupNode *group)
+{
+	SyncGroupNode *n = group->member;
+
+	while (n != NULL)
+	{
+		SyncGroupNode *tmp = n->next;
+
+		free(n);
+		n = tmp;
+	}
+}
+
+
+/*
  * ===========================================================
  * Synchronous Replication functions for wal sender processes
  * ===========================================================
@@ -339,8 +370,11 @@ SyncRepInitConfig(void)
 	priority = SyncRepGetStandbyPriority();
 	if (MyWalSnd->sync_standby_priority != priority)
 	{
+		char *walsnd_name;
 		LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
 		MyWalSnd->sync_standby_priority = priority;
+		walsnd_name = (char *)MyWalSnd->name;
+		strlcpy(walsnd_name, application_name, sizeof(MyWalSnd->name));
 		LWLockRelease(SyncRepLock);
 		ereport(DEBUG1,
 			(errmsg("standby \"%s\" now has synchronous standby priority %u",
@@ -349,23 +383,19 @@ SyncRepInitConfig(void)
 }
 
 /*
- * Find the WAL sender servicing the synchronous standby with the lowest
- * priority value, or NULL if no synchronous standby is connected. If there
- * are multiple standbys with the same lowest priority value, the first one
- * found is selected. The caller must hold SyncRepLock.
+ * Find active walsender position of WalSnd by name. Returns index of walsnds
+ * array if found, otherwise return -1.
  */
-WalSnd *
-SyncRepGetSynchronousStandby(void)
+static int
+SyncRepFindWalSenderByName(char *name)
 {
-	WalSnd	   *result = NULL;
-	int			result_priority = 0;
-	int			i;
+	int	i;
 
 	for (i = 0; i < max_wal_senders; i++)
 	{
 		/* Use volatile pointer to prevent code rearrangement */
 		volatile WalSnd *walsnd = &WalSndCtl->walsnds[i];
-		int			this_priority;
+		char *walsnd_name = (char *) walsnd->name;
 
 		/* Must be active */
 		if (walsnd->pid == 0)
@@ -376,30 +406,20 @@ SyncRepGetSynchronousStandby(void)
 			continue;
 
 		/* Must be synchronous */
-		this_priority = walsnd->sync_standby_priority;
-		if (this_priority == 0)
-			continue;
-
-		/* Must have a lower priority value than any previous ones */
-		if (result != NULL && result_priority <= this_priority)
+		if (walsnd->sync_standby_priority == 0)
 			continue;
 
 		/* Must have a valid flush position */
 		if (XLogRecPtrIsInvalid(walsnd->flush))
 			continue;
 
-		result = (WalSnd *) walsnd;
-		result_priority = this_priority;
-
-		/*
-		 * If priority is equal to 1, there cannot be any other WAL senders
-		 * with a lower priority, so we're done.
-		 */
-		if (this_priority == 1)
-			return result;
+		/* Compare wal sender name */
+		if (pg_strcasecmp(walsnd_name, name) == 0)
+			return i; /* Found */
 	}
 
-	return result;
+	/* Not found */
+	return -1;
 }
 
 /*
@@ -413,7 +433,8 @@ void
 SyncRepReleaseWaiters(void)
 {
 	volatile WalSndCtlData *walsndctl = WalSndCtl;
-	WalSnd	   *syncWalSnd;
+	XLogRecPtr	write_pos = InvalidXLogRecPtr;
+	XLogRecPtr	flush_pos = InvalidXLogRecPtr;
 	int			numwrite = 0;
 	int			numflush = 0;
 
@@ -428,23 +449,11 @@ SyncRepReleaseWaiters(void)
 		XLogRecPtrIsInvalid(MyWalSnd->flush))
 		return;
 
-	/*
-	 * We're a potential sync standby. Release waiters if we are the highest
-	 * priority standby.
-	 */
 	LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
-	syncWalSnd = SyncRepGetSynchronousStandby();
-
-	/* We should have found ourselves at least */
-	Assert(syncWalSnd != NULL);
 
-	/*
-	 * If we aren't managing the highest priority standby then just leave.
-	 */
-	if (syncWalSnd != MyWalSnd)
+	if (!(SyncRepSyncedLsnAdvancedTo(&write_pos, &flush_pos)))
 	{
 		LWLockRelease(SyncRepLock);
-		announce_next_takeover = true;
 		return;
 	}
 
@@ -452,14 +461,14 @@ SyncRepReleaseWaiters(void)
 	 * Set the lsn first so that when we wake backends they will release up to
 	 * this location.
 	 */
-	if (walsndctl->lsn[SYNC_REP_WAIT_WRITE] < MyWalSnd->write)
+	if (walsndctl->lsn[SYNC_REP_WAIT_WRITE] < write_pos)
 	{
-		walsndctl->lsn[SYNC_REP_WAIT_WRITE] = MyWalSnd->write;
+		walsndctl->lsn[SYNC_REP_WAIT_WRITE] = write_pos;
 		numwrite = SyncRepWakeQueue(false, SYNC_REP_WAIT_WRITE);
 	}
-	if (walsndctl->lsn[SYNC_REP_WAIT_FLUSH] < MyWalSnd->flush)
+	if (walsndctl->lsn[SYNC_REP_WAIT_FLUSH] < flush_pos)
 	{
-		walsndctl->lsn[SYNC_REP_WAIT_FLUSH] = MyWalSnd->flush;
+		walsndctl->lsn[SYNC_REP_WAIT_FLUSH] = flush_pos;
 		numflush = SyncRepWakeQueue(false, SYNC_REP_WAIT_FLUSH);
 	}
 
@@ -483,6 +492,117 @@ SyncRepReleaseWaiters(void)
 }
 
 /*
+ * Get both synced LSNs (write and flush) using the group's function, and
+ * check whether each LSN has advanced.
+ */
+static bool
+SyncRepSyncedLsnAdvancedTo(XLogRecPtr *write_pos, XLogRecPtr *flush_pos)
+{
+	XLogRecPtr	cur_write_pos;
+	XLogRecPtr	cur_flush_pos;
+	bool		ret;
+
+	/* Get synced LSNs at this moment */
+	ret = SyncRepStandbyNames->SyncRepGetSyncedLsnsFn(SyncRepStandbyNames,
+													  &cur_write_pos,
+													  &cur_flush_pos);
+
+	/* Check whether each LSN has advanced */
+	if (ret)
+	{
+		if (MyWalSnd->write >= cur_write_pos)
+			*write_pos = cur_write_pos;
+		if (MyWalSnd->flush >= cur_flush_pos)
+			*flush_pos = cur_flush_pos;
+
+		return true;
+	}
+
+	return false;
+}
+
+/*
+ * Decide synced LSNs at this moment using priority method.
+ * If there are not enough active standbys to determine the LSNs, return false.
+ */
+bool
+SyncRepGetSyncedLsnsPriority(SyncGroupNode *group, XLogRecPtr *write_pos, XLogRecPtr *flush_pos)
+{
+	SyncGroupNode	*n;
+	int	num = 0;
+
+	*write_pos = InvalidXLogRecPtr;
+	*flush_pos = InvalidXLogRecPtr;
+
+	for(n = group->member; n != NULL; n = n->next)
+	{
+		int pos;
+
+		/* We found enough synchronous standbys to decide the LSNs; return */
+		if (num == group->wait_num)
+			return true;
+
+		pos = SyncRepFindWalSenderByName(n->name);
+
+		if (pos != -1)
+		{
+			volatile WalSnd *walsnd = &WalSndCtl->walsnds[pos];
+			XLogRecPtr	write;
+			XLogRecPtr	flush;
+
+			SpinLockAcquire(&walsnd->mutex);
+			write = walsnd->write;
+			flush = walsnd->flush;
+			SpinLockRelease(&walsnd->mutex);
+
+			if (XLogRecPtrIsInvalid(*write_pos) || *write_pos > write)
+				*write_pos = write;
+			if (XLogRecPtrIsInvalid(*flush_pos) || *flush_pos > flush)
+				*flush_pos = flush;
+		}
+
+		/* Could not find the standby; try the next one */
+		if (pos == -1)
+			continue;
+
+		num++;
+	}
+
+	return (num == group->wait_num);
+}
+
+/*
+ * Obtain an array containing the positions of the standbys of the specified
+ * group that are currently considered synchronous, up to the group's wait_num.
+ * The caller is responsible for allocating the array.
+ */
+int
+SyncRepGetSyncStandbysPriority(SyncGroupNode *group, int *sync_list)
+{
+	SyncGroupNode	*n;
+	int	num = 0;
+
+	for (n = group->member; n != NULL; n = n->next)
+	{
+		int pos = 0;
+
+		/* We already got enough synchronous standbys, return */
+		if (num == group->wait_num)
+			return num;
+
+		pos = SyncRepFindWalSenderByName(n->name);
+
+		if (pos == -1)
+			continue;
+
+		sync_list[num] = pos;
+		num++;
+	}
+
+	return num;
+}
+
+/*
  * Check if we are in the list of sync standbys, and if so, determine
  * priority sequence. Return priority if set, or zero to indicate that
  * we are not a potential sync standby.
@@ -493,9 +613,6 @@ SyncRepReleaseWaiters(void)
 static int
 SyncRepGetStandbyPriority(void)
 {
-	char	   *rawstring;
-	List	   *elemlist;
-	ListCell   *l;
 	int			priority = 0;
 	bool		found = false;
 
@@ -506,36 +623,23 @@ SyncRepGetStandbyPriority(void)
 	if (am_cascading_walsender)
 		return 0;
 
-	/* Need a modifiable copy of string */
-	rawstring = pstrdup(SyncRepStandbyNames);
-
-	/* Parse string into list of identifiers */
-	if (!SplitIdentifierString(rawstring, ',', &elemlist))
-	{
-		/* syntax error in list */
-		pfree(rawstring);
-		list_free(elemlist);
-		/* GUC machinery will have already complained - no need to do again */
-		return 0;
-	}
-
-	foreach(l, elemlist)
+	if (SyncStandbysDefined())
 	{
-		char	   *standby_name = (char *) lfirst(l);
+		SyncGroupNode	*n;
 
-		priority++;
-
-		if (pg_strcasecmp(standby_name, application_name) == 0 ||
-			pg_strcasecmp(standby_name, "*") == 0)
+		for (n = SyncRepStandbyNames->member; n != NULL; n = n->next)
 		{
-			found = true;
-			break;
+			priority++;
+
+			if (pg_strcasecmp(n->name, application_name) == 0 ||
+				pg_strcasecmp(n->name, "*") == 0)
+			{
+				found = true;
+				break;
+			}
 		}
 	}
 
-	pfree(rawstring);
-	list_free(elemlist);
-
 	return (found ? priority : 0);
 }
 
@@ -687,32 +791,31 @@ SyncRepQueueIsOrderedByLSN(int mode)
 bool
 check_synchronous_standby_names(char **newval, void **extra, GucSource source)
 {
-	char	   *rawstring;
-	List	   *elemlist;
+	int	parse_rc;
 
-	/* Need a modifiable copy of string */
-	rawstring = pstrdup(*newval);
-
-	/* Parse string into list of identifiers */
-	if (!SplitIdentifierString(rawstring, ',', &elemlist))
+	if (*newval != NULL && (*newval)[0] != '\0')
 	{
-		/* syntax error in list */
-		GUC_check_errdetail("List syntax is invalid.");
-		pfree(rawstring);
-		list_free(elemlist);
-		return false;
-	}
+		syncgroup_scanner_init(*newval);
+		parse_rc = syncgroup_yyparse();
 
-	/*
-	 * Any additional validation of standby names should go here.
-	 *
-	 * Don't attempt to set WALSender priority because this is executed by
-	 * postmaster at startup, not WALSender, so the application_name is not
-	 * yet correctly set.
-	 */
+		if (parse_rc != 0)
+		{
+			GUC_check_errdetail("Invalid syntax");
+			return false;
+		}
+		syncgroup_scanner_finish();
+
+		/*
+		 * Any additional validation of standby names should go here.
+		 *
+		 * Don't attempt to set WALSender priority because this is executed by
+		 * postmaster at startup, not WALSender, so the application_name is not
+		 * yet correctly set.
+		 */
 
-	pfree(rawstring);
-	list_free(elemlist);
+		SyncRepClearStandbyGroupList(SyncRepStandbyNames);
+		SyncRepStandbyNames = NULL;
+	}
 
 	return true;
 }
@@ -733,3 +836,20 @@ assign_synchronous_commit(int newval, void *extra)
 			break;
 	}
 }
+
+void
+assign_synchronous_standby_names(const char *newval, void *extra)
+{
+	int	parse_rc;
+
+	if (newval != NULL && newval[0] != '\0')
+	{
+		syncgroup_scanner_init(newval);
+		parse_rc = syncgroup_yyparse();
+
+		if (parse_rc != 0)
+			GUC_check_errdetail("Invalid syntax");
+
+		syncgroup_scanner_finish();
+	}
+}
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index c03e045..9e51ed9 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -2735,7 +2735,6 @@ WalSndGetStateString(WalSndState state)
 	return "UNKNOWN";
 }
 
-
 /*
  * Returns activity of walsenders, including pids and xlog locations sent to
  * standby servers.
@@ -2749,7 +2748,8 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
 	Tuplestorestate *tupstore;
 	MemoryContext per_query_ctx;
 	MemoryContext oldcontext;
-	WalSnd	   *sync_standby;
+	int	   *sync_standbys;
+	int		num_sync;
 	int			i;
 
 	/* check to see if caller supports us returning a tuplestore */
@@ -2780,8 +2780,9 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
 	/*
 	 * Get the currently active synchronous standby.
 	 */
+	sync_standbys = (int *) palloc(sizeof(int) * (SyncRepStandbyNames != NULL ? SyncRepStandbyNames->wait_num : 1));
 	LWLockAcquire(SyncRepLock, LW_SHARED);
-	sync_standby = SyncRepGetSynchronousStandby();
+	num_sync = (SyncRepStandbyNames != NULL) ? SyncRepGetSyncStandbysPriority(SyncRepStandbyNames, sync_standbys) : 0;
 	LWLockRelease(SyncRepLock);
 
 	for (i = 0; i < max_wal_senders; i++)
@@ -2854,10 +2855,23 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
 			 */
 			if (priority == 0)
 				values[7] = CStringGetTextDatum("async");
-			else if (walsnd == sync_standby)
-				values[7] = CStringGetTextDatum("sync");
 			else
-				values[7] = CStringGetTextDatum("potential");
+			{
+				int	j;
+				bool	found = false;
+
+				for (j = 0; j < num_sync; j++)
+				{
+					if (sync_standbys[j] == i)
+					{
+						values[7] = CStringGetTextDatum("sync");
+						found = true;
+						break;
+					}
+				}
+				if (!found)
+					values[7] = CStringGetTextDatum("potential");
+			}
 		}
 
 		tuplestore_putvalues(tupstore, tupdesc, values, nulls);
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index 31a69ca..7e5e408 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -3373,9 +3373,9 @@ static struct config_string ConfigureNamesString[] =
 			NULL,
 			GUC_LIST_INPUT
 		},
-		&SyncRepStandbyNames,
+		&SyncRepStandbyNamesString,
 		"",
-		check_synchronous_standby_names, NULL, NULL
+		check_synchronous_standby_names, assign_synchronous_standby_names, NULL
 	},
 
 	{
diff --git a/src/include/replication/syncrep.h b/src/include/replication/syncrep.h
index 96e059b..663f630 100644
--- a/src/include/replication/syncrep.h
+++ b/src/include/replication/syncrep.h
@@ -31,8 +31,36 @@
 #define SYNC_REP_WAITING			1
 #define SYNC_REP_WAIT_COMPLETE		2
 
+/* SyncRepMethod */
+#define SYNC_REP_METHOD_PRIORITY	0
+
+/* SyncGroupNode */
+#define SYNC_REP_GROUP_MAIN			0x01
+#define SYNC_REP_GROUP_NAME			0x02
+#define SYNC_REP_GROUP_GROUP		0x04
+
+struct SyncGroupNode;
+typedef struct SyncGroupNode SyncGroupNode;
+
+struct	SyncGroupNode
+{
+	/* Common information */
+	int		type;
+	char	*name;
+	SyncGroupNode	*next; /* Same group, next name node */
+
+	/* For group node */
+	int sync_method; /* priority */
+	int	wait_num;
+	SyncGroupNode	*member; /* member of its group */
+	bool (*SyncRepGetSyncedLsnsFn) (SyncGroupNode *group, XLogRecPtr *write_pos,
+									XLogRecPtr *flush_pos);
+	int (*SyncRepGetSyncStandbysFn) (SyncGroupNode *group, int *list);
+};
+
 /* user-settable parameters for synchronous replication */
-extern char *SyncRepStandbyNames;
+extern SyncGroupNode *SyncRepStandbyNames;
+extern char	*SyncRepStandbyNamesString;
 
 /* called by user backend */
 extern void SyncRepWaitForLSN(XLogRecPtr XactCommitLSN);
@@ -47,11 +75,23 @@ extern void SyncRepReleaseWaiters(void);
 /* called by checkpointer */
 extern void SyncRepUpdateSyncStandbysDefined(void);
 
-/* forward declaration to avoid pulling in walsender_private.h */
-struct WalSnd;
-extern struct WalSnd *SyncRepGetSynchronousStandby(void);
-
 extern bool check_synchronous_standby_names(char **newval, void **extra, GucSource source);
+extern void assign_synchronous_standby_names(const char *newval, void *extra);
 extern void assign_synchronous_commit(int newval, void *extra);
 
+/*
+ * Internal functions for parsing the replication grammar, in syncgroup_gram.y and
+ * syncgroup_scanner.l
+ */
+extern int  syncgroup_yyparse(void);
+extern int  syncgroup_yylex(void);
+extern void syncgroup_yyerror(const char *str) pg_attribute_noreturn();
+extern void syncgroup_scanner_init(const char *query_string);
+extern void syncgroup_scanner_finish(void);
+
+/* function for synchronous replication group */
+extern bool SyncRepGetSyncedLsnsPriority(SyncGroupNode *group,
+								 XLogRecPtr *write_pos, XLogRecPtr *flush_pos);
+extern int SyncRepGetSyncStandbysPriority(SyncGroupNode *group, int *sync_list);
+
 #endif   /* _SYNCREP_H */
diff --git a/src/include/replication/walsender_private.h b/src/include/replication/walsender_private.h
index 7794aa5..6b29c18 100644
--- a/src/include/replication/walsender_private.h
+++ b/src/include/replication/walsender_private.h
@@ -19,6 +19,8 @@
 #include "storage/shmem.h"
 #include "storage/spin.h"
 
+#define MAX_WALSENDER_NAME 8192
+
 typedef enum WalSndState
 {
 	WALSNDSTATE_STARTUP = 0,
@@ -62,6 +64,11 @@ typedef struct WalSnd
 	 * SyncRepLock.
 	 */
 	int			sync_standby_priority;
+
+	/*
+	 * Corresponding standby's application_name.
+	 */
+	const char	   name[MAX_WALSENDER_NAME];
 } WalSnd;
 
 extern WalSnd *MyWalSnd;
diff --git a/src/tools/msvc/Mkvcbuild.pm b/src/tools/msvc/Mkvcbuild.pm
index 1dba7d9..455dea1 100644
--- a/src/tools/msvc/Mkvcbuild.pm
+++ b/src/tools/msvc/Mkvcbuild.pm
@@ -149,6 +149,8 @@ sub mkvcbuild
 	$postgres->AddFiles('src/backend/utils/misc', 'guc-file.l');
 	$postgres->AddFiles('src/backend/replication', 'repl_scanner.l',
 		'repl_gram.y');
+	$postgres->AddFiles('src/backend/replication', 'syncgroup_scanner.l',
+		'syncgroup_gram.y');
 	$postgres->AddDefine('BUILDING_DLL');
 	$postgres->AddLibrary('secur32.lib');
 	$postgres->AddLibrary('ws2_32.lib');
-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)