On Wed, 2010-05-26 at 16:22 -0700, Josh Berkus wrote:
> > Just this second posted about that, as it turns out.
> > 
> > I have a v3 *almost* ready of the keepalive patch. It still makes sense
> > to me after a few days reflection, so is worth discussion and review. In
> > or out, I want this settled within a week. Definitely need some R&R
> > here.
> 
> Does the keepalive fix all the issues with max_standby_delay?  Tom?

OK, here's v4.

Summary

* WALSender adds a timestamp to the header of every WAL chunk sent.

* Each WAL record now has a conceptual "send timestamp" that remains
constant while that record is replayed. This is used as the basis from
which max_standby_delay is calculated when required during replay.

* Send timestamp is calculated as the later of the timestamp of the chunk in
which the WAL record was sent and the latest XLog time.

* WALSender sends an empty message as a keepalive when it has nothing else to
send. (There is no longer a special message type for the keepalive.)

I think it's close, but if there's a gaping hole here somewhere then I'll
punt for this release.

-- 
 Simon Riggs           www.2ndQuadrant.com
*** a/doc/src/sgml/protocol.sgml
--- b/doc/src/sgml/protocol.sgml
***************
*** 4222,4247 **** The commands accepted in walsender mode are:
        </varlistentry>
        <varlistentry>
        <term>
!           Byte<replaceable>n</replaceable>
        </term>
        <listitem>
        <para>
!           Data that forms part of WAL data stream.
        </para>
        </listitem>
        </varlistentry>
!       </variablelist>
        </para>
        </listitem>
        </varlistentry>
        </variablelist>
!      </para>
!      <para>
         A single WAL record is never split across two CopyData messages. When
         a WAL record crosses a WAL page boundary, however, and is therefore
         already split using continuation records, it can be split at the page
         boundary. In other words, the first main WAL record and its
         continuation records can be split across different CopyData messages.
       </para>
      </listitem>
    </varlistentry>
--- 4222,4257 ----
        </varlistentry>
        <varlistentry>
        <term>
!           Byte<replaceable>8</replaceable>
        </term>
        <listitem>
        <para>
!           Message timestamp.
        </para>
        </listitem>
        </varlistentry>
!       <varlistentry>
!       <term>
!           Byte<replaceable>n</replaceable>
!       </term>
!       <listitem>
!       <para>
!           Data that forms part of the WAL data stream (may be zero length).
        </para>
        </listitem>
        </varlistentry>
        </variablelist>
!       </para>
!       <para>
         A single WAL record is never split across two CopyData messages. When
         a WAL record crosses a WAL page boundary, however, and is therefore
         already split using continuation records, it can be split at the page
         boundary. In other words, the first main WAL record and its
         continuation records can be split across different CopyData messages.
+       </para>
+       </listitem>
+       </varlistentry>
+       </variablelist>
       </para>
      </listitem>
    </varlistentry>
*** a/src/backend/access/transam/xlog.c
--- b/src/backend/access/transam/xlog.c
***************
*** 1938,1944 **** UpdateMinRecoveryPoint(XLogRecPtr lsn, bool force)
  			UpdateControlFile();
  			minRecoveryPoint = newMinRecoveryPoint;
  
! 			ereport(DEBUG2,
  					(errmsg("updated min recovery point to %X/%X",
  						minRecoveryPoint.xlogid, minRecoveryPoint.xrecoff)));
  		}
--- 1938,1944 ----
  			UpdateControlFile();
  			minRecoveryPoint = newMinRecoveryPoint;
  
! 			ereport(DEBUG3,
  					(errmsg("updated min recovery point to %X/%X",
  						minRecoveryPoint.xlogid, minRecoveryPoint.xrecoff)));
  		}
