diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml
index 3b614b6..36e00e0 100644
--- a/doc/src/sgml/config.sgml
+++ b/doc/src/sgml/config.sgml
@@ -3282,6 +3282,26 @@ include_dir 'conf.d'
       </listitem>
      </varlistentry>
 
+     <varlistentry id="guc-relay-lag-sample-interval" xreflabel="replay_lag_sample_interval">
+      <term><varname>replay_lag_sample_interval</varname> (<type>integer</type>)
+      <indexterm>
+       <primary><varname>replay_lag_sample_interval</> configuration parameter</primary>
+      </indexterm>
+      </term>
+      <listitem>
+       <para>
+        Controls how often a standby should sample replay lag information to
+        send back to the primary or upstream standby while replaying WAL.  The
+        default is 1 second.  Units are milliseconds if not specified.  A
+        value of -1 disables the reporting of replay lag.  Estimated replay lag
+        can be seen in the <link linkend="monitoring-stats-views-table">
+        <literal>pg_stat_replication</></link> view of the upstream server.
+        This parameter can only be set
+        in the <filename>postgresql.conf</> file or on the server command line.
+       </para>
+      </listitem>
+     </varlistentry>
+
      <varlistentry id="guc-hot-standby-feedback" xreflabel="hot_standby_feedback">
       <term><varname>hot_standby_feedback</varname> (<type>boolean</type>)
       <indexterm>
diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml
index 5b58d2e..79d2d95 100644
--- a/doc/src/sgml/monitoring.sgml
+++ b/doc/src/sgml/monitoring.sgml
@@ -1401,6 +1401,12 @@ SELECT pid, wait_event_type, wait_event FROM pg_stat_activity WHERE wait_event i
       standby server</entry>
     </row>
     <row>
+     <entry><structfield>replay_lag</></entry>
+     <entry><type>interval</></entry>
+     <entry>Estimated time taken for recent WAL records to be replayed on this
+      standby server</entry>
+    </row>
+    <row>
      <entry><structfield>sync_priority</></entry>
      <entry><type>integer</></entry>
      <entry>Priority of this standby server for being chosen as the
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index aa9ee5a..739e45e 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -82,6 +82,8 @@ extern uint32 bootstrap_data_checksum_version;
 #define PROMOTE_SIGNAL_FILE		"promote"
 #define FALLBACK_PROMOTE_SIGNAL_FILE "fallback_promote"
 
+/* Size of the circular buffer of timestamped LSNs. */
+#define XLOG_TIMESTAMP_BUFFER_SIZE 8192
 
 /* User-settable parameters */
 int			max_wal_size = 64;	/* 1 GB */
@@ -520,6 +522,26 @@ typedef struct XLogCtlInsert
 } XLogCtlInsert;
 
 /*
+ * A sample associating a timestamp with a given xlog position.
+ */
+typedef struct XLogTimestamp
+{
+	TimestampTz	timestamp;
+	XLogRecPtr	lsn;
+} XLogTimestamp;
+
+/*
+ * A circular buffer of LSNs and associated timestamps.  The buffer is empty
+ * when read_head == write_head.
+ */
+typedef struct XLogTimestampBuffer
+{
+	uint32			read_head;
+	uint32			write_head;
+	XLogTimestamp	buffer[XLOG_TIMESTAMP_BUFFER_SIZE];
+} XLogTimestampBuffer;
+
+/*
  * Total shared-memory state for XLOG.
  */
 typedef struct XLogCtlData
@@ -637,6 +659,12 @@ typedef struct XLogCtlData
 	/* timestamp of last COMMIT/ABORT record replayed (or being replayed) */
 	TimestampTz recoveryLastXTime;
 
