On Fri, Jan 13, 2012 at 9:27 PM, Fujii Masao <masao.fu...@gmail.com> wrote:
> On Fri, Jan 13, 2012 at 7:30 PM, Simon Riggs <si...@2ndquadrant.com> wrote:
>> On Fri, Jan 13, 2012 at 9:15 AM, Simon Riggs <si...@2ndquadrant.com> wrote:
>>> On Fri, Jan 13, 2012 at 7:41 AM, Fujii Masao <masao.fu...@gmail.com> wrote:
>>>
>>>> Thought? Comments?
>>>
>>> This is almost exactly the same as my patch series
>>> "syncrep_queues.v[1,2].patch" earlier this year. Which I know because
>>> I was updating that patch myself last night for 9.2. I'm about half
>>> way through doing that, since you and I agreed in Ottawa I would do
>>> this. Perhaps it is better if we work together?
>>
>> I think this comment is mostly pointless. We don't have time to work
>> together and there's no real reason to. You know what you're doing, so
>> I'll leave you to do it.
>>
>> Please add the Apply mode.
>
> OK, will do.

Done. Attached is the updated version of the patch.
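
For anyone skimming before reading the diff, here is a minimal standalone sketch (not part of the patch) of the release rule the three wait modes follow: each mode keeps its own released-up-to position, and a backend waiting in a given mode is released once its commit LSN is at or behind that position. Only the SYNC_REP_* values are taken from the patch. SketchLSN, SketchCtl, SketchReply, sketch_advance and sketch_can_release are hypothetical names, XLogRecPtr is flattened to one integer, and queues and locking are left out entirely.

```c
#include <assert.h>
#include <stdint.h>

/* Wait modes, mirroring the values the patch adds to syncrep.h. */
#define SYNC_REP_NO_WAIT        (-1)
#define SYNC_REP_WAIT_WRITE     0
#define SYNC_REP_WAIT_FLUSH     1
#define SYNC_REP_WAIT_APPLY     2
#define NUM_SYNC_REP_WAIT_MODE  3

/* Hypothetical stand-in for XLogRecPtr: one 64-bit WAL position. */
typedef uint64_t SketchLSN;

/* Hypothetical stand-in for the per-mode lsn[] array in WalSndCtlData. */
typedef struct
{
    SketchLSN lsn[NUM_SYNC_REP_WAIT_MODE];
} SketchCtl;

/* Hypothetical standby reply carrying its write, flush and apply positions. */
typedef struct
{
    SketchLSN write;
    SketchLSN flush;
    SketchLSN apply;
} SketchReply;

/* Advance each per-mode position; a position never moves backwards. */
void
sketch_advance(SketchCtl *ctl, const SketchReply *reply)
{
    if (ctl->lsn[SYNC_REP_WAIT_WRITE] < reply->write)
        ctl->lsn[SYNC_REP_WAIT_WRITE] = reply->write;
    if (ctl->lsn[SYNC_REP_WAIT_FLUSH] < reply->flush)
        ctl->lsn[SYNC_REP_WAIT_FLUSH] = reply->flush;
    if (ctl->lsn[SYNC_REP_WAIT_APPLY] < reply->apply)
        ctl->lsn[SYNC_REP_WAIT_APPLY] = reply->apply;
}

/* Return 1 if a backend waiting in 'mode' for 'wait_lsn' may be released. */
int
sketch_can_release(const SketchCtl *ctl, int mode, SketchLSN wait_lsn)
{
    assert(mode >= 0 && mode < NUM_SYNC_REP_WAIT_MODE);
    return wait_lsn <= ctl->lsn[mode];
}
```

With a reply of write=300, flush=200, apply=100, a commit at position 250 would be released under write but would keep waiting under on (flush) and apply.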

Regards,

-- 
Fujii Masao
NIPPON TELEGRAPH AND TELEPHONE CORPORATION
NTT Open Source Software Center
*** a/doc/src/sgml/config.sgml
--- b/doc/src/sgml/config.sgml
***************
*** 1559,1567 **** SET ENABLE_SEQSCAN TO OFF;
         <para>
          Specifies whether transaction commit will wait for WAL records
          to be written to disk before the command returns a <quote>success</>
!         indication to the client.  Valid values are <literal>on</>,
!         <literal>local</>, and <literal>off</>.  The default, and safe, value
!         is <literal>on</>.  When <literal>off</>, there can be a delay between
          when success is reported to the client and when the transaction is
          really guaranteed to be safe against a server crash.  (The maximum
          delay is three times <xref linkend="guc-wal-writer-delay">.)  Unlike
--- 1559,1567 ----
         <para>
          Specifies whether transaction commit will wait for WAL records
          to be written to disk before the command returns a <quote>success</>
!         indication to the client.  Valid values are <literal>on</>, <literal>write</>,
!         <literal>apply</>, <literal>local</>, and <literal>off</>.  The default, and safe,
!         value is <literal>on</>.  When <literal>off</>, there can be a delay between
          when success is reported to the client and when the transaction is
          really guaranteed to be safe against a server crash.  (The maximum
          delay is three times <xref linkend="guc-wal-writer-delay">.)  Unlike
***************
*** 1579,1589 **** SET ENABLE_SEQSCAN TO OFF;
          If <xref linkend="guc-synchronous-standby-names"> is set, this
          parameter also controls whether or not transaction commit will wait
          for the transaction's WAL records to be flushed to disk and replicated
!         to the standby server.  The commit wait will last until a reply from
!         the current synchronous standby indicates it has written the commit
!         record of the transaction to durable storage.  If synchronous
          replication is in use, it will normally be sensible either to wait
!         both for WAL records to reach both the local and remote disks, or
          to allow the transaction to commit asynchronously.  However, the
          special value <literal>local</> is available for transactions that
          wish to wait for local flush to disk, but not synchronous replication.
--- 1579,1600 ----
          If <xref linkend="guc-synchronous-standby-names"> is set, this
          parameter also controls whether or not transaction commit will wait
          for the transaction's WAL records to be flushed to disk and replicated
!         to the standby server.  When <literal>on</>, the commit wait will last
!         until a reply from the current synchronous standby indicates it has flushed
!         the commit record of the transaction to durable storage. This
!         avoids any data loss unless the database clusters of both the primary
!         and the standby get corrupted simultaneously. When <literal>write</>,
!         the commit wait will last until a reply from the current synchronous
!         standby indicates it has received the commit record of the transaction
!         into memory. Normally this causes no data loss at the time of failover.
!         However, if both primary and standby crash, and the database cluster of
!         the primary gets corrupted, recent committed transactions might
!         be lost. When <literal>apply</>, the commit will wait until the current
!         synchronous standby has replayed the committed changes successfully.
!         This guarantees that every transaction is visible on the synchronous
!         standby by the time its commit returns. If synchronous
          replication is in use, it will normally be sensible either to wait
!         for both local flush and replication of WAL records, or
          to allow the transaction to commit asynchronously.  However, the
          special value <literal>local</> is available for transactions that
          wish to wait for local flush to disk, but not synchronous replication.
*** a/doc/src/sgml/high-availability.sgml
--- b/doc/src/sgml/high-availability.sgml
***************
*** 1011,1016 **** primary_conninfo = 'host=192.168.1.50 port=5432 user=foo password=foopass'
--- 1011,1039 ----
     </para>
  
     <para>
+     Setting <varname>synchronous_commit</> to <literal>write</> will
+     cause each commit to wait for confirmation that the standby has received
+     the commit record into memory. This provides a lower level of durability
+     than <literal>on</> does. However, it is a practically useful setting because
+     it can decrease the response time for the transaction, and causes
+     no data loss unless both the primary and the standby crash and
+     the database of the primary gets corrupted at the same time.
+    </para>
+ 
+    <para>
+     Setting <varname>synchronous_commit</> to <literal>apply</> will
+     cause each commit to wait for confirmation that the standby has flushed
+     the commit record to durable storage and replayed the committed changes
+     successfully. This provides the same level of durability as <literal>on</>
+     does. It also guarantees that every transaction is visible on the standby
+     by the time its commit returns. Note that this makes the transaction commit
+     wait longer for replication than <literal>on</> or <literal>write</>
+     does because the confirmation about the apply position from the standby
+     is sent less frequently. To decrease the wait time, set
+     <varname>max_standby_streaming_delay</> to a low value.
+    </para>
+ 
+    <para>
      Users will stop waiting if a fast shutdown is requested.  However, as
      when using asynchronous replication, the server will does not fully
      shutdown until all outstanding WAL records are transferred to the currently
***************
*** 1064,1077 **** primary_conninfo = 'host=192.168.1.50 port=5432 user=foo password=foopass'
      <title>Planning for High Availability</title>
  
     <para>
!     Commits made when <varname>synchronous_commit</> is set to <literal>on</>
!     will wait until the sync standby responds. The response may never occur
!     if the last, or only, standby should crash.
     </para>
  
     <para>
      The best solution for avoiding data loss is to ensure you don't lose
!     your last remaining sync standby. This can be achieved by naming multiple
      potential synchronous standbys using <varname>synchronous_standby_names</>.
      The first named standby will be used as the synchronous standby. Standbys
      listed after this will take over the role of synchronous standby if the
--- 1087,1100 ----
      <title>Planning for High Availability</title>
  
     <para>
!     Commits made when <varname>synchronous_commit</> is set to <literal>on</>,
!     <literal>write</> or <literal>apply</> will wait until the synchronous standby responds.
!     The response may never occur if the last, or only, standby should crash.
     </para>
  
     <para>
      The best solution for avoiding data loss is to ensure you don't lose
!     your last remaining synchronous standby. This can be achieved by naming multiple
      potential synchronous standbys using <varname>synchronous_standby_names</>.
      The first named standby will be used as the synchronous standby. Standbys
      listed after this will take over the role of synchronous standby if the
*** a/src/backend/replication/syncrep.c
--- b/src/backend/replication/syncrep.c
***************
*** 20,28 ****
   * per-transaction state information.
   *
   * Replication is either synchronous or not synchronous (async). If it is
!  * async, we just fastpath out of here. If it is sync, then in 9.1 we wait
!  * for the flush location on the standby before releasing the waiting backend.
!  * Further complexity in that interaction is expected in later releases.
   *
   * The best performing way to manage the waiting backends is to have a
   * single ordered queue of waiting backends, so that we can avoid
--- 20,29 ----
   * per-transaction state information.
   *
   * Replication is either synchronous or not synchronous (async). If it is
!  * async, we just fastpath out of here. If it is sync, then we wait for
!  * the write, flush or apply location on the standby before releasing
!  * the waiting backend. Further complexity in that interaction is expected
!  * in later releases.
   *
   * The best performing way to manage the waiting backends is to have a
   * single ordered queue of waiting backends, so that we can avoid
***************
*** 67,79 **** char	   *SyncRepStandbyNames;
  
  static bool announce_next_takeover = true;
  
! static void SyncRepQueueInsert(void);
  static void SyncRepCancelWait(void);
  
  static int	SyncRepGetStandbyPriority(void);
  
  #ifdef USE_ASSERT_CHECKING
! static bool SyncRepQueueIsOrderedByLSN(void);
  #endif
  
  /*
--- 68,82 ----
  
  static bool announce_next_takeover = true;
  
! static int	SyncRepWaitMode = SYNC_REP_NO_WAIT;
! 
! static void SyncRepQueueInsert(int mode);
  static void SyncRepCancelWait(void);
  
  static int	SyncRepGetStandbyPriority(void);
  
  #ifdef USE_ASSERT_CHECKING
! static bool SyncRepQueueIsOrderedByLSN(int mode);
  #endif
  
  /*
***************
*** 120,126 **** SyncRepWaitForLSN(XLogRecPtr XactCommitLSN)
  	 * be a low cost check.
  	 */
  	if (!WalSndCtl->sync_standbys_defined ||
! 		XLByteLE(XactCommitLSN, WalSndCtl->lsn))
  	{
  		LWLockRelease(SyncRepLock);
  		return;
--- 123,129 ----
  	 * be a low cost check.
  	 */
  	if (!WalSndCtl->sync_standbys_defined ||
! 		XLByteLE(XactCommitLSN, WalSndCtl->lsn[SyncRepWaitMode]))
  	{
  		LWLockRelease(SyncRepLock);
  		return;
***************
*** 132,139 **** SyncRepWaitForLSN(XLogRecPtr XactCommitLSN)
  	 */
  	MyProc->waitLSN = XactCommitLSN;
  	MyProc->syncRepState = SYNC_REP_WAITING;
! 	SyncRepQueueInsert();
! 	Assert(SyncRepQueueIsOrderedByLSN());
  	LWLockRelease(SyncRepLock);
  
  	/* Alter ps display to show waiting for sync rep. */
--- 135,142 ----
  	 */
  	MyProc->waitLSN = XactCommitLSN;
  	MyProc->syncRepState = SYNC_REP_WAITING;
! 	SyncRepQueueInsert(SyncRepWaitMode);
! 	Assert(SyncRepQueueIsOrderedByLSN(SyncRepWaitMode));
  	LWLockRelease(SyncRepLock);
  
  	/* Alter ps display to show waiting for sync rep. */
***************
*** 267,284 **** SyncRepWaitForLSN(XLogRecPtr XactCommitLSN)
  }
  
  /*
!  * Insert MyProc into SyncRepQueue, maintaining sorted invariant.
   *
   * Usually we will go at tail of queue, though it's possible that we arrive
   * here out of order, so start at tail and work back to insertion point.
   */
  static void
! SyncRepQueueInsert(void)
  {
  	PGPROC	   *proc;
  
! 	proc = (PGPROC *) SHMQueuePrev(&(WalSndCtl->SyncRepQueue),
! 								   &(WalSndCtl->SyncRepQueue),
  								   offsetof(PGPROC, syncRepLinks));
  
  	while (proc)
--- 270,288 ----
  }
  
  /*
!  * Insert MyProc into the specified SyncRepQueue, maintaining sorted invariant.
   *
   * Usually we will go at tail of queue, though it's possible that we arrive
   * here out of order, so start at tail and work back to insertion point.
   */
  static void
! SyncRepQueueInsert(int mode)
  {
  	PGPROC	   *proc;
  
! 	Assert(mode >= 0 && mode < NUM_SYNC_REP_WAIT_MODE);
! 	proc = (PGPROC *) SHMQueuePrev(&(WalSndCtl->SyncRepQueue[mode]),
! 								   &(WalSndCtl->SyncRepQueue[mode]),
  								   offsetof(PGPROC, syncRepLinks));
  
  	while (proc)
***************
*** 290,296 **** SyncRepQueueInsert(void)
  		if (XLByteLT(proc->waitLSN, MyProc->waitLSN))
  			break;
  
! 		proc = (PGPROC *) SHMQueuePrev(&(WalSndCtl->SyncRepQueue),
  									   &(proc->syncRepLinks),
  									   offsetof(PGPROC, syncRepLinks));
  	}
--- 294,300 ----
  		if (XLByteLT(proc->waitLSN, MyProc->waitLSN))
  			break;
  
! 		proc = (PGPROC *) SHMQueuePrev(&(WalSndCtl->SyncRepQueue[mode]),
  									   &(proc->syncRepLinks),
  									   offsetof(PGPROC, syncRepLinks));
  	}
***************
*** 298,304 **** SyncRepQueueInsert(void)
  	if (proc)
  		SHMQueueInsertAfter(&(proc->syncRepLinks), &(MyProc->syncRepLinks));
  	else
! 		SHMQueueInsertAfter(&(WalSndCtl->SyncRepQueue), &(MyProc->syncRepLinks));
  }
  
  /*
--- 302,308 ----
  	if (proc)
  		SHMQueueInsertAfter(&(proc->syncRepLinks), &(MyProc->syncRepLinks));
  	else
! 		SHMQueueInsertAfter(&(WalSndCtl->SyncRepQueue[mode]), &(MyProc->syncRepLinks));
  }
  
  /*
***************
*** 368,374 **** SyncRepReleaseWaiters(void)
  {
  	volatile WalSndCtlData *walsndctl = WalSndCtl;
  	volatile WalSnd *syncWalSnd = NULL;
! 	int			numprocs = 0;
  	int			priority = 0;
  	int			i;
  
--- 372,380 ----
  {
  	volatile WalSndCtlData *walsndctl = WalSndCtl;
  	volatile WalSnd *syncWalSnd = NULL;
! 	int			numwrite = 0;
! 	int			numflush = 0;
! 	int			numapply = 0;
  	int			priority = 0;
  	int			i;
  
***************
*** 419,440 **** SyncRepReleaseWaiters(void)
  		return;
  	}
  
! 	if (XLByteLT(walsndctl->lsn, MyWalSnd->flush))
  	{
! 		/*
! 		 * Set the lsn first so that when we wake backends they will release
! 		 * up to this location.
! 		 */
! 		walsndctl->lsn = MyWalSnd->flush;
! 		numprocs = SyncRepWakeQueue(false);
  	}
  
  	LWLockRelease(SyncRepLock);
  
! 	elog(DEBUG3, "released %d procs up to %X/%X",
! 		 numprocs,
  		 MyWalSnd->flush.xlogid,
! 		 MyWalSnd->flush.xrecoff);
  
  	/*
  	 * If we are managing the highest priority standby, though we weren't
--- 425,463 ----
  		return;
  	}
  
! 	/*
! 	 * Set the lsn first so that when we wake backends they will release
! 	 * up to this location.
! 	 */
! 	if (XLByteLT(walsndctl->lsn[SYNC_REP_WAIT_WRITE], MyWalSnd->write))
  	{
! 		walsndctl->lsn[SYNC_REP_WAIT_WRITE] = MyWalSnd->write;
! 		numwrite = SyncRepWakeQueue(false, SYNC_REP_WAIT_WRITE);
! 	}
! 	if (XLByteLT(walsndctl->lsn[SYNC_REP_WAIT_FLUSH], MyWalSnd->flush))
! 	{
! 		walsndctl->lsn[SYNC_REP_WAIT_FLUSH] = MyWalSnd->flush;
! 		numflush = SyncRepWakeQueue(false, SYNC_REP_WAIT_FLUSH);
! 	}
! 	if (XLByteLT(walsndctl->lsn[SYNC_REP_WAIT_APPLY], MyWalSnd->apply))
! 	{
! 		walsndctl->lsn[SYNC_REP_WAIT_APPLY] = MyWalSnd->apply;
! 		numapply = SyncRepWakeQueue(false, SYNC_REP_WAIT_APPLY);
  	}
  
  	LWLockRelease(SyncRepLock);
  
! 	elog(DEBUG3, "released %d procs up to write %X/%X, %d procs up to flush %X/%X, "
! 		 "%d procs up to apply %X/%X",
! 		 numwrite,
! 		 MyWalSnd->write.xlogid,
! 		 MyWalSnd->write.xrecoff,
! 		 numflush,
  		 MyWalSnd->flush.xlogid,
! 		 MyWalSnd->flush.xrecoff,
! 		 numapply,
! 		 MyWalSnd->apply.xlogid,
! 		 MyWalSnd->apply.xrecoff);
  
  	/*
  	 * If we are managing the highest priority standby, though we weren't
***************
*** 507,530 **** SyncRepGetStandbyPriority(void)
  }
  
  /*
!  * Walk queue from head.  Set the state of any backends that need to be woken,
!  * remove them from the queue, and then wake them.	Pass all = true to wake
!  * whole queue; otherwise, just wake up to the walsender's LSN.
   *
   * Must hold SyncRepLock.
   */
  int
! SyncRepWakeQueue(bool all)
  {
  	volatile WalSndCtlData *walsndctl = WalSndCtl;
  	PGPROC	   *proc = NULL;
  	PGPROC	   *thisproc = NULL;
  	int			numprocs = 0;
  
! 	Assert(SyncRepQueueIsOrderedByLSN());
  
! 	proc = (PGPROC *) SHMQueueNext(&(WalSndCtl->SyncRepQueue),
! 								   &(WalSndCtl->SyncRepQueue),
  								   offsetof(PGPROC, syncRepLinks));
  
  	while (proc)
--- 530,555 ----
  }
  
  /*
!  * Walk the specified queue from head.  Set the state of any backends that
!  * need to be woken, remove them from the queue, and then wake them.
!  * Pass all = true to wake whole queue; otherwise, just wake up to
!  * the walsender's LSN.
   *
   * Must hold SyncRepLock.
   */
  int
! SyncRepWakeQueue(bool all, int mode)
  {
  	volatile WalSndCtlData *walsndctl = WalSndCtl;
  	PGPROC	   *proc = NULL;
  	PGPROC	   *thisproc = NULL;
  	int			numprocs = 0;
  
! 	Assert(mode >= 0 && mode < NUM_SYNC_REP_WAIT_MODE);
! 	Assert(SyncRepQueueIsOrderedByLSN(mode));
  
! 	proc = (PGPROC *) SHMQueueNext(&(WalSndCtl->SyncRepQueue[mode]),
! 								   &(WalSndCtl->SyncRepQueue[mode]),
  								   offsetof(PGPROC, syncRepLinks));
  
  	while (proc)
***************
*** 532,538 **** SyncRepWakeQueue(bool all)
  		/*
  		 * Assume the queue is ordered by LSN
  		 */
! 		if (!all && XLByteLT(walsndctl->lsn, proc->waitLSN))
  			return numprocs;
  
  		/*
--- 557,563 ----
  		/*
  		 * Assume the queue is ordered by LSN
  		 */
! 		if (!all && XLByteLT(walsndctl->lsn[mode], proc->waitLSN))
  			return numprocs;
  
  		/*
***************
*** 540,546 **** SyncRepWakeQueue(bool all)
  		 * thisproc is valid, proc may be NULL after this.
  		 */
  		thisproc = proc;
! 		proc = (PGPROC *) SHMQueueNext(&(WalSndCtl->SyncRepQueue),
  									   &(proc->syncRepLinks),
  									   offsetof(PGPROC, syncRepLinks));
  
--- 565,571 ----
  		 * thisproc is valid, proc may be NULL after this.
  		 */
  		thisproc = proc;
! 		proc = (PGPROC *) SHMQueueNext(&(WalSndCtl->SyncRepQueue[mode]),
  									   &(proc->syncRepLinks),
  									   offsetof(PGPROC, syncRepLinks));
  
***************
*** 588,594 **** SyncRepUpdateSyncStandbysDefined(void)
  		 * wants synchronous replication, we'd better wake them up.
  		 */
  		if (!sync_standbys_defined)
! 			SyncRepWakeQueue(true);
  
  		/*
  		 * Only allow people to join the queue when there are synchronous
--- 613,624 ----
  		 * wants synchronous replication, we'd better wake them up.
  		 */
  		if (!sync_standbys_defined)
! 		{
! 			int	i;
! 
! 			for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; i++)
! 				SyncRepWakeQueue(true, i);
! 		}
  
  		/*
  		 * Only allow people to join the queue when there are synchronous
***************
*** 605,620 **** SyncRepUpdateSyncStandbysDefined(void)
  
  #ifdef USE_ASSERT_CHECKING
  static bool
! SyncRepQueueIsOrderedByLSN(void)
  {
  	PGPROC	   *proc = NULL;
  	XLogRecPtr	lastLSN;
  
  	lastLSN.xlogid = 0;
  	lastLSN.xrecoff = 0;
  
! 	proc = (PGPROC *) SHMQueueNext(&(WalSndCtl->SyncRepQueue),
! 								   &(WalSndCtl->SyncRepQueue),
  								   offsetof(PGPROC, syncRepLinks));
  
  	while (proc)
--- 635,652 ----
  
  #ifdef USE_ASSERT_CHECKING
  static bool
! SyncRepQueueIsOrderedByLSN(int mode)
  {
  	PGPROC	   *proc = NULL;
  	XLogRecPtr	lastLSN;
  
+ 	Assert(mode >= 0 && mode < NUM_SYNC_REP_WAIT_MODE);
+ 
  	lastLSN.xlogid = 0;
  	lastLSN.xrecoff = 0;
  
! 	proc = (PGPROC *) SHMQueueNext(&(WalSndCtl->SyncRepQueue[mode]),
! 								   &(WalSndCtl->SyncRepQueue[mode]),
  								   offsetof(PGPROC, syncRepLinks));
  
  	while (proc)
***************
*** 628,634 **** SyncRepQueueIsOrderedByLSN(void)
  
  		lastLSN = proc->waitLSN;
  
! 		proc = (PGPROC *) SHMQueueNext(&(WalSndCtl->SyncRepQueue),
  									   &(proc->syncRepLinks),
  									   offsetof(PGPROC, syncRepLinks));
  	}
--- 660,666 ----
  
  		lastLSN = proc->waitLSN;
  
! 		proc = (PGPROC *) SHMQueueNext(&(WalSndCtl->SyncRepQueue[mode]),
  									   &(proc->syncRepLinks),
  									   offsetof(PGPROC, syncRepLinks));
  	}
***************
*** 675,677 **** check_synchronous_standby_names(char **newval, void **extra, GucSource source)
--- 707,729 ----
  
  	return true;
  }
+ 
+ void
+ assign_synchronous_commit(int newval, void *extra)
+ {
+ 	switch (newval)
+ 	{
+ 		case SYNCHRONOUS_COMMIT_REMOTE_WRITE:
+ 			SyncRepWaitMode = SYNC_REP_WAIT_WRITE;
+ 			break;
+ 		case SYNCHRONOUS_COMMIT_REMOTE_FLUSH:
+ 			SyncRepWaitMode = SYNC_REP_WAIT_FLUSH;
+ 			break;
+ 		case SYNCHRONOUS_COMMIT_REMOTE_APPLY:
+ 			SyncRepWaitMode = SYNC_REP_WAIT_APPLY;
+ 			break;
+ 		default:
+ 			SyncRepWaitMode = SYNC_REP_NO_WAIT;
+ 			break;
+ 	}
+ }
*** a/src/backend/replication/walsender.c
--- b/src/backend/replication/walsender.c
***************
*** 1410,1416 **** WalSndShmemInit(void)
  		/* First time through, so initialize */
  		MemSet(WalSndCtl, 0, WalSndShmemSize());
  
! 		SHMQueueInit(&(WalSndCtl->SyncRepQueue));
  
  		for (i = 0; i < max_wal_senders; i++)
  		{
--- 1410,1417 ----
  		/* First time through, so initialize */
  		MemSet(WalSndCtl, 0, WalSndShmemSize());
  
! 		for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; i++)
! 			SHMQueueInit(&(WalSndCtl->SyncRepQueue[i]));
  
  		for (i = 0; i < max_wal_senders; i++)
  		{
*** a/src/backend/utils/misc/guc.c
--- b/src/backend/utils/misc/guc.c
***************
*** 370,380 **** static const struct config_enum_entry constraint_exclusion_options[] = {
  };
  
  /*
!  * Although only "on", "off", and "local" are documented, we
   * accept all the likely variants of "on" and "off".
   */
  static const struct config_enum_entry synchronous_commit_options[] = {
  	{"local", SYNCHRONOUS_COMMIT_LOCAL_FLUSH, false},
  	{"on", SYNCHRONOUS_COMMIT_ON, false},
  	{"off", SYNCHRONOUS_COMMIT_OFF, false},
  	{"true", SYNCHRONOUS_COMMIT_ON, true},
--- 370,382 ----
  };
  
  /*
!  * Although only "on", "off", "write", "apply" and "local" are documented, we
   * accept all the likely variants of "on" and "off".
   */
  static const struct config_enum_entry synchronous_commit_options[] = {
  	{"local", SYNCHRONOUS_COMMIT_LOCAL_FLUSH, false},
+ 	{"write", SYNCHRONOUS_COMMIT_REMOTE_WRITE, false},
+ 	{"apply", SYNCHRONOUS_COMMIT_REMOTE_APPLY, false},
  	{"on", SYNCHRONOUS_COMMIT_ON, false},
  	{"off", SYNCHRONOUS_COMMIT_OFF, false},
  	{"true", SYNCHRONOUS_COMMIT_ON, true},
***************
*** 3164,3170 **** static struct config_enum ConfigureNamesEnum[] =
  		},
  		&synchronous_commit,
  		SYNCHRONOUS_COMMIT_ON, synchronous_commit_options,
! 		NULL, NULL, NULL
  	},
  
  	{
--- 3166,3172 ----
  		},
  		&synchronous_commit,
  		SYNCHRONOUS_COMMIT_ON, synchronous_commit_options,
! 		NULL, assign_synchronous_commit, NULL
  	},
  
  	{
*** a/src/include/access/xact.h
--- b/src/include/access/xact.h
***************
*** 55,61 **** typedef enum
  {
  	SYNCHRONOUS_COMMIT_OFF,		/* asynchronous commit */
  	SYNCHRONOUS_COMMIT_LOCAL_FLUSH,		/* wait for local flush only */
! 	SYNCHRONOUS_COMMIT_REMOTE_FLUSH		/* wait for local and remote flush */
  }	SyncCommitLevel;
  
  /* Define the default setting for synchonous_commit */
--- 55,63 ----
  {
  	SYNCHRONOUS_COMMIT_OFF,		/* asynchronous commit */
  	SYNCHRONOUS_COMMIT_LOCAL_FLUSH,		/* wait for local flush only */
! 	SYNCHRONOUS_COMMIT_REMOTE_WRITE,		/* wait for local flush and remote write */
! 	SYNCHRONOUS_COMMIT_REMOTE_FLUSH,		/* wait for local and remote flush */
! 	SYNCHRONOUS_COMMIT_REMOTE_APPLY		/* wait for local flush and remote apply */
  }	SyncCommitLevel;
  
  /* Define the default setting for synchonous_commit */
*** a/src/include/replication/syncrep.h
--- b/src/include/replication/syncrep.h
***************
*** 15,20 ****
--- 15,31 ----
  
  #include "utils/guc.h"
  
+ #define SyncRepRequested() \
+ 	(max_wal_senders > 0 && synchronous_commit > SYNCHRONOUS_COMMIT_LOCAL_FLUSH)
+ 
+ /* SyncRepWaitMode */
+ #define SYNC_REP_NO_WAIT		-1
+ #define SYNC_REP_WAIT_WRITE		0
+ #define SYNC_REP_WAIT_FLUSH		1
+ #define SYNC_REP_WAIT_APPLY		2
+ 
+ #define NUM_SYNC_REP_WAIT_MODE	3
+ 
  /* syncRepState */
  #define SYNC_REP_NOT_WAITING		0
  #define SYNC_REP_WAITING			1
***************
*** 37,44 **** extern void SyncRepReleaseWaiters(void);
  extern void SyncRepUpdateSyncStandbysDefined(void);
  
  /* called by various procs */
! extern int	SyncRepWakeQueue(bool all);
  
  extern bool check_synchronous_standby_names(char **newval, void **extra, GucSource source);
  
  #endif   /* _SYNCREP_H */
--- 48,56 ----
  extern void SyncRepUpdateSyncStandbysDefined(void);
  
  /* called by various procs */
! extern int	SyncRepWakeQueue(bool all, int mode);
  
  extern bool check_synchronous_standby_names(char **newval, void **extra, GucSource source);
+ extern void assign_synchronous_commit(int newval, void *extra);
  
  #endif   /* _SYNCREP_H */
*** a/src/include/replication/walsender_private.h
--- b/src/include/replication/walsender_private.h
***************
*** 14,19 ****
--- 14,20 ----
  
  #include "access/xlog.h"
  #include "nodes/nodes.h"
+ #include "replication/syncrep.h"
  #include "storage/latch.h"
  #include "storage/shmem.h"
  #include "storage/spin.h"
***************
*** 68,82 **** extern WalSnd *MyWalSnd;
  typedef struct
  {
  	/*
! 	 * Synchronous replication queue. Protected by SyncRepLock.
  	 */
! 	SHM_QUEUE	SyncRepQueue;
  
  	/*
  	 * Current location of the head of the queue. All waiters should have a
  	 * waitLSN that follows this value. Protected by SyncRepLock.
  	 */
! 	XLogRecPtr	lsn;
  
  	/*
  	 * Are any sync standbys defined?  Waiting backends can't reload the
--- 69,84 ----
  typedef struct
  {
  	/*
! 	 * Synchronous replication queue with one queue per request type.
! 	 * Protected by SyncRepLock.
  	 */
! 	SHM_QUEUE	SyncRepQueue[NUM_SYNC_REP_WAIT_MODE];
  
  	/*
  	 * Current location of the head of the queue. All waiters should have a
  	 * waitLSN that follows this value. Protected by SyncRepLock.
  	 */
! 	XLogRecPtr	lsn[NUM_SYNC_REP_WAIT_MODE];
  
  	/*
  	 * Are any sync standbys defined?  Waiting backends can't reload the
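
As a reading aid for the xact.h and syncrep.h hunks above, here is a minimal standalone sketch (not part of the patch) of how a commit level maps to a wait mode, and why SyncRepRequested() can be a simple ordering test on the enum. The enum order and the SYNC_REP_* values are copied from the patch. sketch_wait_mode and sketch_wants_remote_wait are hypothetical helpers, and treating SYNCHRONOUS_COMMIT_ON as an alias for remote flush is an assumption, since that definition is not shown in the diff.

```c
/* Commit levels in the order the patch gives them in xact.h. */
typedef enum
{
    SYNCHRONOUS_COMMIT_OFF,             /* asynchronous commit */
    SYNCHRONOUS_COMMIT_LOCAL_FLUSH,     /* wait for local flush only */
    SYNCHRONOUS_COMMIT_REMOTE_WRITE,    /* wait for local flush and remote write */
    SYNCHRONOUS_COMMIT_REMOTE_FLUSH,    /* wait for local and remote flush */
    SYNCHRONOUS_COMMIT_REMOTE_APPLY     /* wait for local flush and remote apply */
} SyncCommitLevel;

/* Assumption: "on" aliases remote flush; the alias is not part of the diff. */
#define SYNCHRONOUS_COMMIT_ON   SYNCHRONOUS_COMMIT_REMOTE_FLUSH

/* Wait modes, mirroring the values the patch adds to syncrep.h. */
#define SYNC_REP_NO_WAIT        (-1)
#define SYNC_REP_WAIT_WRITE     0
#define SYNC_REP_WAIT_FLUSH     1
#define SYNC_REP_WAIT_APPLY     2

/*
 * Hypothetical helper: the same switch as assign_synchronous_commit(),
 * but returning the mode instead of storing it in a static variable.
 */
int
sketch_wait_mode(SyncCommitLevel level)
{
    switch (level)
    {
        case SYNCHRONOUS_COMMIT_REMOTE_WRITE:
            return SYNC_REP_WAIT_WRITE;
        case SYNCHRONOUS_COMMIT_REMOTE_FLUSH:
            return SYNC_REP_WAIT_FLUSH;
        case SYNCHRONOUS_COMMIT_REMOTE_APPLY:
            return SYNC_REP_WAIT_APPLY;
        default:
            return SYNC_REP_NO_WAIT;
    }
}

/*
 * Hypothetical helper: the ordering test from SyncRepRequested(),
 * without the max_wal_senders check.
 */
int
sketch_wants_remote_wait(SyncCommitLevel level)
{
    return level > SYNCHRONOUS_COMMIT_LOCAL_FLUSH;
}
```

The ordering test only works because every remote level sorts after SYNCHRONOUS_COMMIT_LOCAL_FLUSH, so any level added later would need to keep that order.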
-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers
