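This patch adds a keepalive message so that max_standby_delay is useful:
the keepalive carries the primary's current timestamp, which the standby
stores as its latest xlog time.

No WAL format changes and no libpq changes -- just one additional message
type in the streaming replication protocol, sent once per iteration of the
walsender main loop (only when wal_level is hot_standby). Docs included.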

Comments?

-- 
 Simon Riggs           www.2ndQuadrant.com
diff --git a/doc/src/sgml/protocol.sgml b/doc/src/sgml/protocol.sgml
index c63d003..391d990 100644
--- a/doc/src/sgml/protocol.sgml
+++ b/doc/src/sgml/protocol.sgml
@@ -4232,16 +4232,52 @@ The commands accepted in walsender mode are:
       </varlistentry>
       </variablelist>
       </para>
-      </listitem>
-      </varlistentry>
-      </variablelist>
-     </para>
-     <para>
+      <para>
        A single WAL record is never split across two CopyData messages. When
        a WAL record crosses a WAL page boundary, however, and is therefore
        already split using continuation records, it can be split at the page
        boundary. In other words, the first main WAL record and its
        continuation records can be split across different CopyData messages.
+      </para>
+      </listitem>
+      </varlistentry>
+      <varlistentry>
+      <term>
+          Keepalive (B)
+      </term>
+      <listitem>
+      <para>
+      <variablelist>
+      <varlistentry>
+      <term>
+          Byte1('k')
+      </term>
+      <listitem>
+      <para>
+          Identifies the message as a keepalive.
+      </para>
+      </listitem>
+      </varlistentry>
+      <varlistentry>
+      <term>
+          TimestampTz
+      </term>
+      <listitem>
+      <para>
+          The current timestamp on the primary server when the keepalive was sent.
+      </para>
+      </listitem>
+      </varlistentry>
+      </variablelist>
+      </para>
+      <para>
+       When <varname>wal_level</> is <literal>hot_standby</>, a keepalive is
+       sent on every cycle of the walsender main loop, roughly once per
+       <varname>wal_sender_delay</>, immediately after any pending WAL data.
+      </para>
+      </listitem>
+      </varlistentry>
+      </variablelist>
      </para>
     </listitem>
   </varlistentry>
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 607d57e..ee383af 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -5566,6 +5566,28 @@ GetLatestXLogTime(void)
 	return recoveryLastXTime;
 }
 
+/*
+ * SetLatestXLogTime
+ *
+ * Overwrite the shared recoveryLastXTime with newLatestXLogTime while
+ * holding info_lck.  The walreceiver calls this with the timestamp carried
+ * in each keepalive message from the primary.
+ *
+ * NOTE(review): the store is unconditional, so the value may move backwards
+ * if a caller passes an older time, and it does not imply that WAL up to that
+ * time has been replayed; confirm this is acceptable for max_standby_delay.
+ */
+void
+SetLatestXLogTime(TimestampTz newLatestXLogTime)
+{
+	/* use volatile pointer to prevent code rearrangement */
+	volatile XLogCtlData *xlogctl = XLogCtl;
+
+	SpinLockAcquire(&xlogctl->info_lck);
+	xlogctl->recoveryLastXTime = newLatestXLogTime;
+	SpinLockRelease(&xlogctl->info_lck);
+}
+
 /*
  * Note that text field supplied is a parameter name and does not require
  * translation
diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c
index bb87a06..8d52c3f 100644
--- a/src/backend/replication/walreceiver.c
+++ b/src/backend/replication/walreceiver.c
@@ -407,6 +407,28 @@ XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len)
 				XLogWalRcvWrite(buf, len, recptr);
 				break;
 			}
+		case 'k':				/* keepalive */
+			{
+				TimestampTz keepalive;
+
+				/* the payload must be exactly one TimestampTz */
+				if (len != sizeof(TimestampTz))
+					ereport(ERROR,
+							(errcode(ERRCODE_PROTOCOL_VIOLATION),
+							 errmsg_internal("invalid keepalive message received from primary")));
+
+				memcpy(&keepalive, buf, sizeof(TimestampTz));
+
+				/*
+				 * Record the primary's send time as the latest xlog time.
+				 * NOTE(review): this is done as soon as the keepalive is
+				 * received, regardless of how far WAL replay has progressed,
+				 * so the stored time can run ahead of replay; confirm that is
+				 * what max_standby_delay should see.
+				 */
+				SetLatestXLogTime(keepalive);
+				break;
+			}
 		default:
 			ereport(ERROR,
 					(errcode(ERRCODE_PROTOCOL_VIOLATION),
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 1c04fc3..f2f8750 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -98,6 +98,7 @@ static void WalSndQuickDieHandler(SIGNAL_ARGS);
 static int	WalSndLoop(void);
 static void InitWalSnd(void);
 static void WalSndHandshake(void);
+static bool WalSndKeepAlive(void);
 static void WalSndKill(int code, Datum arg);
 static void XLogRead(char *buf, XLogRecPtr recptr, Size nbytes);
 static bool XLogSend(StringInfo outMsg);
@@ -314,6 +315,45 @@ WalSndHandshake(void)
 	}
 }
 
+/*
+ * WalSndKeepAlive
+ *
+ * Send a keepalive ('k') CopyData message to the client, carrying the
+ * current timestamp in native TimestampTz representation.  Does nothing
+ * unless XLogStandbyInfoActive().
+ *
+ * Returns false if flushing the connection fails, true otherwise.
+ */
+static bool
+WalSndKeepAlive(void)
+{
+	StringInfoData outMsg;
+	TimestampTz ts;
+	bool		ok;
+
+	if (!XLogStandbyInfoActive())
+		return true;
+
+	initStringInfo(&outMsg);
+	ts = GetCurrentTimestamp();
+
+	/* format the keepalive message */
+	pq_sendbyte(&outMsg, 'k');
+	pq_sendbytes(&outMsg, (char *) &ts, sizeof(TimestampTz));
+
+	/* send the CopyData message */
+	pq_putmessage('d', outMsg.data, outMsg.len);
+	ok = (pq_flush() == 0);
+
+	/*
+	 * Called on every iteration of the main loop, so free the buffer that
+	 * initStringInfo() allocated instead of letting it accumulate.
+	 */
+	pfree(outMsg.data);
+
+	return ok;
+}
+
 /*
  * Check if the remote end has closed the connection.
  */
@@ -428,6 +468,9 @@ WalSndLoop(void)
 		/* Attempt to send the log once every loop */
 		if (!XLogSend(&output_message))
 			goto eof;
+
+		if (!WalSndKeepAlive())
+			goto eof;
 	}
 
 	/* can't get here because the above loop never exits */
diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h
index 8ff68a6..2d01670 100644
--- a/src/include/access/xlog.h
+++ b/src/include/access/xlog.h
@@ -284,6 +284,7 @@ extern void issue_xlog_fsync(int fd, uint32 log, uint32 seg);
 extern bool RecoveryInProgress(void);
 extern bool XLogInsertAllowed(void);
 extern TimestampTz GetLatestXLogTime(void);
+extern void SetLatestXLogTime(TimestampTz newLatestXLogTime);
 
 extern void UpdateControlFile(void);
 extern uint64 GetSystemIdentifier(void);
-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to