+	/* timestamp from the most recently applied record associated with a timestamp. */
+	TimestampTz lastReplayedTimestamp;
+
+	/* a buffer of upstream timestamps for WAL that is not yet applied. */
+	XLogTimestampBuffer timestamps;
+
 	/*
 	 * timestamp of when we started replaying the current chunk of WAL data,
 	 * only relevant for replication or archive recovery
@@ -5976,6 +6004,44 @@ CheckRequiredParameterValues(void)
 }
 
 /*
+ * Called by the startup process after it has replayed up to 'lsn'.  Checks
+ * for timestamps associated with WAL positions that have now been replayed.
+ * If any are found, the latest such timestamp found is written to
+ * '*timestamp'.  Returns the new buffer read head position, which the caller
+ * should write into XLogCtl->timestamps.read_head while holding info_lck.
+ */
+static uint32
+CheckForReplayedTimestamps(XLogRecPtr lsn, TimestampTz *timestamp)
+{
+	uint32 read_head;
+
+	/*
+	 * It's OK to access timestamps.read_head without any kind synchronization
+	 * because this process is the only one to write to it.
+	 */
+	Assert(AmStartupProcess());
+	read_head = XLogCtl->timestamps.read_head;
+
+	/*
+	 * It's OK to access write_head without interlocking because it's an
+	 * aligned 32 bit value which we can read atomically on all supported
+	 * platforms to get some recent value, not a torn/garbage value.
+	 * Furthermore we must see a value that is at least as recent as any WAL
+	 * that we have replayed, because walreceiver calls
+	 * SetXLogReplayTimestampAtLsn before passing the corresponding WAL data
+	 * to the recovery process.
+	 */
+	while (read_head != XLogCtl->timestamps.write_head &&
+		   XLogCtl->timestamps.buffer[read_head].lsn <= lsn)
+	{
+		*timestamp = XLogCtl->timestamps.buffer[read_head].timestamp;
+		read_head = (read_head + 1) % XLOG_TIMESTAMP_BUFFER_SIZE;
+	}
+
+	return read_head;
+}
+
+/*
  * This must be called ONCE during postmaster or standalone-backend startup
  */
 void
@@ -6794,6 +6860,8 @@ StartupXLOG(void)
 			do
 			{
 				bool		switchedTLI = false;
+				TimestampTz	replayed_timestamp = 0;
+				uint32		timestamp_read_head;
 
 #ifdef WAL_DEBUG
 				if (XLOG_DEBUG ||
@@ -6947,24 +7015,34 @@ StartupXLOG(void)
 				/* Pop the error context stack */
 				error_context_stack = errcallback.previous;
 
+				/* Check if we have replayed a timestamped WAL position */
+				timestamp_read_head =
+					CheckForReplayedTimestamps(EndRecPtr, &replayed_timestamp);
+
 				/*
-				 * Update lastReplayedEndRecPtr after this record has been
-				 * successfully replayed.
+				 * Update lastReplayedEndRecPtr and lastReplayedTimestamp
+				 * after this record has been successfully replayed.
 				 */
 				SpinLockAcquire(&XLogCtl->info_lck);
 				XLogCtl->lastReplayedEndRecPtr = EndRecPtr;
 				XLogCtl->lastReplayedTLI = ThisTimeLineID;
+				XLogCtl->timestamps.read_head = timestamp_read_head;
+				if (replayed_timestamp != 0)
+					XLogCtl->lastReplayedTimestamp = replayed_timestamp;
 				SpinLockRelease(&XLogCtl->info_lck);
 
 				/*
 				 * If rm_redo called XLogRequestWalReceiverReply, then we wake
 				 * up the receiver so that it notices the updated
-				 * lastReplayedEndRecPtr and sends a reply to the master.
+				 * lastReplayedEndRecPtr and sends a reply to the master.  We
+				 * also wake it if we have replayed a WAL position that has
+				 * an associated timestamp so that the upstream server can
+				 * measure our replay lag.
 				 */
-				if (doRequestWalReceiverReply)
+				if (doRequestWalReceiverReply || replayed_timestamp != 0)
 				{
 					doRequestWalReceiverReply = false;
-					WalRcvForceReply();
+					WalRcvForceReply(replayed_timestamp != 0);
 				}
 
 				/* Remember this record as the last-applied one */
@@ -11745,3 +11823,81 @@ XLogRequestWalReceiverReply(void)
 {
 	doRequestWalReceiverReply = true;
 }
+
+/*
+ * Record the timestamp that is associated with a WAL position.
+ *
+ * This is called by walreceiver on standby servers when new messages arrive,
+ * using a timestamp and the latest known WAL position from the upstream
+ * server.  The timestamp will be sent back to the upstream server via
+ * walreceiver when the recovery process has applied the WAL position.  The
+ * upstream server can then compute the elapsed time to estimate the replay
+ * lag.
+ */
+void
+SetXLogReplayTimestampAtLsn(TimestampTz timestamp, XLogRecPtr lsn)
+{
+	Assert(AmWalReceiverProcess());
+
+	SpinLockAcquire(&XLogCtl->info_lck);
+	if (lsn == XLogCtl->lastReplayedEndRecPtr)
+	{
+		/*
+		 * That is the last replayed LSN: we are fully replayed, so we can
+		 * update the replay timestamp immediately.
+		 */
+		XLogCtl->lastReplayedTimestamp = timestamp;
+	}
+	else
+	{
+		/*
+		 * There is WAL still to be applied.  We will associate the timestamp
+		 * with this WAL position and wait for it to be replayed.  We add it
+		 * at the 'write' end of the circular buffer of LSN/timestamp
+		 * mappings, which the replay loop will eventually read.
+		 */
+		uint32 write_head = XLogCtl->timestamps.write_head;
+		uint32 new_write_head = (write_head + 1) % XLOG_TIMESTAMP_BUFFER_SIZE;
+
+		if (new_write_head == XLogCtl->timestamps.read_head)
+		{
+			/*
+			 * The buffer is full, so we'll rewind and overwrite the most
+			 * recent sample.  Overwriting the most recent sample means that
+			 * if we're not replaying fast enough and the buffer fills up,
+			 * we'll effectively lower the sampling rate.
+			 */
+			new_write_head = write_head;
+			write_head = (write_head - 1) % XLOG_TIMESTAMP_BUFFER_SIZE;
+		}
+
+		XLogCtl->timestamps.buffer[write_head].lsn = lsn;
+		XLogCtl->timestamps.buffer[write_head].timestamp = timestamp;
+		XLogCtl->timestamps.write_head = new_write_head;
+	}
+	SpinLockRelease(&XLogCtl->info_lck);
+}
+
+/*
+ * Get the timestamp for the most recently applied WAL record that carried a
+ * timestamp from the upstream server, and also the most recently applied LSN.
+ * (Note that the timestamp and the LSN don't necessarily relate to the same
+ * record.)
+ *
+ * This is similar to GetLatestXTime, except that it is advanced when WAL
+ * positions recorded with SetXLogReplayTimestampAtLsn have been applied,
+ * rather than commit records.
+ */
+TimestampTz
+GetXLogReplayTimestamp(XLogRecPtr *lsn)
+{
+	TimestampTz result;
+
+	SpinLockAcquire(&XLogCtl->info_lck);
+	if (lsn)
+		*lsn = XLogCtl->lastReplayedEndRecPtr;
+	result = XLogCtl->lastReplayedTimestamp;
+	SpinLockRelease(&XLogCtl->info_lck);
+
+	return result;
+}
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index 48e7c4b..e0e45fa 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -685,6 +685,7 @@ CREATE VIEW pg_stat_replication AS
             W.write_location,
             W.flush_location,
             W.replay_location,
+            W.replay_lag,
             W.sync_priority,
             W.sync_state
     FROM pg_stat_get_activity(NULL) AS S
diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c
index cc3cf7d..9cf9f4c 100644
--- a/src/backend/replication/walreceiver.c
+++ b/src/backend/replication/walreceiver.c
@@ -73,6 +73,7 @@
 int			wal_receiver_status_interval;
 int			wal_receiver_timeout;
 bool		hot_standby_feedback;
+int			replay_lag_sample_interval;
 
 /* libpqwalreceiver connection */
 static WalReceiverConn *wrconn = NULL;
@@ -138,7 +139,7 @@ static void WalRcvDie(int code, Datum arg);
 static void XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len);
 static void XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr);
 static void XLogWalRcvFlush(bool dying);
