On Mon, Feb 1, 2016 at 11:28 PM, Fujii Masao <masao.fu...@gmail.com> wrote:
> On Mon, Feb 1, 2016 at 5:36 PM, Masahiko Sawada <sawada.m...@gmail.com> wrote:
>> On Sun, Jan 31, 2016 at 8:58 PM, Michael Paquier
>> <michael.paqu...@gmail.com> wrote:
>>> On Sun, Jan 31, 2016 at 5:28 PM, Masahiko Sawada <sawada.m...@gmail.com> 
>>> wrote:
>>>> On Sun, Jan 31, 2016 at 5:18 PM, Michael Paquier
>>>> <michael.paqu...@gmail.com> wrote:
>>>>> On Sun, Jan 31, 2016 at 5:08 PM, Masahiko Sawada <sawada.m...@gmail.com> 
>>>>> wrote:
>>>>>> On Sun, Jan 31, 2016 at 1:17 PM, Michael Paquier
>>>>>> <michael.paqu...@gmail.com> wrote:
>>>>>>> On Thu, Jan 28, 2016 at 10:10 PM, Masahiko Sawada wrote:
>>>>>>>> By the discussions so far, I'm planning to have several replication
>>>>>>>> methods such as 'quorum', 'complex' in the feature, and each
>>>>>>>> replication method specifies the syntax of s_s_names.
>>>>>>>> It means that s_s_names could have the number of sync standbys like
>>>>>>>> what current patch does.
>>>>>>>
>>>>>>> What if the application_name of a standby node has the format of an 
>>>>>>> integer?
>>>>>>
>>>>>> Even if the standby has an integer as application_name, we can set
>>>>>> s_s_names like '2,1,2,3'.
>>>>>> The leading '2' is always handled as the number of sync standbys when
>>>>>> s_r_method = 'priority'.
>>>>>
>>>>> Hm. I agree with Fujii-san here, having the number of sync standbys
>>>>> defined in a parameter that should have a list of names is a bit
>>>>> confusing. I'd rather have a separate GUC, which brings us back to one
>>>>> of the first patches that I came up with, and a couple of people,
>>>>> including Josh were not happy with that because this did not support
>>>>> real quorum. Perhaps the final answer would be really to get a set of
>>>>> hooks, and a contrib module making use of that.
>>>>
>>>> Yeah, I agree with having set of hooks, and postgres core has simple
>>>> multi sync replication mechanism like you suggested at first version.
>>>
>>> If there are hooks, I don't think that we should really bother about
>>> having in core anything more complicated than what we have now. The
>>> trick will be to come up with a hook design modular enough to support
>>> the kind of configurations mentioned on this thread. Roughly perhaps a
>>> refactoring of the syncrep code so as it is possible to wait for
>>> multiple targets, some of them being optional, one modular way in
>>> pg_stat_get_wal_senders to represent the status of a node to user, and
>>> another hook to decide which are the nodes to wait for. Some
>>> of the nodes being waited for may be based on conditions for quorum
>>> support. That's a hard problem to do that in a flexible enough way.
>>
>> Hm, I think not-nested quorum and priority are not complicated, and we
>> should support at least both or either simple method in core of
>> postgres.
>> More complicated method like using json-style, or dedicated language
>> would be supported by external module.
>
> So what about the following plan?
>
> [first version]
> Add only synchronous_standby_num which specifies the number of standbys
> that the master must wait for before marking sync replication as completed.
> This version supports simple use cases like "I want to have two synchronous
> standbys".
>
> [second version]
> Add synchronous_replication_method: 'priority' and 'quorum'. This version
> additionally supports simple quorum commit case like "I want to ensure
> that WAL is replicated synchronously to at least two standbys from five
> ones listed in s_s_names".
>
> Or
>
> Add something like quorum_replication_num and quorum_standby_names, i.e.,
> the master must wait for at least q_r_num standbys from ones listed in
> q_s_names before marking sync replication as completed. Also the master
> must wait for sync replication according to s_s_num and s_s_names.
> That is, this approach separates 'priority' and 'quorum' into separate parameters.
> This increases the number of GUC parameters, but ISTM less confusing, and
> it supports a bit complicated case like "there is one local standby and three
> remote standbys, then I want to ensure that WAL is replicated synchronously
> to the local standby and at least two remote ones", e.g.,
>
>   s_s_num = 1, s_s_names = 'local'
>   q_r_num = 2, q_s_names = 'remote1, remote2, remote3'
>
> [third version]
> Add the hooks for more complicated sync replication cases.
>
> I'm thinking that the realistic target for 9.6 might be the first one.
>

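To make the quoted example concrete (one local standby plus at least two of
three remote ones), here is a minimal sketch of how a release LSN could be
derived under those semantics. It is not part of the attached patch, and the
names demo_quorum_lsn, demo_release_lsn and DEMO_MAX are hypothetical. It
assumes an LSN of 0 means "no valid flush position", and that a quorum of q
standbys is satisfied up to the q-th largest flush LSN among the listed ones.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

#define DEMO_MAX 64				/* hypothetical cap on listed standbys */

/*
 * Return the q-th largest flush LSN among n standbys, i.e. the position
 * that at least q of them have flushed.  Returns 0 (invalid) when q is 0
 * or larger than n.
 */
static uint64_t
demo_quorum_lsn(const uint64_t *flush, size_t n, size_t q)
{
	uint64_t	tmp[DEMO_MAX];
	size_t		i;
	size_t		j;

	assert(n <= DEMO_MAX);
	if (q == 0 || q > n)
		return 0;

	for (i = 0; i < n; i++)
		tmp[i] = flush[i];

	/* Partial selection sort: move the q largest values to the front */
	for (i = 0; i < q; i++)
	{
		size_t		max = i;
		uint64_t	t;

		for (j = i + 1; j < n; j++)
			if (tmp[j] > tmp[max])
				max = j;
		t = tmp[i];
		tmp[i] = tmp[max];
		tmp[max] = t;
	}
	return tmp[q - 1];
}

/*
 * Combine both rules: waiters are released only up to the point that the
 * local (priority) standby AND a quorum of remote standbys have reached.
 */
static uint64_t
demo_release_lsn(uint64_t local_flush, const uint64_t *remote_flush,
				 size_t n, size_t q)
{
	uint64_t	quorum = demo_quorum_lsn(remote_flush, n, q);

	return (local_flush < quorum) ? local_flush : quorum;
}
```

With remote flush positions {100, 300, 200} and q = 2, the quorum position is
200, so a local standby at 250 gives a release LSN of 200, while a local
standby at 150 gives 150.
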
Thank you for the suggestion.

I agree with the first version, and have attached an updated patch that
supports the simple multiple synchronous replication you suggested.
(Test cases are not included yet.)
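
As a rough illustration of the intended first-version behaviour (not the
patch code itself), the sketch below picks the N highest-priority usable
standbys and takes the smallest flush LSN among them as the point up to which
waiters may be released. DemoStandby and demo_synced_flush_lsn are
hypothetical names for this example; the real code works on WalSnd slots
under SyncRepLock, which is left out here, and the sketch assumes an LSN of 0
is invalid.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical, simplified stand-in for a WAL sender slot (not WalSnd) */
typedef struct
{
	bool		streaming;		/* connected and in streaming state */
	int			priority;		/* 0 = async, 1 = highest priority */
	uint64_t	flush;			/* flushed LSN, 0 = invalid */
} DemoStandby;

/*
 * Find the smallest flush LSN among the 'num_sync' highest-priority usable
 * standbys.  Returns false if fewer than 'num_sync' usable standbys exist,
 * in which case *result is left untouched.
 */
static bool
demo_synced_flush_lsn(const DemoStandby *s, size_t n, int num_sync,
					  uint64_t *result)
{
	bool		used[64] = {false};
	uint64_t	min_flush = UINT64_MAX;
	int			picked;

	assert(n <= 64 && num_sync > 0);

	for (picked = 0; picked < num_sync; picked++)
	{
		int			best = -1;
		size_t		i;

		/* Look for the not-yet-picked usable standby with the lowest priority value */
		for (i = 0; i < n; i++)
		{
			if (used[i] || !s[i].streaming ||
				s[i].priority == 0 || s[i].flush == 0)
				continue;
			if (best < 0 || s[i].priority < s[best].priority)
				best = (int) i;
		}

		/* Not enough synchronous standbys are available */
		if (best < 0)
			return false;

		used[best] = true;
		if (s[best].flush < min_flush)
			min_flush = s[best].flush;
	}

	*result = min_flush;
	return true;
}
```

For example, with usable standbys at priorities 1, 2 and 3 whose flush LSNs
are 700, 400 and 200, asking for two sync standbys yields 400, and asking for
three yields 200.
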

Regards,

--
Masahiko Sawada
diff --git a/src/backend/replication/syncrep.c b/src/backend/replication/syncrep.c
index 7f85b88..9a2f7e7 100644
--- a/src/backend/replication/syncrep.c
+++ b/src/backend/replication/syncrep.c
@@ -29,10 +29,10 @@
  * single ordered queue of waiting backends, so that we can avoid
  * searching the through all waiters each time we receive a reply.
  *
- * In 9.1 we support only a single synchronous standby, chosen from a
- * priority list of synchronous_standby_names. Before it can become the
- * synchronous standby it must have caught up with the primary; that may
- * take some time. Once caught up, the current highest priority standby
+ * In 9.6 we support multiple synchronous standbys, chosen from a priority
+ * list of synchronous_standby_names. Before any standby can become a
+ * synchronous standby it must have caught up with the primary; that may
+ * take some time. Once caught up, the current highest priority standbys
  * will release waiters from the queue.
  *
  * Portions Copyright (c) 2010-2016, PostgreSQL Global Development Group
@@ -59,9 +59,12 @@
 
 /* User-settable parameters for sync rep */
 char	   *SyncRepStandbyNames;
+int			synchronous_standby_num;
 
 #define SyncStandbysDefined() \
-	(SyncRepStandbyNames != NULL && SyncRepStandbyNames[0] != '\0')
+	(SyncRepStandbyNames != NULL && \
+	 SyncRepStandbyNames[0] != '\0' && \
+	 synchronous_standby_num > 0)
 
 static bool announce_next_takeover = true;
 
@@ -349,57 +352,168 @@ SyncRepInitConfig(void)
 }
 
 /*
- * Find the WAL sender servicing the synchronous standby with the lowest
- * priority value, or NULL if no synchronous standby is connected. If there
- * are multiple standbys with the same lowest priority value, the first one
- * found is selected. The caller must hold SyncRepLock.
+ * Is this wal sender managing a standby that is streaming and
+ * listed as a synchronous standby?
  */
-WalSnd *
-SyncRepGetSynchronousStandby(void)
+bool
+SyncRepActiveListedWalSender(int num)
 {
-	WalSnd	   *result = NULL;
-	int			result_priority = 0;
-	int			i;
+	volatile WalSnd *walsnd = &WalSndCtl->walsnds[num];
+
+	/* Must be active */
+	if (walsnd->pid == 0)
+		return false;
+
+	/* Must be streaming */
+	if (walsnd->state != WALSNDSTATE_STREAMING)
+		return false;
+
+	/* Must be synchronous */
+	if (walsnd->sync_standby_priority == 0)
+		return false;
+
+	/* Must have a valid flush position */
+	if (XLogRecPtrIsInvalid(walsnd->flush))
+		return false;
+
+	return true;
+}
+
+/*
+ * Get the synced write and flush LSNs, and report through the output
+ * arguments those that this WAL sender has already reached.
+ */
+bool
+SyncRepSyncedLsnAdvancedTo(XLogRecPtr *write_pos, XLogRecPtr *flush_pos)
+{
+	XLogRecPtr tmp_write_pos;
+	XLogRecPtr tmp_flush_pos;
+	bool		ret = false;
+
+	ret = SyncRepGetSyncLsns(&tmp_write_pos, &tmp_flush_pos);
+
+	/* Have we advanced LSN? */
+	if (ret)
+	{
+		if (MyWalSnd->write >= tmp_write_pos)
+			*write_pos = tmp_write_pos;
+		if (MyWalSnd->flush >= tmp_flush_pos)
+			*flush_pos = tmp_flush_pos;
+
+		return true;
+	}
+
+	return false;
+}
+
+/*
+ * Populate a caller-supplied buffer with the walsnds indexes of the
+ * highest priority active synchronous standbys, up to a limit of
+ * 'synchronous_standby_num'. The order of the results is undefined.
+ * Return the number of results actually written.
+ */
+int
+SyncRepGetSyncStandbys(int *sync_standbys)
+{
+	int	priority = 0;
+	int	num_sync = 0;
+	int	i;
 
 	for (i = 0; i < max_wal_senders; i++)
 	{
 		/* Use volatile pointer to prevent code rearrangement */
 		volatile WalSnd *walsnd = &WalSndCtl->walsnds[i];
-		int			this_priority;
+		int			j;
 
-		/* Must be active */
-		if (walsnd->pid == 0)
+		/* Is this WAL sender a candidate synchronous standby? */
+		if (!SyncRepActiveListedWalSender(i))
 			continue;
 
-		/* Must be streaming */
-		if (walsnd->state != WALSNDSTATE_STREAMING)
-			continue;
+		if (num_sync == synchronous_standby_num)
+		{
+			if (walsnd->sync_standby_priority > priority)
+				continue;
+
+			for (j = 0; j < num_sync; j++)
+			{
+				volatile WalSnd *walsndloc = &WalSndCtl->walsnds[sync_standbys[j]];
+
+				/* Found lowest priority standby, so replace it */
+				if (walsndloc->sync_standby_priority == priority &&
+					walsnd->sync_standby_priority < priority)
+					sync_standbys[j] = i;
+
+				/* Track the largest priority value (lowest priority) kept */
+				if (priority < walsndloc->sync_standby_priority)
+					priority = walsndloc->sync_standby_priority;
+			}
+		}
+		else
+		{
+			sync_standbys[num_sync] = i;
+			num_sync++;
 
-		/* Must be synchronous */
-		this_priority = walsnd->sync_standby_priority;
-		if (this_priority == 0)
-			continue;
+			/* Track the largest priority value (lowest priority) kept */
+			if (priority < walsnd->sync_standby_priority)
+				priority = walsnd->sync_standby_priority;
+		}
+	}
 
-		/* Must have a lower priority value than any previous ones */
-		if (result != NULL && result_priority <= this_priority)
-			continue;
+	return num_sync;
+}
 
-		/* Must have a valid flush position */
-		if (XLogRecPtrIsInvalid(walsnd->flush))
+/*
+ * Obtain currently synced LSN location: write and flush.
+ */
+bool
+SyncRepGetSyncLsns(XLogRecPtr *write_pos, XLogRecPtr *flush_pos)
+{
+	int			sync_standbys[SYNC_REP_MAX_SYNC_STANDBY_NUM];
+	int			num_sync;
+	int			i;
+	XLogRecPtr	synced_write = InvalidXLogRecPtr;
+	XLogRecPtr	synced_flush = InvalidXLogRecPtr;
+
+	num_sync = SyncRepGetSyncStandbys(sync_standbys);
+
+	for (i = 0; i < num_sync; i++)
+	{
+		elog(DEBUG3, "sync_standbys[%d] = %d", i, sync_standbys[i]);
+	}
+	elog(DEBUG3, "num_sync = %d, s_s_num = %d", num_sync, synchronous_standby_num);
+
+	/* Just return if there are not enough sync standbys */
+	if (num_sync < synchronous_standby_num)
+		return false;
+
+	for (i = 0; i < num_sync; i++)
+	{
+		volatile WalSnd *walsndloc = &WalSndCtl->walsnds[sync_standbys[i]];
+
+		SpinLockAcquire(&walsndloc->mutex);
+
+		/* Store first candidate */
+		if (XLogRecPtrIsInvalid(synced_write) && XLogRecPtrIsInvalid(synced_flush))
+		{
+			synced_write = walsndloc->write;
+			synced_flush = walsndloc->flush;
+			SpinLockRelease(&walsndloc->mutex);
 			continue;
+		}
 
-		result = (WalSnd *) walsnd;
-		result_priority = this_priority;
+		/* Keep/Collect the earliest write and flush LSNs among prioritized standbys */
+		if (synced_write > walsndloc->write)
+			synced_write = walsndloc->write;
+		if (synced_flush > walsndloc->flush)
+			synced_flush = walsndloc->flush;
 
-		/*
-		 * If priority is equal to 1, there cannot be any other WAL senders
-		 * with a lower priority, so we're done.
-		 */
-		if (this_priority == 1)
-			return result;
+		SpinLockRelease(&walsndloc->mutex);
 	}
 
-	return result;
+	*write_pos = synced_write;
+	*flush_pos = synced_flush;
+
+	return true;
 }
 
 /*
@@ -413,9 +527,9 @@ void
 SyncRepReleaseWaiters(void)
 {
 	volatile WalSndCtlData *walsndctl = WalSndCtl;
-	WalSnd	   *syncWalSnd;
-	int			numwrite = 0;
-	int			numflush = 0;
+	XLogRecPtr	write_pos = InvalidXLogRecPtr;
+	XLogRecPtr	flush_pos = InvalidXLogRecPtr;
+	int			numwrite = 0, numflush = 0;
 
 	/*
 	 * If this WALSender is serving a standby that is not on the list of
@@ -428,23 +542,12 @@ SyncRepReleaseWaiters(void)
 		XLogRecPtrIsInvalid(MyWalSnd->flush))
 		return;
 
-	/*
-	 * We're a potential sync standby. Release waiters if we are the highest
-	 * priority standby.
-	 */
 	LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
-	syncWalSnd = SyncRepGetSynchronousStandby();
 
-	/* We should have found ourselves at least */
-	Assert(syncWalSnd != NULL);
-
-	/*
-	 * If we aren't managing the highest priority standby then just leave.
-	 */
-	if (syncWalSnd != MyWalSnd)
+	/* Get the currently synced LSNs, or leave if they are unavailable */
+	if (!(SyncRepSyncedLsnAdvancedTo(&write_pos, &flush_pos)))
 	{
 		LWLockRelease(SyncRepLock);
-		announce_next_takeover = true;
 		return;
 	}
 
@@ -452,14 +555,14 @@ SyncRepReleaseWaiters(void)
 	 * Set the lsn first so that when we wake backends they will release up to
 	 * this location.
 	 */
-	if (walsndctl->lsn[SYNC_REP_WAIT_WRITE] < MyWalSnd->write)
+	if (walsndctl->lsn[SYNC_REP_WAIT_WRITE] < write_pos)
 	{
-		walsndctl->lsn[SYNC_REP_WAIT_WRITE] = MyWalSnd->write;
+		walsndctl->lsn[SYNC_REP_WAIT_WRITE] = write_pos;
 		numwrite = SyncRepWakeQueue(false, SYNC_REP_WAIT_WRITE);
 	}
-	if (walsndctl->lsn[SYNC_REP_WAIT_FLUSH] < MyWalSnd->flush)
+	if (walsndctl->lsn[SYNC_REP_WAIT_FLUSH] < flush_pos)
 	{
-		walsndctl->lsn[SYNC_REP_WAIT_FLUSH] = MyWalSnd->flush;
+		walsndctl->lsn[SYNC_REP_WAIT_FLUSH] = flush_pos;
 		numflush = SyncRepWakeQueue(false, SYNC_REP_WAIT_FLUSH);
 	}
 
@@ -506,6 +609,10 @@ SyncRepGetStandbyPriority(void)
 	if (am_cascading_walsender)
 		return 0;
 
+	/* If no synchronous standby allowed, no cake for this WAL sender */
+	if (!SyncStandbysDefined())
+		return 0;
+
 	/* Need a modifiable copy of string */
 	rawstring = pstrdup(SyncRepStandbyNames);
 
@@ -521,8 +628,9 @@ SyncRepGetStandbyPriority(void)
 
 	foreach(l, elemlist)
 	{
-		char	   *standby_name = (char *) lfirst(l);
+		char	   *standby_name;
 
+		standby_name = (char *) lfirst(l);
 		priority++;
 
 		if (pg_strcasecmp(standby_name, application_name) == 0 ||
@@ -683,7 +791,6 @@ SyncRepQueueIsOrderedByLSN(int mode)
  * Synchronous Replication functions executed by any process
  * ===========================================================
  */
-
 bool
 check_synchronous_standby_names(char **newval, void **extra, GucSource source)
 {
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index c03e045..8586af4 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -2749,9 +2749,11 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
 	Tuplestorestate *tupstore;
 	MemoryContext per_query_ctx;
 	MemoryContext oldcontext;
-	WalSnd	   *sync_standby;
+	int			*sync_standbys;
+	int			num_sync;
 	int			i;
 
+
 	/* check to see if caller supports us returning a tuplestore */
 	if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
 		ereport(ERROR,
@@ -2777,11 +2779,13 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
 
 	MemoryContextSwitchTo(oldcontext);
 
+	sync_standbys = (int *) palloc(sizeof(int) * synchronous_standby_num);
+
 	/*
-	 * Get the currently active synchronous standby.
+	 * Get the currently active synchronous standbys.
 	 */
 	LWLockAcquire(SyncRepLock, LW_SHARED);
-	sync_standby = SyncRepGetSynchronousStandby();
+	num_sync = SyncRepGetSyncStandbys(sync_standbys);
 	LWLockRelease(SyncRepLock);
 
 	for (i = 0; i < max_wal_senders; i++)
@@ -2854,18 +2858,34 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
 			 */
 			if (priority == 0)
 				values[7] = CStringGetTextDatum("async");
-			else if (walsnd == sync_standby)
-				values[7] = CStringGetTextDatum("sync");
 			else
-				values[7] = CStringGetTextDatum("potential");
+			{
+				int		j;
+				bool	found = false;
+
+				for (j = 0; j < num_sync; j++)
+				{
+					/* Found sync standby */
+					if (i == sync_standbys[j])
+					{
+						values[7] = CStringGetTextDatum("sync");
+						found = true;
+						break;
+					}
+				}
+				if (!found)
+					values[7] = CStringGetTextDatum("potential");
+			}
 		}
 
 		tuplestore_putvalues(tupstore, tupdesc, values, nulls);
 	}
 
 	/* clean up and return the tuplestore */
+	pfree(sync_standbys);
 	tuplestore_donestoring(tupstore);
 
+
 	return (Datum) 0;
 }
 
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index 38ba82f..3638c0d 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -2683,6 +2683,16 @@ static struct config_int ConfigureNamesInt[] =
 		NULL, NULL, NULL
 	},
 
+	{
+		{"synchronous_standby_num", PGC_SIGHUP, REPLICATION_MASTER,
+		 gettext_noop("Number of sync standbys."),
+		 NULL,
+		},
+		&synchronous_standby_num,
+		1, 0, SYNC_REP_MAX_SYNC_STANDBY_NUM,
+		NULL, NULL, NULL
+	},
+
 	/* End-of-list marker */
 	{
 		{NULL, 0, 0, NULL, NULL}, NULL, 0, 0, 0, NULL, NULL, NULL
diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample
index 029114f..06f1a51 100644
--- a/src/backend/utils/misc/postgresql.conf.sample
+++ b/src/backend/utils/misc/postgresql.conf.sample
@@ -241,6 +241,7 @@
 #synchronous_standby_names = ''	# standby servers that provide sync rep
 				# comma-separated list of application_name
 				# from standby(s); '*' = all
+#synchronous_standby_num = 1	# number of standby servers using sync rep
 #vacuum_defer_cleanup_age = 0	# number of xacts by which cleanup is delayed
 
 # - Standby Servers -
diff --git a/src/include/replication/syncrep.h b/src/include/replication/syncrep.h
index 96e059b..0fb7fc6 100644
--- a/src/include/replication/syncrep.h
+++ b/src/include/replication/syncrep.h
@@ -31,8 +31,13 @@
 #define SYNC_REP_WAITING			1
 #define SYNC_REP_WAIT_COMPLETE		2
 
+/* Limit of the number of synchronous standbys */
+#define SYNC_REP_MAX_SYNC_STANDBY_NUM 256
+
 /* user-settable parameters for synchronous replication */
 extern char *SyncRepStandbyNames;
+extern int	synchronous_replication_method;
+extern int	synchronous_standby_num;
 
 /* called by user backend */
 extern void SyncRepWaitForLSN(XLogRecPtr XactCommitLSN);
@@ -49,7 +54,12 @@ extern void SyncRepUpdateSyncStandbysDefined(void);
 
 /* forward declaration to avoid pulling in walsender_private.h */
 struct WalSnd;
-extern struct WalSnd *SyncRepGetSynchronousStandby(void);
+
+extern int SyncRepGetSyncStandbys(int *sync_standbys);
+extern bool SyncRepSyncedLsnAdvancedTo(XLogRecPtr *write_pos, XLogRecPtr *flush_pos);
+extern bool SyncRepActiveListedWalSender(int num);
+extern int SyncRepGetSyncStandbys(int *sync_standbys);
+extern bool	SyncRepGetSyncLsns(XLogRecPtr *write_pos, XLogRecPtr *flush_pos);
 
 extern bool check_synchronous_standby_names(char **newval, void **extra, GucSource source);
 extern void assign_synchronous_commit(int newval, void *extra);
-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers