On Mon, Dec 12, 2011 at 12:17 PM, Simon Riggs <si...@2ndquadrant.com> wrote:
> On Sat, Dec 10, 2011 at 12:29 PM, Greg Smith <g...@2ndquadrant.com> wrote:
>
>> "We can send regular special messages from WALSender to WALReceiver that do
>> not form part of the WAL stream, so we don't bulk
>> up WAL archives. (i.e. don't use "w" messages)."
>>
>> Here's my understanding of how this would work.
>
> Let me explain a little more and provide a very partial patch.
>
> We define a new replication protocol message 'k' which sends a
> keepalive from primary to standby when there is no WAL to send. The
> message does not form part of the WAL stream so does not bloat WAL
> files, nor cause them to fill when unattended.
>
> Keepalives contain current end of WAL and a current timestamp.
>
> Keepalive processing is all done on the standby and there is no
> overhead on a primary which does not use replication. There is a
> slight overhead on primary for keepalives but this happens only when
> there are no writes. On the standby we already update shared state
> when we receive some data, so not much else to do there.
>
> When the standby has applied up to the end of WAL the replication
> delay is receipt time - send time of keepalive.

Patch introduces regular keepalives from WALsender to WALreceiver,
using a new protocol message 'k'.
These are sent at intervals of one tenth of replication_timeout (or every
10s if replication_timeout is not set) whenever no WAL records have been
sent recently.

The patch exposes the information used for the standby_delay calculation
(as used in 9.0+). In addition, it introduces direct calculations of the
replication apply delay and the replication transfer latency, both in ms.

-- 
 Simon Riggs                   http://www.2ndQuadrant.com/
 PostgreSQL Development, 24x7 Support, Training & Services
diff --git a/doc/src/sgml/protocol.sgml b/doc/src/sgml/protocol.sgml
index d6332e5..71c40cc 100644
--- a/doc/src/sgml/protocol.sgml
+++ b/doc/src/sgml/protocol.sgml
@@ -1467,6 +1467,54 @@ The commands accepted in walsender mode are:
       <variablelist>
       <varlistentry>
       <term>
+          Primary keepalive message (B)
+      </term>
+      <listitem>
+      <para>
+      <variablelist>
+      <varlistentry>
+      <term>
+          Byte1('k')
+      </term>
+      <listitem>
+      <para>
+          Identifies the message as a sender keepalive.
+      </para>
+      </listitem>
+      </varlistentry>
+      <varlistentry>
+      <term>
+          Byte8
+      </term>
+      <listitem>
+      <para>
+          The current end of WAL on the server, given in
+          XLogRecPtr format.
+      </para>
+      </listitem>
+      </varlistentry>
+      <varlistentry>
+      <term>
+          Byte8
+      </term>
+      <listitem>
+      <para>
+          The server's system clock at the time of transmission,
+          given in TimestampTz format.
+      </para>
+      </listitem>
+      </varlistentry>
+      </variablelist>
+      </para>
+      </listitem>
+      </varlistentry>
+      </variablelist>
+     </para>
+
+     <para>
+      <variablelist>
+      <varlistentry>
+      <term>
           Standby status update (F)
       </term>
       <listitem>
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 9d96044..77e2760 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -452,6 +452,9 @@ typedef struct XLogCtlData
 	XLogRecPtr	recoveryLastRecPtr;
 	/* timestamp of last COMMIT/ABORT record replayed (or being replayed) */
 	TimestampTz recoveryLastXTime;
+	/* timestamp of when we started replaying the current chunk of WAL data,
+	 * only relevant for replication or archive recovery */
+	TimestampTz currentChunkStartTime;
 	/* end of the last record restored from the archive */
 	XLogRecPtr	restoreLastRecPtr;
 	/* Are we requested to pause recovery? */
@@ -606,6 +609,7 @@ static void exitArchiveRecovery(TimeLineID endTLI,
 static bool recoveryStopsHere(XLogRecord *record, bool *includeThis);
 static void recoveryPausesHere(void);
 static void SetLatestXTime(TimestampTz xtime);
+static void SetCurrentChunkStartTime(TimestampTz xtime);
 static void CheckRequiredParameterValues(void);
 static void XLogReportParameters(void);
 static void LocalSetXLogInsertAllowed(void);
@@ -5846,6 +5850,41 @@ GetLatestXTime(void)
 }
 
 /*
+ * Save the time at which the startup process started on the current chunk of WAL.
+ *
+ * We keep this in XLogCtl, not a simple static variable, so that it can be
+ * seen by all backends.
+ */
+static void
+SetCurrentChunkStartTime(TimestampTz xtime)
+{
+	/* use volatile pointer to prevent code rearrangement */
+	volatile XLogCtlData *xlogctl = XLogCtl;
+
+	SpinLockAcquire(&xlogctl->info_lck);
+	xlogctl->currentChunkStartTime = xtime;
+	SpinLockRelease(&xlogctl->info_lck);
+}
+
+/*
+ * Fetch the time the startup process started replaying the current chunk
+ * of WAL received from the walreceiver, or 0 if it has not started one yet.
+ */
+TimestampTz
+GetCurrentChunkReplayStartTime(void)
+{
+	/* use volatile pointer to prevent code rearrangement */
+	volatile XLogCtlData *xlogctl = XLogCtl;
+	TimestampTz xtime;
+
+	SpinLockAcquire(&xlogctl->info_lck);
+	xtime = xlogctl->currentChunkStartTime;
+	SpinLockRelease(&xlogctl->info_lck);
+
+	return xtime;
+}
+
+/*
  * Returns time of receipt of current chunk of XLOG data, as well as
  * whether it was received from streaming replication or from archives.
  */
@@ -6388,6 +6427,7 @@ StartupXLOG(void)
 		xlogctl->replayEndRecPtr = ReadRecPtr;
 		xlogctl->recoveryLastRecPtr = ReadRecPtr;
 		xlogctl->recoveryLastXTime = 0;
+		xlogctl->currentChunkStartTime = 0;
 		xlogctl->recoveryPause = false;
 		SpinLockRelease(&xlogctl->info_lck);
 
@@ -9688,7 +9728,10 @@ retry:
 						{
 							havedata = true;
 							if (!XLByteLT(*RecPtr, latestChunkStart))
+							{
 								XLogReceiptTime = GetCurrentTimestamp();
+								SetCurrentChunkStartTime(XLogReceiptTime);
+							}
 						}
 						else
 							havedata = false;
diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c
index 1f12dcb..8106d6b 100644
--- a/src/backend/replication/walreceiver.c
+++ b/src/backend/replication/walreceiver.c
@@ -124,6 +124,7 @@ static void XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr);
 static void XLogWalRcvFlush(bool dying);
 static void XLogWalRcvSendReply(void);
 static void XLogWalRcvSendHSFeedback(void);