-static void XLogWalRcvSendReply(bool force, bool requestReply);
+static void XLogWalRcvSendReply(bool force, bool requestReply, bool includeApplyTimestamp);
 static void XLogWalRcvSendHSFeedback(bool immed);
 static void ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime);
 
@@ -456,7 +457,7 @@ WalReceiverMain(void)
 					}
 
 					/* Let the master know that we received some data. */
-					XLogWalRcvSendReply(false, false);
+					XLogWalRcvSendReply(false, false, false);
 
 					/*
 					 * If we've written some records, flush them to disk and
@@ -493,6 +494,8 @@ WalReceiverMain(void)
 					ResetLatch(walrcv->latch);
 					if (walrcv->force_reply)
 					{
+						bool timestamp = walrcv->force_reply_apply_timestamp;
+
 						/*
 						 * The recovery process has asked us to send apply
 						 * feedback now.  Make sure the flag is really set to
@@ -500,8 +503,9 @@ WalReceiverMain(void)
 						 * we don't miss a new request for a reply.
 						 */
 						walrcv->force_reply = false;
+						walrcv->force_reply_apply_timestamp = false;
 						pg_memory_barrier();
-						XLogWalRcvSendReply(true, false);
+						XLogWalRcvSendReply(true, false, timestamp);
 					}
 				}
 				if (rc & WL_POSTMASTER_DEATH)
@@ -559,7 +563,7 @@ WalReceiverMain(void)
 						}
 					}
 
-					XLogWalRcvSendReply(requestReply, requestReply);
+					XLogWalRcvSendReply(requestReply, requestReply, false);
 					XLogWalRcvSendHSFeedback(false);
 				}
 			}
@@ -911,7 +915,7 @@ XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len)
 
 				/* If the primary requested a reply, send one immediately */
 				if (replyRequested)