***************
*** 9210,9218 **** retry:
  				{
  					/*
  					 * While walreceiver is active, wait for new WAL to arrive
! 					 * from primary.
  					 */
! 					receivedUpto = GetWalRcvWriteRecPtr();
  					if (XLByteLT(*RecPtr, receivedUpto))
  					{
  						/*
--- 9210,9218 ----
  				{
  					/*
  					 * While walreceiver is active, wait for new WAL to arrive
! 					 * from primary. Get next applychunk and do other bookkeeping.
  					 */
! 					receivedUpto = GetWalRcvNextApplyChunk();
  					if (XLByteLT(*RecPtr, receivedUpto))
  					{
  						/*
*** a/src/backend/replication/walreceiver.c
--- b/src/backend/replication/walreceiver.c
***************
*** 394,410 **** XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len)
  		case 'w':				/* WAL records */
  			{
  				XLogRecPtr	recptr;
  
! 				if (len < sizeof(XLogRecPtr))
  					ereport(ERROR,
  							(errcode(ERRCODE_PROTOCOL_VIOLATION),
  							 errmsg_internal("invalid WAL message received from primary")));
  
  				memcpy(&recptr, buf, sizeof(XLogRecPtr));
  				buf += sizeof(XLogRecPtr);
  				len -= sizeof(XLogRecPtr);
  
! 				XLogWalRcvWrite(buf, len, recptr);
  				break;
  			}
  		default:
--- 394,427 ----
  		case 'w':				/* WAL records */
  			{
  				XLogRecPtr	recptr;
+ 				TimestampTz chunk_timestamp;
  
! 				if (len < (sizeof(XLogRecPtr) + sizeof(TimestampTz)))
  					ereport(ERROR,
  							(errcode(ERRCODE_PROTOCOL_VIOLATION),
  							 errmsg_internal("invalid WAL message received from primary")));
  
+ 				/*
+ 				 * Extract starting XLogRecPtr of message header
+ 				 */
  				memcpy(&recptr, buf, sizeof(XLogRecPtr));
  				buf += sizeof(XLogRecPtr);
  				len -= sizeof(XLogRecPtr);
  
! 				/*
! 				 * Extract msg timestamp from message header
! 				 */
! 				memcpy(&chunk_timestamp, buf, sizeof(TimestampTz));
! 				buf += sizeof(TimestampTz);
! 				len -= sizeof(TimestampTz);
! 
! 				SetWalRcvChunkTimestamp(chunk_timestamp);
! 
! 				/*
! 				 * If there's any WAL data to process send it through now
! 				 */
! 				if (len > 0)
! 					XLogWalRcvWrite(buf, len, recptr);
  				break;
  			}
  		default:
*** a/src/backend/replication/walreceiverfuncs.c
--- b/src/backend/replication/walreceiverfuncs.c
***************
*** 32,37 ****
--- 32,39 ----
  
  WalRcvData *WalRcv = NULL;
  
+ static TimestampTz lastApplyChunkTimestamp;
+ 
  /*
   * How long to wait for walreceiver to start up after requesting
   * postmaster to launch it. In seconds.
***************
*** 220,222 **** GetWalRcvWriteRecPtr(void)
--- 222,304 ----
  
  	return recptr;
  }
+ 
+ /*
+  * Returns the WAL pointer of the next chunk of records to apply.
+  * Side-effect is that it sets lastApplyChunkTimestamp.
+  *
+  * WALSender, WALReceiver and Startup process all handle chunks of data
+  * referred to here as sendchunks, receivechunks and applychunks
+  * respectively. Each chunk may contain one or more complete WAL records.
+  * The sizes of those chunks differ based upon arrival times, so what one
+  * process regards as a single chunk may be handled as more than one chunk
+  * by a later process, or the other way around. This is important
+  * because disk or network delays might prevent continuous data transfer.
+  *
+  * Each sendchunk from the WALSender has a timestamp on it, which acts as
+  * a keepalive when there are no WAL records waiting to be sent.
+  *
+  * The WALReceiver reads piece by piece, so when Startup process asks for
+  * next applychunk it may read only a partial sendchunk, or it may
+  * also read multiple sendchunks. So we record the oldestSendChunkTimestamp
+  * for each new applychunk requested, so that we can use this as a known
+  * time for use in calculating max_standby_delay.
+  *
+  * Only called by Startup process
+  */
+ XLogRecPtr
+ GetWalRcvNextApplyChunk(void)
+ {
+ 	TimestampTz oldestSendChunkTimestamp;
+ 
+ 	/* use volatile pointer to prevent code rearrangement */
+ 	volatile WalRcvData *walrcv = WalRcv;
+ 	XLogRecPtr	recptr;
+ 
+ 	SpinLockAcquire(&walrcv->mutex);
+ 	recptr = walrcv->receivedUpto;
+ 	oldestSendChunkTimestamp = walrcv->oldestSendChunkTimestamp;
+ 	walrcv->ChunkTimestampIsSet = false;
+ 	SpinLockRelease(&walrcv->mutex);
+ 
+ 	if (oldestSendChunkTimestamp > lastApplyChunkTimestamp)
+ 	{
+ 		lastApplyChunkTimestamp = oldestSendChunkTimestamp;
+ 		elog(DEBUG3, "last applychunk timestamp = %s", timestamptz_to_str(lastApplyChunkTimestamp));
+ 	}
+ 
+ 	return recptr;
+ }
+ 
+ /*
+  * Record oldestSendChunkTimestamp if it isn't set yet. We only reset
+  * it once for each applychunk requested by Startup process, which
+  * gives a fixed value for every WAL record. That's progressively
+  * less accurate, so we correct it later using latestXLogTime if the
+  * variance gets too large.
+  */
+ void
+ SetWalRcvChunkTimestamp(TimestampTz ts)
+ {
+ 	/* use volatile pointer to prevent code rearrangement */
+ 	volatile WalRcvData *walrcv = WalRcv;
+ 
+ 	SpinLockAcquire(&walrcv->mutex);
+ 	if (!walrcv->ChunkTimestampIsSet)
+ 	{
+ 		walrcv->oldestSendChunkTimestamp = ts;
+ 		walrcv->ChunkTimestampIsSet = true;
+ 	}
+ 	SpinLockRelease(&walrcv->mutex);
+ }
+ 
+ /*
+  * Returns the timestamp of the last apply chunk.
+  *
+  * Only called by Startup process
+  */
+ TimestampTz
+ GetLastApplyChunkTimestamp(void)
+ {
+ 	return lastApplyChunkTimestamp;
+ }
*** a/src/backend/replication/walsender.c
--- b/src/backend/replication/walsender.c
***************
*** 98,103 **** static void WalSndQuickDieHandler(SIGNAL_ARGS);
--- 98,104 ----
  static int	WalSndLoop(void);
  static void InitWalSnd(void);
  static void WalSndHandshake(void);
+ static bool WalSndKeepAlive(StringInfo outMsg);
  static void WalSndKill(int code, Datum arg);
  static void XLogRead(char *buf, XLogRecPtr recptr, Size nbytes);
  static bool XLogSend(StringInfo outMsg);
***************
*** 314,319 **** WalSndHandshake(void)
--- 315,343 ----
  	}
  }
  
+ static bool
+ WalSndKeepAlive(StringInfo outMsg)
+ {
+ 	TimestampTz ts;
+ 
+ 	ts = GetCurrentTimestamp();
+ 
+ 	/* format the keepalive message */
+ 	pq_sendbyte(outMsg, 'w');
+ 	pq_sendbytes(outMsg, (char *) &sentPtr, sizeof(sentPtr));
+ 	pq_sendbytes(outMsg, (char *) &ts, sizeof(TimestampTz));
+ 
+ 	/* send the CopyData message */
+ 	pq_putmessage('d', outMsg->data, outMsg->len);
+ 
+ 	resetStringInfo(outMsg);
+ 	/* Flush pending output */
+ 	if (pq_flush())
+ 		return false;
+ 
+ 	return true;
+ }
+ 
  /*
   * Check if the remote end has closed the connection.
   */
***************
*** 642,648 **** XLogSend(StringInfo outMsg)
--- 666,683 ----
  
  	/* Quick exit if nothing to do */
  	if (!XLByteLT(sentPtr, SendRqstPtr))
+ 	{
+ 		/*
+ 		 * Send a keepalive if no other work to do.
+ 		 */
+ 		if (XLogStandbyInfoActive())
+ 		{
+ 			if (!WalSndKeepAlive(outMsg))
+ 				return false;
+ 		}
+ 
  		return true;
+ 	}
  
  	/*
  	 * We gather multiple records together by issuing just one XLogRead() of a
***************
*** 654,659 **** XLogSend(StringInfo outMsg)
--- 689,695 ----
  		XLogRecPtr	startptr;
  		XLogRecPtr	endptr;
  		Size		nbytes;
+ 		TimestampTz ts;
  
  		/*
  		 * Figure out how much to send in one message. If there's less than
***************
*** 697,702 **** XLogSend(StringInfo outMsg)
--- 733,741 ----
  		pq_sendbyte(outMsg, 'w');
  		pq_sendbytes(outMsg, (char *) &startptr, sizeof(startptr));
  
+ 		ts = GetCurrentTimestamp();
+ 		pq_sendbytes(outMsg, (char *) &ts, sizeof(TimestampTz));
+ 
  		if (endptr.xlogid != startptr.xlogid)
  		{
  			Assert(endptr.xlogid == startptr.xlogid + 1);
*** a/src/backend/storage/ipc/standby.c
--- b/src/backend/storage/ipc/standby.c
***************
*** 22,27 ****
--- 22,28 ----
  #include "access/xlog.h"
  #include "miscadmin.h"
  #include "pgstat.h"
+ #include "replication/walreceiver.h"
  #include "storage/bufmgr.h"
  #include "storage/lmgr.h"
  #include "storage/proc.h"
***************
*** 34,39 **** int			vacuum_defer_cleanup_age;
--- 35,42 ----
  
  static List *RecoveryLockList;
  
+ static TimestampTz GetXLogRecordSendTimestamp(void);
+ static bool WaitExceedsMaxStandbyDelay(TimestampTz recordSendTimestamp, TimestampTz now);
  static void ResolveRecoveryConflictWithVirtualXIDs(VirtualTransactionId *waitlist,
  									   ProcSignalReason reason);
  static void ResolveRecoveryConflictWithLock(Oid dbOid, Oid relOid);
***************
*** 114,132 **** ShutdownRecoveryTransactionEnvironment(void)
   */
  
  #define STANDBY_INITIAL_WAIT_US  1000
  static int	standbyWait_us = STANDBY_INITIAL_WAIT_US;
  
  /*
   * Standby wait logic for ResolveRecoveryConflictWithVirtualXIDs.
   * We wait here for a while then return. If we decide we can't wait any
   * more then we return true, if we can wait some more return false.
   */
  static bool
! WaitExceedsMaxStandbyDelay(void)
  {
  	/* Are we past max_standby_delay? */
  	if (MaxStandbyDelay >= 0 &&
! 		TimestampDifferenceExceeds(GetLatestXLogTime(), GetCurrentTimestamp(),
  								   MaxStandbyDelay))
  		return true;
  
--- 117,173 ----
   */
  
  #define STANDBY_INITIAL_WAIT_US  1000
+ #define STANDBY_MAX_WAIT_US  100000
  static int	standbyWait_us = STANDBY_INITIAL_WAIT_US;
  
  /*
+  * Each WAL record will have a distinct send timestamp that does not change
+  * while we replay the record.
+  */
+ static TimestampTz
+ GetXLogRecordSendTimestamp(void)
+ {
+ 	TimestampTz lastChunkTimestamp = GetLastApplyChunkTimestamp();
+ 	TimestampTz lastXLogTimestamp = GetLatestXLogTime();
+ 
+ 	/*
+ 	 * Use the later of two sources of time information. lastXLogTimestamp
+ 	 * will only be later than lastChunkTimestamp if we aren't streaming
+ 	 * or many chunks of streamed data are backlogged and we are trying to
+ 	 * catch up again.
+ 	 *
+ 	 * Note that send timestamp is an approximation only. If we
+ 	 * are streaming smoothly the record send timestamp will be accurate
+ 	 * to within wal_sender_delay on the primary. If we are backlogged as
+ 	 * can happen in the face of many conflicts then the value will only
+ 	 * be accurate at the start of the next apply chunk. If the apply
+ 	 * chunk contains many receivechunks then the data will appear to have
+ 	 * all arrived at the earlier time. So we switch to using the latest
+ 	 * known xlog time if that is later.
+ 	 *
+ 	 * This happens because we store only one send timestamp for any
+ 	 * applychunk, since it would be difficult to store more than one
+ 	 * timestamp. It would be easier if timestamps arrived continuously
+ 	 * even when real data isn't flowing though that would then need to
+ 	 * be archived as well, so we want to avoid that.
+ 	 */
+ 	if (lastXLogTimestamp > lastChunkTimestamp)
+ 		return lastXLogTimestamp;
+ 	else
+ 		return lastChunkTimestamp;
+ }
+ 
+ /*
   * Standby wait logic for ResolveRecoveryConflictWithVirtualXIDs.
   * We wait here for a while then return. If we decide we can't wait any
   * more then we return true, if we can wait some more return false.
   */
  static bool
! WaitExceedsMaxStandbyDelay(TimestampTz recordSendTimestamp, TimestampTz now)
  {
  	/* Are we past max_standby_delay? */
  	if (MaxStandbyDelay >= 0 &&
! 		TimestampDifferenceExceeds(recordSendTimestamp, now,
  								   MaxStandbyDelay))
  		return true;
  
***************
*** 140,147 **** WaitExceedsMaxStandbyDelay(void)
  	 * since pg_usleep isn't interruptable on some platforms.
  	 */
  	standbyWait_us *= 2;
! 	if (standbyWait_us > 1000000)
! 		standbyWait_us = 1000000;
  
  	return false;
  }
--- 181,188 ----
  	 * since pg_usleep isn't interruptable on some platforms.
  	 */
  	standbyWait_us *= 2;
! 	if (standbyWait_us > STANDBY_MAX_WAIT_US)
! 		standbyWait_us = STANDBY_MAX_WAIT_US;
  
  	return false;
  }
***************
*** 160,165 **** ResolveRecoveryConflictWithVirtualXIDs(VirtualTransactionId *waitlist,
--- 201,210 ----
  	{
  		TimestampTz waitStart;
  		char	   *new_status;
+ 		TimestampTz recordSendTimestamp = 0;
+ 
+ 		if (MaxStandbyDelay >= 0)
+ 			recordSendTimestamp = GetXLogRecordSendTimestamp();
  
  		pgstat_report_waiting(true);
  
***************
*** 172,183 **** ResolveRecoveryConflictWithVirtualXIDs(VirtualTransactionId *waitlist,
  		/* wait until the virtual xid is gone */
  		while (!ConditionalVirtualXactLockTableWait(*waitlist))
  		{
  			/*
  			 * Report via ps if we have been waiting for more than 500 msec
  			 * (should that be configurable?)
  			 */
  			if (update_process_title && new_status == NULL &&
! 				TimestampDifferenceExceeds(waitStart, GetCurrentTimestamp(),
  										   500))
  			{
  				const char *old_status;
--- 217,230 ----
  		/* wait until the virtual xid is gone */
  		while (!ConditionalVirtualXactLockTableWait(*waitlist))
  		{
+ 			TimestampTz now = GetCurrentTimestamp();
+ 
  			/*
  			 * Report via ps if we have been waiting for more than 500 msec
  			 * (should that be configurable?)
  			 */
  			if (update_process_title && new_status == NULL &&
! 				TimestampDifferenceExceeds(waitStart, now,
  										   500))
  			{
  				const char *old_status;
***************
*** 194,200 **** ResolveRecoveryConflictWithVirtualXIDs(VirtualTransactionId *waitlist,
  			}
  
  			/* Is it time to kill it? */
! 			if (WaitExceedsMaxStandbyDelay())
  			{
  				pid_t		pid;
  
--- 241,247 ----
  			}
  
  			/* Is it time to kill it? */
! 			if (WaitExceedsMaxStandbyDelay(recordSendTimestamp, now))
  			{
  				pid_t		pid;
  
***************
*** 400,406 **** ResolveRecoveryConflictWithBufferPin(void)
  	}
  	else
  	{
! 		TimestampTz then = GetLatestXLogTime();
  		TimestampTz now = GetCurrentTimestamp();
  
  		/* Are we past max_standby_delay? */
--- 447,453 ----
  	}
  	else
  	{
! 		TimestampTz then = GetXLogRecordSendTimestamp();
  		TimestampTz now = GetCurrentTimestamp();
  
  		/* Are we past max_standby_delay? */
*** a/src/include/replication/walreceiver.h
--- b/src/include/replication/walreceiver.h
***************
*** 59,64 **** typedef struct
--- 59,70 ----
  	 */
  	XLogRecPtr	receivedUpto;
  
+ 	/*
+ 	 * The timestamp of the oldest sendchunk timestamp.
+ 	 */
+ 	TimestampTz oldestSendChunkTimestamp;
+ 	bool 		ChunkTimestampIsSet;
+ 
  	slock_t		mutex;			/* locks shared variables shown above */
  } WalRcvData;
  
***************
*** 83,87 **** extern bool WalRcvInProgress(void);
--- 89,96 ----
  extern XLogRecPtr WaitNextXLogAvailable(XLogRecPtr recptr, bool *finished);
  extern void RequestXLogStreaming(XLogRecPtr recptr, const char *conninfo);
  extern XLogRecPtr GetWalRcvWriteRecPtr(void);
+ extern XLogRecPtr GetWalRcvNextApplyChunk(void);
+ extern void SetWalRcvChunkTimestamp(TimestampTz ts);
+ extern TimestampTz GetLastApplyChunkTimestamp(void);
  
  #endif   /* _WALRECEIVER_H */
-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to