On Sat, 2010-05-15 at 19:50 +0100, Simon Riggs wrote:
> On Sat, 2010-05-15 at 18:24 +0100, Simon Riggs wrote:
> 
> > I will recode using that concept.

> Startup gets new pointer when it runs out of data to replay. That might
> or might not include an updated keepalive timestamp, since there's no
> exact relationship between chunks sent and chunks received. Startup
> might ask for a new chunk when half a chunk has been received, or when
> multiple chunks have been received.

New version, with some other cleanup of wait processing.

New logic is that when Startup asks for next applychunk of WAL it saves
the lastChunkTimestamp. That is then the base time used by
WaitExceedsMaxStandbyDelay(), except when latestXLogTime is later.
Since multiple receivechunks can arrive from primary before Startup asks
for next applychunk we use the oldest receivechunk timestamp, not the
latest. Doing it this way means the lastChunkTimestamp doesn't change
when new keepalives arrive, so we have a stable and reasonably accurate
recordSendTimestamp for each WAL record.

The keepalive is sent as the first part of a new message, if any. So
partial chunks of data always have an accurate timestamp, even if that
is slightly older as a result. Doesn't make much difference except with
very large chunks.

I think that addresses the points raised on this thread and others.

-- 
 Simon Riggs           www.2ndQuadrant.com
*** a/doc/src/sgml/protocol.sgml
--- b/doc/src/sgml/protocol.sgml
***************
*** 4232,4247 **** The commands accepted in walsender mode are:
        </varlistentry>
        </variablelist>
        </para>
!       </listitem>
!       </varlistentry>
!       </variablelist>
!      </para>
!      <para>
         A single WAL record is never split across two CopyData messages. When
         a WAL record crosses a WAL page boundary, however, and is therefore
         already split using continuation records, it can be split at the page
         boundary. In other words, the first main WAL record and its
         continuation records can be split across different CopyData messages.
       </para>
      </listitem>
    </varlistentry>
--- 4232,4283 ----
        </varlistentry>
        </variablelist>
        </para>
!       <para>
         A single WAL record is never split across two CopyData messages. When
         a WAL record crosses a WAL page boundary, however, and is therefore
         already split using continuation records, it can be split at the page
         boundary. In other words, the first main WAL record and its
         continuation records can be split across different CopyData messages.
+       </para>
+       </listitem>
+       </varlistentry>
+       <varlistentry>
+       <term>
+           Keepalive (B)
+       </term>
+       <listitem>
+       <para>
+       <variablelist>
+       <varlistentry>
+       <term>
+           Byte1('k')
+       </term>
+       <listitem>
+       <para>
+           Identifies the message as a keepalive.
+       </para>
+       </listitem>
+       </varlistentry>
+       <varlistentry>
+       <term>
+           TimestampTz
+       </term>
+       <listitem>
+       <para>
+           The current timestamp on the primary server when the keepalive was sent.
+       </para>
+       </listitem>
+       </varlistentry>
+       </variablelist>
+       </para>
+       <para>
+        If <varname>wal_level</> is set to <literal>hot_standby</> then a keepalive
+        is sent once per <varname>wal_sender_delay</>. The keepalive is sent after
+        WAL data has been sent, if any.
+       </para>
+       </listitem>
+       </varlistentry>
+       </variablelist>
       </para>
      </listitem>
    </varlistentry>
*** a/src/backend/access/transam/xlog.c
--- b/src/backend/access/transam/xlog.c
***************
*** 1938,1944 **** UpdateMinRecoveryPoint(XLogRecPtr lsn, bool force)
  			UpdateControlFile();
  			minRecoveryPoint = newMinRecoveryPoint;
  
! 			ereport(DEBUG2,
  					(errmsg("updated min recovery point to %X/%X",
  						minRecoveryPoint.xlogid, minRecoveryPoint.xrecoff)));
  		}
--- 1938,1944 ----
  			UpdateControlFile();
  			minRecoveryPoint = newMinRecoveryPoint;
  
! 			ereport(DEBUG3,
  					(errmsg("updated min recovery point to %X/%X",
  						minRecoveryPoint.xlogid, minRecoveryPoint.xrecoff)));
  		}
***************
*** 9212,9218 **** retry:
  					 * While walreceiver is active, wait for new WAL to arrive
  					 * from primary.
  					 */