-					XLogWalRcvSendReply(true, false);
+					XLogWalRcvSendReply(true, false, false);
 				break;
 			}
 		default:
@@ -1074,7 +1078,7 @@ XLogWalRcvFlush(bool dying)
 		/* Also let the master know that we made some progress */
 		if (!dying)
 		{
-			XLogWalRcvSendReply(false, false);
+			XLogWalRcvSendReply(false, false, false);
 			XLogWalRcvSendHSFeedback(false);
 		}
 	}
@@ -1092,15 +1096,18 @@ XLogWalRcvFlush(bool dying)
  * If 'requestReply' is true, requests the server to reply immediately upon
  * receiving this message. This is used for heartbearts, when approaching
  * wal_receiver_timeout.
+ *
+ * If 'reportApplyTimestamp' is true, the latest apply timestamp is included.
  */
 static void
-XLogWalRcvSendReply(bool force, bool requestReply)
+XLogWalRcvSendReply(bool force, bool requestReply, bool reportApplyTimestamp)
 {
 	static XLogRecPtr writePtr = 0;
 	static XLogRecPtr flushPtr = 0;
 	XLogRecPtr	applyPtr;
 	static TimestampTz sendTime = 0;
 	TimestampTz now;
+	TimestampTz applyTimestamp = 0;
 
 	/*
 	 * If the user doesn't want status to be reported to the master, be sure
@@ -1132,7 +1139,35 @@ XLogWalRcvSendReply(bool force, bool requestReply)
 	/* Construct a new message */
 	writePtr = LogstreamResult.Write;
 	flushPtr = LogstreamResult.Flush;
-	applyPtr = GetXLogReplayRecPtr(NULL);
+	applyTimestamp = GetXLogReplayTimestamp(&applyPtr);
+
+	/* Decide whether to send an apply timestamp for replay lag estimation. */
+	if (replay_lag_sample_interval != -1)
+	{
+		static TimestampTz lastTimestampSendTime = 0;
+
+		/*
+		 * Only send an apply timestamp if we were explicitly asked to by the
+		 * recovery process or if replay lag sampling is active but the
+		 * recovery process seems to be stuck.
+		 *
+		 * If we haven't heard from the recovery process in a time exceeding
+		 * wal_receiver_status_interval and yet it has not applied the highest
+		 * LSN we've heard about, then we want to resend the last replayed
+		 * timestamp we have; otherwise we zero it out and wait for the
+		 * recovery process to wake us when it has set a new accurate replay
+		 * timestamp.  Note that we can read latestWalEnd without acquiring the
+		 * mutex that protects it because it is only written to by this
+		 * process (walreceiver).
+		 */
+		if (reportApplyTimestamp ||
+			(WalRcv->latestWalEnd > applyPtr &&
+			 TimestampDifferenceExceeds(lastTimestampSendTime, now,
+										wal_receiver_status_interval * 1000)))
+			lastTimestampSendTime = now;
+		else
+			applyTimestamp = 0;
+	}
 
 	resetStringInfo(&reply_message);
 	pq_sendbyte(&reply_message, 'r');
@@ -1140,6 +1175,7 @@ XLogWalRcvSendReply(bool force, bool requestReply)
 	pq_sendint64(&reply_message, flushPtr);
 	pq_sendint64(&reply_message, applyPtr);
 	pq_sendint64(&reply_message, GetCurrentIntegerTimestamp());
+	pq_sendint64(&reply_message, TimestampTzToIntegerTimestamp(applyTimestamp));
 	pq_sendbyte(&reply_message, requestReply ? 1 : 0);
 
 	/* Send it */
@@ -1244,18 +1280,40 @@ static void
 ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime)
 {
 	WalRcvData *walrcv = WalRcv;
-
 	TimestampTz lastMsgReceiptTime = GetCurrentTimestamp();
+	bool newHighWalEnd = false;
+
+	static TimestampTz lastRecordedTimestamp = 0;
 
 	/* Update shared-memory status */
 	SpinLockAcquire(&walrcv->mutex);
 	if (walrcv->latestWalEnd < walEnd)
+	{
 		walrcv->latestWalEndTime = sendTime;
+		newHighWalEnd = true;
+	}
 	walrcv->latestWalEnd = walEnd;
 	walrcv->lastMsgSendTime = sendTime;
 	walrcv->lastMsgReceiptTime = lastMsgReceiptTime;
 	SpinLockRelease(&walrcv->mutex);
 
+	/*
+	 * If replay lag sampling is active, remember the upstream server's
+	 * timestamp at the latest WAL end that it has, unless we've already
+	 * done that too recently or the LSN hasn't advanced.  This timestamp
+	 * will be fed back to us by the startup process when it eventually
+	 * replays this LSN, so that we can feed it back to the upstream server
+	 * for replay lag tracking purposes.
+	 */
+	if (replay_lag_sample_interval != -1 &&
+		newHighWalEnd &&
+		sendTime > TimestampTzPlusMilliseconds(lastRecordedTimestamp,
+											   replay_lag_sample_interval))
+	{
+		SetXLogReplayTimestampAtLsn(sendTime, walEnd);
+		lastRecordedTimestamp = sendTime;
+	}
+
 	if (log_min_messages <= DEBUG2)
 	{
 		char	   *sendtime;
@@ -1291,12 +1349,14 @@ ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime)
  * This is called by the startup process whenever interesting xlog records
  * are applied, so that walreceiver can check if it needs to send an apply
  * notification back to the master which may be waiting in a COMMIT with
- * synchronous_commit = remote_apply.
+ * synchronous_commit = remote_apply.  Also used to send periodic messages
+ * which are used to compute pg_stat_replication.replay_lag.
  */
 void
-WalRcvForceReply(void)
+WalRcvForceReply(bool apply_timestamp)
 {
 	WalRcv->force_reply = true;
+	WalRcv->force_reply_apply_timestamp = apply_timestamp;
 	if (WalRcv->latch)
 		SetLatch(WalRcv->latch);
 }
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index d80bcc0..0782d78 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -1553,15 +1553,29 @@ ProcessStandbyReplyMessage(void)
 	XLogRecPtr	writePtr,
 				flushPtr,
 				applyPtr;
+	int64		applyLagUs;
 	bool		replyRequested;
+	TimestampTz now = GetCurrentTimestamp();
+	TimestampTz applyTimestamp;
 
 	/* the caller already consumed the msgtype byte */
 	writePtr = pq_getmsgint64(&reply_message);
 	flushPtr = pq_getmsgint64(&reply_message);
 	applyPtr = pq_getmsgint64(&reply_message);
 	(void) pq_getmsgint64(&reply_message);		/* sendTime; not used ATM */
+	applyTimestamp = IntegerTimestampToTimestampTz(pq_getmsgint64(&reply_message));
 	replyRequested = pq_getmsgbyte(&reply_message);
 
+	/* Compute the apply lag in milliseconds. */
+	if (applyTimestamp == 0)
+		applyLagUs = -1;
+	else
+#ifdef HAVE_INT64_TIMESTAMP
+		applyLagUs = now - applyTimestamp;
+#else
+		applyLagUs = (now - applyTimestamp) * 1000000;
+#endif
+
 	elog(DEBUG2, "write %X/%X flush %X/%X apply %X/%X%s",
 		 (uint32) (writePtr >> 32), (uint32) writePtr,
 		 (uint32) (flushPtr >> 32), (uint32) flushPtr,
@@ -1583,6 +1597,8 @@ ProcessStandbyReplyMessage(void)
 		walsnd->write = writePtr;
 		walsnd->flush = flushPtr;
 		walsnd->apply = applyPtr;
+		if (applyLagUs >= 0)
+			walsnd->applyLagUs = applyLagUs;
 		SpinLockRelease(&walsnd->mutex);
 	}
 
@@ -1979,6 +1995,7 @@ InitWalSenderSlot(void)
 			walsnd->write = InvalidXLogRecPtr;
 			walsnd->flush = InvalidXLogRecPtr;
 			walsnd->apply = InvalidXLogRecPtr;
+			walsnd->applyLagUs = -1;
 			walsnd->state = WALSNDSTATE_STARTUP;
 			walsnd->latch = &MyProc->procLatch;
 			SpinLockRelease(&walsnd->mutex);
@@ -2761,7 +2778,7 @@ WalSndGetStateString(WalSndState state)
 Datum
 pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
 {
-#define PG_STAT_GET_WAL_SENDERS_COLS	8
+#define PG_STAT_GET_WAL_SENDERS_COLS	9
 	ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
 	TupleDesc	tupdesc;
 	Tuplestorestate *tupstore;
@@ -2809,6 +2826,7 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
 		XLogRecPtr	write;
 		XLogRecPtr	flush;
 		XLogRecPtr	apply;
+		int64		applyLagUs;
 		int			priority;
 		WalSndState state;
 		Datum		values[PG_STAT_GET_WAL_SENDERS_COLS];
@@ -2823,6 +2841,7 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
 		write = walsnd->write;
 		flush = walsnd->flush;
 		apply = walsnd->apply;
+		applyLagUs = walsnd->applyLagUs;
 		priority = walsnd->sync_standby_priority;
 		SpinLockRelease(&walsnd->mutex);
 
@@ -2857,6 +2876,23 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
 				nulls[5] = true;
 			values[5] = LSNGetDatum(apply);
 
+			if (applyLagUs < 0)
+				nulls[6] = true;
+			else
+			{
+				Interval *applyLagInterval = palloc(sizeof(Interval));
+
+				applyLagInterval->month = 0;
+				applyLagInterval->day = 0;
+#ifdef HAVE_INT64_TIMESTAMP
+				applyLagInterval->time = applyLagUs;
+#else
+				applyLagInterval->time = applyLagUs / 1000000.0;
+#endif
+				nulls[6] = false;
+				values[6] = IntervalPGetDatum(applyLagInterval);
+			}
+
 			/*
 			 * Treat a standby such as a pg_basebackup background process
 			 * which always returns an invalid flush location, as an
@@ -2864,18 +2900,18 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
 			 */
 			priority = XLogRecPtrIsInvalid(walsnd->flush) ? 0 : priority;
 
-			values[6] = Int32GetDatum(priority);
+			values[7] = Int32GetDatum(priority);
 
 			/*
 			 * More easily understood version of standby state. This is purely
 			 * informational, not different from priority.
 			 */
 			if (priority == 0)
-				values[7] = CStringGetTextDatum("async");
+				values[8] = CStringGetTextDatum("async");
 			else if (list_member_int(sync_standbys, i))
-				values[7] = CStringGetTextDatum("sync");
+				values[8] = CStringGetTextDatum("sync");
 			else
-				values[7] = CStringGetTextDatum("potential");
+				values[8] = CStringGetTextDatum("potential");
 		}
 
 		tuplestore_putvalues(tupstore, tupdesc, values, nulls);
diff --git a/src/backend/utils/adt/timestamp.c b/src/backend/utils/adt/timestamp.c
index c1d6f05..323d640 100644
--- a/src/backend/utils/adt/timestamp.c
+++ b/src/backend/utils/adt/timestamp.c
@@ -1724,6 +1724,20 @@ GetSQLLocalTimestamp(int32 typmod)
 }
 
 /*
+ * TimestampTzToIntegerTimestamp -- convert a native timestamp to int64 format
+ *
+ * When compiled with --enable-integer-datetimes, this is implemented as a
+ * no-op macro.
+ */
+#ifndef HAVE_INT64_TIMESTAMP
+int64
+TimestampTzToIntegerTimestamp(TimestampTz timestamp)
+{
+	return timestamp * 1000000;
+}
+#endif
+
+/*
  * TimestampDifference -- convert the difference between two timestamps
  *		into integer seconds and microseconds
  *
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index a025117..b1af028 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -1810,6 +1810,17 @@ static struct config_int ConfigureNamesInt[] =
 	},
 
 	{
+		{"replay_lag_sample_interval", PGC_SIGHUP, REPLICATION_STANDBY,
+			gettext_noop("Sets the minimum time between WAL timestamp samples used to estimate replay lag."),
+			NULL,
+			GUC_UNIT_MS
+		},
+		&replay_lag_sample_interval,
+		1 * 1000, -1, INT_MAX / 1000,
+		NULL, NULL, NULL
+	},
+
+	{
 		{"wal_receiver_timeout", PGC_SIGHUP, REPLICATION_STANDBY,
 			gettext_noop("Sets the maximum wait time to receive data from the primary."),
 			NULL,
diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample
index 7f9acfd..bf298f2 100644
--- a/src/backend/utils/misc/postgresql.conf.sample
+++ b/src/backend/utils/misc/postgresql.conf.sample
@@ -270,6 +270,8 @@
 					# in milliseconds; 0 disables
 #wal_retrieve_retry_interval = 5s	# time to wait before retrying to
 					# retrieve WAL after a failed attempt
+#replay_lag_sample_interval = 1s	# min time between timestamps recorded
+					# to estimate replay lag; -1 disables replay lag sampling
 
 
 #------------------------------------------------------------------------------
diff --git a/src/bin/pg_basebackup/pg_recvlogical.c b/src/bin/pg_basebackup/pg_recvlogical.c
index cb5f989..9753648 100644
--- a/src/bin/pg_basebackup/pg_recvlogical.c
+++ b/src/bin/pg_basebackup/pg_recvlogical.c
@@ -111,7 +111,7 @@ sendFeedback(PGconn *conn, int64 now, bool force, bool replyRequested)
 	static XLogRecPtr last_written_lsn = InvalidXLogRecPtr;
 	static XLogRecPtr last_fsync_lsn = InvalidXLogRecPtr;
 
-	char		replybuf[1 + 8 + 8 + 8 + 8 + 1];
+	char		replybuf[1 + 8 + 8 + 8 + 8 + 8 + 1];
 	int			len = 0;
 
 	/*
@@ -142,6 +142,8 @@ sendFeedback(PGconn *conn, int64 now, bool force, bool replyRequested)
 	len += 8;
 	fe_sendint64(now, &replybuf[len]);	/* sendTime */
 	len += 8;