+static void ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime);
 
 /* Signal handlers */
 static void WalRcvSigHupHandler(SIGNAL_ARGS);
@@ -218,6 +219,10 @@ WalReceiverMain(void)
 	/* Fetch information required to start streaming */
 	strlcpy(conninfo, (char *) walrcv->conninfo, MAXCONNINFO);
 	startpoint = walrcv->receiveStart;
+
+	/* Initialise to a sanish value */
+	walrcv->lastMsgSendTime = walrcv->lastMsgReceiptTime = GetCurrentTimestamp();
+
 	SpinLockRelease(&walrcv->mutex);
 
 	/* Arrange to clean up at walreceiver exit */
@@ -433,12 +438,28 @@ XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len)
 							 errmsg_internal("invalid WAL message received from primary")));
 				/* memcpy is required here for alignment reasons */
 				memcpy(&msghdr, buf, sizeof(WalDataMessageHeader));
+
+				ProcessWalSndrMessage(msghdr.walEnd, msghdr.sendTime);
+
 				buf += sizeof(WalDataMessageHeader);
 				len -= sizeof(WalDataMessageHeader);
-
 				XLogWalRcvWrite(buf, len, msghdr.dataStart);
 				break;
 			}
+		case 'k':				/* Keepalive */
+			{
+				PrimaryKeepaliveMessage keepalive;
+
+				if (len != sizeof(PrimaryKeepaliveMessage))
+					ereport(ERROR,
+							(errcode(ERRCODE_PROTOCOL_VIOLATION),
+							 errmsg_internal("invalid keepalive message received from primary")));
+				/* memcpy is required here for alignment reasons */
+				memcpy(&keepalive, buf, sizeof(PrimaryKeepaliveMessage));
+
+				ProcessWalSndrMessage(keepalive.walEnd, keepalive.sendTime);
+				break;
+			}
 		default:
 			ereport(ERROR,
 					(errcode(ERRCODE_PROTOCOL_VIOLATION),
@@ -711,3 +732,46 @@ XLogWalRcvSendHSFeedback(void)
 	memcpy(&buf[1], &feedback_message, sizeof(StandbyHSFeedbackMessage));
 	walrcv_send(buf, sizeof(StandbyHSFeedbackMessage) + 1);
 }
+
+/*
+ * Keep track of important messages from primary: save the message's send
+ * time and our receipt time in shared memory for the delay calculations.
+ * walEnd is currently unused.
+ */
+static void
+ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime)
+{
+	/* use volatile pointer to prevent code rearrangement */
+	volatile WalRcvData *walrcv = WalRcv;
+
+	TimestampTz lastMsgReceiptTime = GetCurrentTimestamp();
+
+	/* Update shared-memory status */
+	SpinLockAcquire(&walrcv->mutex);
+	walrcv->lastMsgSendTime = sendTime;
+	walrcv->lastMsgReceiptTime = lastMsgReceiptTime;
+	SpinLockRelease(&walrcv->mutex);
+
+	/*
+	 * This runs for every message, and computing the delays takes spinlocks,
+	 * so only do the work when the DEBUG2 line will actually be logged.
+	 */
+	if (log_min_messages <= DEBUG2)
+	{
+		char	   *sendTimeStr;
+		char	   *receiptTimeStr;
+
+		/* Copy: timestamptz_to_str returns a pointer to a static buffer */
+		sendTimeStr = pstrdup(timestamptz_to_str(sendTime));
+		receiptTimeStr = pstrdup(timestamptz_to_str(lastMsgReceiptTime));
+
+		elog(DEBUG2, "sendtime %s receipttime %s replication apply delay %d transfer latency %d",
+			 sendTimeStr,
+			 receiptTimeStr,
+			 GetReplicationApplyDelay(),
+			 GetReplicationTransferLatency());
+
+		pfree(sendTimeStr);
+		pfree(receiptTimeStr);
+	}
+}
diff --git a/src/backend/replication/walreceiverfuncs.c b/src/backend/replication/walreceiverfuncs.c
index 5bce1c3..054355b 100644
--- a/src/backend/replication/walreceiverfuncs.c
+++ b/src/backend/replication/walreceiverfuncs.c
@@ -28,6 +28,7 @@
 #include "replication/walreceiver.h"
 #include "storage/pmsignal.h"
 #include "storage/shmem.h"