! 					receivedUpto = GetWalRcvWriteRecPtr();
  					if (XLByteLT(*RecPtr, receivedUpto))
  					{
  						/*
--- 9212,9218 ----
  					 * While walreceiver is active, wait for new WAL to arrive
  					 * from primary.
  					 */
! 					receivedUpto = GetWalRcvNextChunk();
  					if (XLByteLT(*RecPtr, receivedUpto))
  					{
  						/*
*** a/src/backend/replication/walreceiver.c
--- b/src/backend/replication/walreceiver.c
***************
*** 407,412 **** XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len)
--- 407,428 ----
  				XLogWalRcvWrite(buf, len, recptr);
  				break;
  			}
+ 		case 'k':				/* keepalive */
+ 			{
+ 				TimestampTz keepalive;
+ 
+ 				if (len != sizeof(TimestampTz))
+ 					ereport(ERROR,
+ 							(errcode(ERRCODE_PROTOCOL_VIOLATION),
+ 							 errmsg_internal("invalid keepalive message received from primary")));
+ 
+ 				memcpy(&keepalive, buf, sizeof(TimestampTz));
+ 				buf += sizeof(TimestampTz);
+ 				len -= sizeof(TimestampTz);
+ 
+ 				SetWalRcvChunkTimestamp(keepalive);
+ 				break;
+ 			}
  		default:
  			ereport(ERROR,
  					(errcode(ERRCODE_PROTOCOL_VIOLATION),
*** a/src/backend/replication/walreceiverfuncs.c
--- b/src/backend/replication/walreceiverfuncs.c
***************
*** 32,37 ****
--- 32,39 ----
  
  WalRcvData *WalRcv = NULL;
  
+ static TimestampTz lastChunkTimestamp;
+ 
  /*
   * How long to wait for walreceiver to start up after requesting
   * postmaster to launch it. In seconds.
***************
*** 220,222 **** GetWalRcvWriteRecPtr(void)
--- 222,287 ----
  
  	return recptr;
  }
+ 
+ /*
+  * Returns the WAL pointer of the next chunk of records to apply.
+  * Side-effect is that it sets lastChunkTimestamp.
+  *
+  * Each sendchunk from the WALSender has a keepalive timestamp on it. The
+  * WALReceiver reads piece by piece, so when Startup process asks for next
+  * applychunk to work on it may read only a partial sendchunk, or it may
+  * also read multiple sendchunks. So we record the lastChunktimestamp for
+  * each new apply chunk requested, so that we can use this as a known
+  * time for use in calculating max_standby_delay.
+  *
+  * Only called by Startup process
+  */
+ XLogRecPtr
+ GetWalRcvNextChunk(void)
+ {
+ 	TimestampTz oldestChunkTimestamp;
+ 
+ 	/* use volatile pointer to prevent code rearrangement */
+ 	volatile WalRcvData *walrcv = WalRcv;
+ 	XLogRecPtr	recptr;
+ 
+ 	SpinLockAcquire(&walrcv->mutex);
+ 	recptr = walrcv->receivedUpto;
+ 	oldestChunkTimestamp = walrcv->oldestChunkTimestamp;
+ 	walrcv->ChunkTimestampIsSet = false;
+ 	SpinLockRelease(&walrcv->mutex);
+ 
+ 	if (oldestChunkTimestamp > lastChunkTimestamp)
+ 	{
+ 		lastChunkTimestamp = oldestChunkTimestamp;
+ 		elog(DEBUG3, "last chunk timestamp = %s", timestamptz_to_str(lastChunkTimestamp));
+ 	}
+ 
+ 	return recptr;
+ }
+ 
+ void
+ SetWalRcvChunkTimestamp(TimestampTz ts)
+ {
+ 	/* use volatile pointer to prevent code rearrangement */
+ 	volatile WalRcvData *walrcv = WalRcv;
+ 
+ 	SpinLockAcquire(&walrcv->mutex);
+ 	if (!walrcv->ChunkTimestampIsSet)
+ 	{
+ 		walrcv->oldestChunkTimestamp = ts;
+ 		walrcv->ChunkTimestampIsSet = true;
+ 	}
+ 	SpinLockRelease(&walrcv->mutex);
+ }
+ 
+ /*
+  * Returns the timestamp of the last chunk.
+  *
+  * Only called by Startup process
+  */
+ TimestampTz
+ GetWalRcvLastChunkTimestamp(void)
+ {
+ 	return lastChunkTimestamp;
+ }
*** a/src/backend/replication/walsender.c
--- b/src/backend/replication/walsender.c
***************
*** 98,103 **** static void WalSndQuickDieHandler(SIGNAL_ARGS);
--- 98,104 ----
  static int	WalSndLoop(void);
  static void InitWalSnd(void);
  static void WalSndHandshake(void);
+ static void WalSndKeepAlive(void);
  static void WalSndKill(int code, Datum arg);
  static void XLogRead(char *buf, XLogRecPtr recptr, Size nbytes);
  static bool XLogSend(StringInfo outMsg);
***************
*** 314,319 **** WalSndHandshake(void)
--- 315,342 ----
  	}
  }
  
+ static void
+ WalSndKeepAlive(void)
+ {
+ 	StringInfoData outMsg;
+ 	TimestampTz ts;
+ 
+ 	if (!XLogStandbyInfoActive())
+ 		return;
+ 
+ 	initStringInfo(&outMsg);
+ 	ts = GetCurrentTimestamp();
+ 
+ 	/* format the keepalive message */
+ 	pq_sendbyte(&outMsg, 'k');
+ 	pq_sendbytes(&outMsg, (char *) &ts, sizeof(TimestampTz));
+ 
+ 	/* send the CopyData message */
+ 	pq_putmessage('d', outMsg.data, outMsg.len);
+ 
+ 	/* relies on pq_flush elsewhere */
+ }
+ 
  /*
   * Check if the remote end has closed the connection.
   */
***************
*** 637,648 **** XLogSend(StringInfo outMsg)
--- 660,679 ----
  	/* use volatile pointer to prevent code rearrangement */
  	volatile WalSnd *walsnd = MyWalSnd;
  
+ 	WalSndKeepAlive();
+ 
  	/* Attempt to send all records flushed to the disk already */
  	SendRqstPtr = GetWriteRecPtr();
  
  	/* Quick exit if nothing to do */
  	if (!XLByteLT(sentPtr, SendRqstPtr))
+ 	{
+ 		/* Flush pending output */
+ 		if (pq_flush())
+ 			return false;
+ 
  		return true;
+ 	}
  
  	/*
  	 * We gather multiple records together by issuing just one XLogRead() of a
*** a/src/backend/storage/ipc/standby.c
--- b/src/backend/storage/ipc/standby.c
***************
*** 22,27 ****
--- 22,28 ----
  #include "access/xlog.h"
  #include "miscadmin.h"
  #include "pgstat.h"
+ #include "replication/walreceiver.h"
  #include "storage/bufmgr.h"
  #include "storage/lmgr.h"
  #include "storage/proc.h"
***************
*** 34,39 **** int			vacuum_defer_cleanup_age;
--- 35,41 ----
  
  static List *RecoveryLockList;
  
+ static bool WaitExceedsMaxStandbyDelay(TimestampTz recordSendTimestamp, TimestampTz now);
  static void ResolveRecoveryConflictWithVirtualXIDs(VirtualTransactionId *waitlist,
  									   ProcSignalReason reason);
  static void ResolveRecoveryConflictWithLock(Oid dbOid, Oid relOid);
***************
*** 114,119 **** ShutdownRecoveryTransactionEnvironment(void)
--- 116,122 ----
   */
  
  #define STANDBY_INITIAL_WAIT_US  1000
+ #define STANDBY_MAX_WAIT_US  100000
  static int	standbyWait_us = STANDBY_INITIAL_WAIT_US;
  
  /*
***************
*** 122,132 **** static int	standbyWait_us = STANDBY_INITIAL_WAIT_US;
   * more then we return true, if we can wait some more return false.
   */
  static bool
! WaitExceedsMaxStandbyDelay(void)
  {
  	/* Are we past max_standby_delay? */
  	if (MaxStandbyDelay >= 0 &&
! 		TimestampDifferenceExceeds(GetLatestXLogTime(), GetCurrentTimestamp(),
  								   MaxStandbyDelay))
  		return true;
  
--- 125,135 ----
   * more then we return true, if we can wait some more return false.
   */
  static bool
! WaitExceedsMaxStandbyDelay(TimestampTz recordSendTimestamp, TimestampTz now)
  {
  	/* Are we past max_standby_delay? */
  	if (MaxStandbyDelay >= 0 &&
! 		TimestampDifferenceExceeds(recordSendTimestamp, now,
  								   MaxStandbyDelay))
  		return true;
  
***************
*** 140,147 **** WaitExceedsMaxStandbyDelay(void)
  	 * since pg_usleep isn't interruptable on some platforms.
  	 */
  	standbyWait_us *= 2;
! 	if (standbyWait_us > 1000000)
! 		standbyWait_us = 1000000;
  
  	return false;
  }
--- 143,150 ----
  	 * since pg_usleep isn't interruptable on some platforms.
  	 */
  	standbyWait_us *= 2;
! 	if (standbyWait_us > STANDBY_MAX_WAIT_US)
! 		standbyWait_us = STANDBY_MAX_WAIT_US;
  
  	return false;
  }
***************
*** 160,165 **** ResolveRecoveryConflictWithVirtualXIDs(VirtualTransactionId *waitlist,
--- 163,169 ----
  	{
  		TimestampTz waitStart;
  		char	   *new_status;
+ 		TimestampTz recordSendTimestamp = 0;
  
  		pgstat_report_waiting(true);
  
***************
*** 169,183 **** ResolveRecoveryConflictWithVirtualXIDs(VirtualTransactionId *waitlist,
  		/* reset standbyWait_us for each xact we wait for */
  		standbyWait_us = STANDBY_INITIAL_WAIT_US;
  
  		/* wait until the virtual xid is gone */
  		while (!ConditionalVirtualXactLockTableWait(*waitlist))
  		{
  			/*
  			 * Report via ps if we have been waiting for more than 500 msec
  			 * (should that be configurable?)
  			 */
  			if (update_process_title && new_status == NULL &&
! 				TimestampDifferenceExceeds(waitStart, GetCurrentTimestamp(),
  										   500))
  			{
  				const char *old_status;
--- 173,221 ----
  		/* reset standbyWait_us for each xact we wait for */
  		standbyWait_us = STANDBY_INITIAL_WAIT_US;
  
+ 		if (MaxStandbyDelay >= 0)
+ 		{
+ 			TimestampTz lastChunkTimestamp = GetWalRcvLastChunkTimestamp();
+ 			TimestampTz lastXLogTimestamp = GetLatestXLogTime();
+ 			long diff_secs;
+ 			int diff_usecs;
+ 
+ 			/*
+ 			 * Use the later of two sources of time information. lastXLogTimestamp
+ 			 * will only be later than lastChunkTimestamp if we aren't streaming
+ 			 * or many chunks of streamed data are backlogged and we are trying to
+ 			 * catch up again.
+ 			 *
+ 			 * Note that recordSendTimestamp is an approximation only. If we
+ 			 * are streaming smoothly the recordSendTimestamp will be accurate
+ 			 * to within wal_sender_delay on the primary. If we are backlogged as
+ 			 * can happen in the face of many conflicts then the value will only
+ 			 * be accurate at the start of the next apply chunk. If the apply
+ 			 * chunk contains many receivechunks then the data will appear to have
+ 			 * all arrived at the earlier time. So we switch to using the latest
+ 			 * known xlog time if that is later.
+ 			 */
+ 			if (lastXLogTimestamp > lastChunkTimestamp)
+ 				recordSendTimestamp = lastXLogTimestamp;
+ 			else
+ 				recordSendTimestamp = lastChunkTimestamp;
+ 
+ 			TimestampDifference(recordSendTimestamp, waitStart, &diff_secs, &diff_usecs);
+ 			elog(DEBUG2, "conflict when standby delay = %ld.%u s  max_standby_delay = %u ms",
+ 						diff_secs, diff_usecs, MaxStandbyDelay);
+ 		}
+ 
  		/* wait until the virtual xid is gone */
  		while (!ConditionalVirtualXactLockTableWait(*waitlist))
  		{
+ 			TimestampTz now = GetCurrentTimestamp();
+ 
  			/*
  			 * Report via ps if we have been waiting for more than 500 msec
  			 * (should that be configurable?)
  			 */
  			if (update_process_title && new_status == NULL &&
! 				TimestampDifferenceExceeds(waitStart, now,
  										   500))
  			{
  				const char *old_status;
***************
*** 194,200 **** ResolveRecoveryConflictWithVirtualXIDs(VirtualTransactionId *waitlist,
  			}
  
  			/* Is it time to kill it? */
! 			if (WaitExceedsMaxStandbyDelay())
  			{
  				pid_t		pid;
  
--- 232,238 ----
  			}
  
  			/* Is it time to kill it? */
! 			if (WaitExceedsMaxStandbyDelay(recordSendTimestamp, now))
  			{
  				pid_t		pid;
  
*** a/src/include/replication/walreceiver.h
--- b/src/include/replication/walreceiver.h
***************
*** 59,64 **** typedef struct
--- 59,70 ----
  	 */
  	XLogRecPtr	receivedUpto;
  
+ 	/*
+ 	 * The timestamp of the oldest keepalive.
+ 	 */
+ 	TimestampTz oldestChunkTimestamp;
+ 	bool 		ChunkTimestampIsSet;
+ 
  	slock_t		mutex;			/* locks shared variables shown above */
  } WalRcvData;
  
***************
*** 83,87 **** extern bool WalRcvInProgress(void);
--- 89,96 ----
  extern XLogRecPtr WaitNextXLogAvailable(XLogRecPtr recptr, bool *finished);
  extern void RequestXLogStreaming(XLogRecPtr recptr, const char *conninfo);
  extern XLogRecPtr GetWalRcvWriteRecPtr(void);
+ extern XLogRecPtr GetWalRcvNextChunk(void);
+ extern void SetWalRcvChunkTimestamp(TimestampTz ts);
+ extern TimestampTz GetWalRcvLastChunkTimestamp(void);
  
  #endif   /* _WALRECEIVER_H */
-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to