+	fe_sendint64(0, &replybuf[len]);	/* applyTimestamp */
+	len += 8;
 	replybuf[len] = replyRequested ? 1 : 0;		/* replyRequested */
 	len += 1;
 
diff --git a/src/bin/pg_basebackup/receivelog.c b/src/bin/pg_basebackup/receivelog.c
index 4382e5d..8e89627 100644
--- a/src/bin/pg_basebackup/receivelog.c
+++ b/src/bin/pg_basebackup/receivelog.c
@@ -321,7 +321,7 @@ writeTimeLineHistoryFile(StreamCtl *stream, char *filename, char *content)
 static bool
 sendFeedback(PGconn *conn, XLogRecPtr blockpos, int64 now, bool replyRequested)
 {
-	char		replybuf[1 + 8 + 8 + 8 + 8 + 1];
+	char		replybuf[1 + 8 + 8 + 8 + 8 + 8 + 1];
 	int			len = 0;
 
 	replybuf[len] = 'r';
@@ -337,6 +337,8 @@ sendFeedback(PGconn *conn, XLogRecPtr blockpos, int64 now, bool replyRequested)
 	len += 8;
 	fe_sendint64(now, &replybuf[len]);	/* sendTime */
 	len += 8;
+	fe_sendint64(0, &replybuf[len]);	/* applyTimestamp */
+	len += 8;
 	replybuf[len] = replyRequested ? 1 : 0;		/* replyRequested */
 	len += 1;
 
diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h
index c9f332c..1be2f34 100644
--- a/src/include/access/xlog.h
+++ b/src/include/access/xlog.h
@@ -237,6 +237,9 @@ extern void GetXLogReceiptTime(TimestampTz *rtime, bool *fromStream);
 extern XLogRecPtr GetXLogReplayRecPtr(TimeLineID *replayTLI);
 extern XLogRecPtr GetXLogInsertRecPtr(void);
 extern XLogRecPtr GetXLogWriteRecPtr(void);
+extern void SetXLogReplayTimestamp(TimestampTz timestamp);
+extern void SetXLogReplayTimestampAtLsn(TimestampTz timestamp, XLogRecPtr lsn);
+extern TimestampTz GetXLogReplayTimestamp(XLogRecPtr *lsn);
 extern bool RecoveryIsPaused(void);
 extern void SetRecoveryPause(bool recoveryPause);
 extern TimestampTz GetLatestXTime(void);
diff --git a/src/include/catalog/pg_proc.h b/src/include/catalog/pg_proc.h
index cd7b909..b565dd8 100644
--- a/src/include/catalog/pg_proc.h
+++ b/src/include/catalog/pg_proc.h
@@ -2768,7 +2768,7 @@ DATA(insert OID = 2022 (  pg_stat_get_activity			PGNSP PGUID 12 1 100 0 0 f f f
 DESCR("statistics: information about currently active backends");
 DATA(insert OID = 3318 (  pg_stat_get_progress_info			  PGNSP PGUID 12 1 100 0 0 f f f f t t s r 1 0 2249 "25" "{25,23,26,26,20,20,20,20,20,20,20,20,20,20}" "{i,o,o,o,o,o,o,o,o,o,o,o,o,o}" "{cmdtype,pid,datid,relid,param1,param2,param3,param4,param5,param6,param7,param8,param9,param10}" _null_ _null_ pg_stat_get_progress_info _null_ _null_ _null_ ));
 DESCR("statistics: information about progress of backends running maintenance command");
-DATA(insert OID = 3099 (  pg_stat_get_wal_senders	PGNSP PGUID 12 1 10 0 0 f f f f f t s r 0 0 2249 "" "{23,25,3220,3220,3220,3220,23,25}" "{o,o,o,o,o,o,o,o}" "{pid,state,sent_location,write_location,flush_location,replay_location,sync_priority,sync_state}" _null_ _null_ pg_stat_get_wal_senders _null_ _null_ _null_ ));
+DATA(insert OID = 3099 (  pg_stat_get_wal_senders	PGNSP PGUID 12 1 10 0 0 f f f f f t s r 0 0 2249 "" "{23,25,3220,3220,3220,3220,1186,23,25}" "{o,o,o,o,o,o,o,o,o}" "{pid,state,sent_location,write_location,flush_location,replay_location,replay_lag,sync_priority,sync_state}" _null_ _null_ pg_stat_get_wal_senders _null_ _null_ _null_ ));
 DESCR("statistics: information about currently active replication");
 DATA(insert OID = 3317 (  pg_stat_get_wal_receiver	PGNSP PGUID 12 1 0 0 0 f f f f f f s r 0 0 2249 "" "{23,25,3220,23,3220,23,1184,1184,3220,1184,25,25}" "{o,o,o,o,o,o,o,o,o,o,o,o}" "{pid,status,receive_start_lsn,receive_start_tli,received_lsn,received_tli,last_msg_send_time,last_msg_receipt_time,latest_end_lsn,latest_end_time,slot_name,conninfo}" _null_ _null_ pg_stat_get_wal_receiver _null_ _null_ _null_ ));
 DESCR("statistics: information about WAL receiver");
diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h
index 28dc1fc..be25758 100644
--- a/src/include/replication/walreceiver.h
+++ b/src/include/replication/walreceiver.h
@@ -23,6 +23,7 @@
 extern int	wal_receiver_status_interval;
 extern int	wal_receiver_timeout;
 extern bool hot_standby_feedback;
+extern int	replay_lag_sample_interval;
 
 /*
  * MAXCONNINFO: maximum size of a connection string.
@@ -119,6 +120,9 @@ typedef struct
 	 */
 	bool		force_reply;
 
+	/* include the latest replayed timestamp when replying? */
+	bool		force_reply_apply_timestamp;
+
 	/* set true once conninfo is ready to display (obfuscated pwds etc) */
 	bool		ready_to_display;
 
@@ -208,6 +212,6 @@ extern void RequestXLogStreaming(TimeLineID tli, XLogRecPtr recptr,
 extern XLogRecPtr GetWalRcvWriteRecPtr(XLogRecPtr *latestChunkStart, TimeLineID *receiveTLI);
 extern int	GetReplicationApplyDelay(void);
 extern int	GetReplicationTransferLatency(void);
-extern void WalRcvForceReply(void);
+extern void WalRcvForceReply(bool sendApplyTimestamp);
 
 #endif   /* _WALRECEIVER_H */
diff --git a/src/include/replication/walsender_private.h b/src/include/replication/walsender_private.h
index 7794aa5..4de43e8 100644
--- a/src/include/replication/walsender_private.h
+++ b/src/include/replication/walsender_private.h
@@ -46,6 +46,7 @@ typedef struct WalSnd
 	XLogRecPtr	write;
 	XLogRecPtr	flush;
 	XLogRecPtr	apply;
+	int64		applyLagUs;
 
 	/* Protects shared variables shown above. */
 	slock_t		mutex;
diff --git a/src/include/utils/timestamp.h b/src/include/utils/timestamp.h
index 93b90fe..20517c9 100644
--- a/src/include/utils/timestamp.h
+++ b/src/include/utils/timestamp.h
@@ -233,9 +233,11 @@ extern bool TimestampDifferenceExceeds(TimestampTz start_time,
 #ifndef HAVE_INT64_TIMESTAMP
 extern int64 GetCurrentIntegerTimestamp(void);
 extern TimestampTz IntegerTimestampToTimestampTz(int64 timestamp);
+extern int64 TimestampTzToIntegerTimestamp(TimestampTz timestamp);
 #else
 #define GetCurrentIntegerTimestamp()	GetCurrentTimestamp()
 #define IntegerTimestampToTimestampTz(timestamp) (timestamp)
+#define TimestampTzToIntegerTimestamp(timestamp) (timestamp)
 #endif
 
 extern TimestampTz time_t_to_timestamptz(pg_time_t tm);
diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out
index 5314b9c..d59956f 100644
--- a/src/test/regress/expected/rules.out
+++ b/src/test/regress/expected/rules.out
@@ -1809,10 +1809,11 @@ pg_stat_replication| SELECT s.pid,
     w.write_location,
     w.flush_location,
     w.replay_location,
+    w.replay_lag,
     w.sync_priority,
     w.sync_state
    FROM ((pg_stat_get_activity(NULL::integer) s(datid, pid, usesysid, application_name, state, query, wait_event_type, wait_event, xact_start, query_start, backend_start, state_change, client_addr, client_hostname, client_port, backend_xid, backend_xmin, ssl, sslversion, sslcipher, sslbits, sslcompression, sslclientdn)
-     JOIN pg_stat_get_wal_senders() w(pid, state, sent_location, write_location, flush_location, replay_location, sync_priority, sync_state) ON ((s.pid = w.pid)))
+     JOIN pg_stat_get_wal_senders() w(pid, state, sent_location, write_location, flush_location, replay_location, replay_lag, sync_priority, sync_state) ON ((s.pid = w.pid)))
      LEFT JOIN pg_authid u ON ((s.usesysid = u.oid)));
 pg_stat_ssl| SELECT s.pid,
     s.ssl,