+#include "utils/timestamp.h"
 
 WalRcvData *WalRcv = NULL;
 
@@ -238,3 +239,78 @@ GetWalRcvWriteRecPtr(XLogRecPtr *latestChunkStart)
 
 	return recptr;
 }
+
+/*
+ * Returns the replication apply delay in ms: how long the startup process
+ * has been replaying the current chunk of WAL, or 0 if it has replayed all
+ * WAL received so far.  Returns -1 if the delay is not known yet because
+ * the startup process has not started on a chunk since recovery began.
+ */
+int
+GetReplicationApplyDelay(void)
+{
+	/* use volatile pointer to prevent code rearrangement */
+	volatile WalRcvData *walrcv = WalRcv;
+
+	XLogRecPtr	receivePtr;
+	XLogRecPtr	replayPtr;
+	TimestampTz chunkReplayStartTime;
+
+	long	secs;
+	int		usecs;
+
+	SpinLockAcquire(&walrcv->mutex);
+	receivePtr = walrcv->receivedUpto;
+	SpinLockRelease(&walrcv->mutex);
+
+	replayPtr = GetXLogReplayRecPtr(NULL);
+
+	if (XLByteLE(receivePtr, replayPtr))
+		return 0;
+
+	/*
+	 * The chunk start time is 0 until the startup process begins its first
+	 * chunk; measuring from 0 would report a huge, int-overflowing delay.
+	 */
+	chunkReplayStartTime = GetCurrentChunkReplayStartTime();
+	if (chunkReplayStartTime == 0)
+		return -1;
+
+	TimestampDifference(chunkReplayStartTime,
+						GetCurrentTimestamp(),
+						&secs, &usecs);
+
+	/* Multiply in 64 bits; (int) secs * 1000 would overflow int */
+	return (int) ((int64) secs * 1000 + usecs / 1000);
+}
+
+/*
+ * Returns the transfer latency of the last message from the primary in ms.
+ * Includes any clock skew between the servers (negative skew reads as 0).
+ */
+int
+GetReplicationTransferLatency(void)
+{
+	/* use volatile pointer to prevent code rearrangement */
+	volatile WalRcvData *walrcv = WalRcv;
+
+	TimestampTz lastMsgSendTime;
+	TimestampTz lastMsgReceiptTime;
+
+	long	secs = 0;
+	int		usecs = 0;
+	int		ms;
+
+	SpinLockAcquire(&walrcv->mutex);
+	lastMsgSendTime = walrcv->lastMsgSendTime;
+	lastMsgReceiptTime = walrcv->lastMsgReceiptTime;
+	SpinLockRelease(&walrcv->mutex);
+
+	TimestampDifference(lastMsgSendTime,
+						lastMsgReceiptTime,
+						&secs, &usecs);
+
+	ms = (int) ((int64) secs * 1000 + usecs / 1000);
+
+	return ms;
+}
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index ea86520..ed7298b 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -131,6 +131,7 @@ static void ProcessStandbyMessage(void);
 static void ProcessStandbyReplyMessage(void);
 static void ProcessStandbyHSFeedbackMessage(void);
 static void ProcessRepliesIfAny(void);
+static void WalSndKeepalive(char *msgbuf);
 
 
 /* Main entry point for walsender process */
@@ -823,30 +824,24 @@ WalSndLoop(void)
 		 */
 		if (caughtup || pq_is_send_pending())
 		{
-			TimestampTz finish_time = 0;
-			long		sleeptime = -1;
+			TimestampTz timeout = 0;
+			long		sleeptime = 10000; /* 10 s */
 			int			wakeEvents;
 
 			wakeEvents = WL_LATCH_SET | WL_POSTMASTER_DEATH |
-				WL_SOCKET_READABLE;
+				WL_SOCKET_READABLE | WL_TIMEOUT;
+
 			if (pq_is_send_pending())
 				wakeEvents |= WL_SOCKET_WRITEABLE;
+			else
+				WalSndKeepalive(output_message);
 
 			/* Determine time until replication timeout */
 			if (replication_timeout > 0)
 			{
-				long		secs;
-				int			usecs;
-
-				finish_time = TimestampTzPlusMilliseconds(last_reply_timestamp,
+				timeout = TimestampTzPlusMilliseconds(last_reply_timestamp,
 														  replication_timeout);
-				TimestampDifference(GetCurrentTimestamp(),
-									finish_time, &secs, &usecs);
-				sleeptime = secs * 1000 + usecs / 1000;
-				/* Avoid Assert in WaitLatchOrSocket if timeout is past */
-				if (sleeptime < 0)
-					sleeptime = 0;
-				wakeEvents |= WL_TIMEOUT;
+				sleeptime = 1 + (replication_timeout / 10);
 			}
 
 			/* Sleep until something happens or replication timeout */
@@ -859,7 +854,7 @@ WalSndLoop(void)
 			 * timeout ... he's supposed to reply *before* that.
 			 */
 			if (replication_timeout > 0 &&
-				GetCurrentTimestamp() >= finish_time)
+				GetCurrentTimestamp() >= timeout)
 			{
 				/*
 				 * Since typically expiration of replication timeout means
@@ -1627,6 +1622,23 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
 	return (Datum) 0;
 }
 
+static void
+WalSndKeepalive(char *msgbuf)
+{
+	PrimaryKeepaliveMessage keepalive_message;
+
+	/* Build the message from sentPtr and the current time */
+	keepalive_message.walEnd = sentPtr;
+	keepalive_message.sendTime = GetCurrentTimestamp();
+
+	elog(DEBUG2, "sending replication keepalive");
+
+	/* Prefix type byte 'k' and queue it; this function does not flush */
+	msgbuf[0] = 'k';
+	memcpy(msgbuf + 1, &keepalive_message, sizeof(PrimaryKeepaliveMessage));
+	pq_putmessage_noblock('d', msgbuf, sizeof(PrimaryKeepaliveMessage) + 1);
+}
+
 /*
  * This isn't currently used for anything. Monitoring tools might be
  * interested in the future, and we'll need something like this in the
diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h
index 1b31414..5a21017 100644
--- a/src/include/access/xlog.h
+++ b/src/include/access/xlog.h
@@ -301,6 +301,7 @@ extern XLogRecPtr GetXLogWriteRecPtr(void);
 extern bool RecoveryIsPaused(void);
 extern void SetRecoveryPause(bool recoveryPause);
 extern TimestampTz GetLatestXTime(void);
+extern TimestampTz GetCurrentChunkReplayStartTime(void);
 
 extern void UpdateControlFile(void);
 extern uint64 GetSystemIdentifier(void);
diff --git a/src/include/replication/walprotocol.h b/src/include/replication/walprotocol.h
index 656c8fc..053376d 100644
--- a/src/include/replication/walprotocol.h
+++ b/src/include/replication/walprotocol.h
@@ -17,6 +17,20 @@
 
 
 /*
+ * Fields that every message from the walsender must carry, so that the
+ * standby can calculate replication delay and transfer latency.
+ */
+typedef struct
+{
+	/* Current end of WAL on the sender */
+	XLogRecPtr	walEnd;
+
+	/* Sender's system clock at the time of transmission */
+	TimestampTz sendTime;
+} WalSndrMessage;
+
+
+/*
  * Header for a WAL data message (message type 'w').  This is wrapped within
  * a CopyData message at the FE/BE protocol level.
  *
@@ -40,6 +54,14 @@ typedef struct
 } WalDataMessageHeader;
 
 /*
+ * Keepalive message from primary (message type 'k'). (lowercase k)
+ * This is wrapped within a CopyData message at the FE/BE protocol level.
+ *
+ * Note that the data length is not specified here.
+ */
+typedef WalSndrMessage	PrimaryKeepaliveMessage;
+
+/*
  * Reply message from standby (message type 'r').  This is wrapped within
  * a CopyData message at the FE/BE protocol level.
  *
diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h
index 77f5252..926730c 100644
--- a/src/include/replication/walreceiver.h
+++ b/src/include/replication/walreceiver.h
@@ -79,6 +79,12 @@ typedef struct
 	XLogRecPtr	latestChunkStart;
 
 	/*
+	 * Time of send and receive of any message received.
+	 */
+	TimestampTz lastMsgSendTime;
+	TimestampTz lastMsgReceiptTime;
+
+	/*
 	 * connection string; is used for walreceiver to connect with the primary.
 	 */
 	char		conninfo[MAXCONNINFO];
@@ -112,5 +118,7 @@ extern void ShutdownWalRcv(void);
 extern bool WalRcvInProgress(void);
 extern void RequestXLogStreaming(XLogRecPtr recptr, const char *conninfo);
 extern XLogRecPtr GetWalRcvWriteRecPtr(XLogRecPtr *latestChunkStart);
+extern int GetReplicationApplyDelay(void);
+extern int GetReplicationTransferLatency(void);
 
 #endif   /* _WALRECEIVER_H */
-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to