From 62a8f97e7362f470ab38c75c36292384fd86d8df Mon Sep 17 00:00:00 2001
From: "MyungKyu, Lim" <myungkyu.lim@samsung.com>
Date: Tue, 31 Jul 2018 12:03:15 +0900
Subject: [PATCH] Implement following TODO list item

   : Add entry creation timestamp column to pg_stat_replication
       mailing list : http://archives.postgresql.org/pgsql-hackers/2011-08/msg00694.php
---
 src/backend/catalog/system_views.sql        |  3 ++-
 src/backend/replication/walsender.c         | 25 +++++++++++++++++++++----
 src/include/catalog/pg_proc.dat             |  6 +++---
 src/include/replication/walsender_private.h |  5 +++++
 src/test/regress/expected/rules.out         |  5 +++--
 5 files changed, 34 insertions(+), 10 deletions(-)

diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index 7251552..eef834b 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -734,7 +734,8 @@ CREATE VIEW pg_stat_replication AS
             W.flush_lag,
             W.replay_lag,
             W.sync_priority,
-            W.sync_state
+            W.sync_state,
+            W.reply_time
     FROM pg_stat_get_activity(NULL) AS S
         JOIN pg_stat_get_wal_senders() AS W ON (S.pid = W.pid)
         LEFT JOIN pg_authid AS U ON (S.usesysid = U.oid);
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index d60026d..e8a33a3 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -1772,6 +1772,8 @@ ProcessStandbyReplyMessage(void)
 				applyLag;
 	bool		clearLagTimes;
 	TimestampTz now;
+	TimestampTz	reply_timestamp;
+	char	   *replytime;
 
 	static bool fullyAppliedLastTime = false;
 
@@ -1779,14 +1781,20 @@ ProcessStandbyReplyMessage(void)
 	writePtr = pq_getmsgint64(&reply_message);
 	flushPtr = pq_getmsgint64(&reply_message);
 	applyPtr = pq_getmsgint64(&reply_message);
-	(void) pq_getmsgint64(&reply_message);	/* sendTime; not used ATM */
+	reply_timestamp = pq_getmsgint64(&reply_message);	/* sendTime */
 	replyRequested = pq_getmsgbyte(&reply_message);
 
-	elog(DEBUG2, "write %X/%X flush %X/%X apply %X/%X%s",
+	/* Copy because timestamptz_to_str returns a static buffer */
+	replytime = pstrdup(timestamptz_to_str(reply_timestamp));
+
+	elog(DEBUG2, "write %X/%X flush %X/%X apply %X/%X%s replytime %s",
 		 (uint32) (writePtr >> 32), (uint32) writePtr,
 		 (uint32) (flushPtr >> 32), (uint32) flushPtr,
 		 (uint32) (applyPtr >> 32), (uint32) applyPtr,
-		 replyRequested ? " (reply requested)" : "");
+		 replyRequested ? " (reply requested)" : "",
+		 replytime);
+
+	pfree(replytime);
 
 	/* See if we can compute the round-trip lag for these positions. */
 	now = GetCurrentTimestamp();
@@ -1833,6 +1841,7 @@ ProcessStandbyReplyMessage(void)
 			walsnd->flushLag = flushLag;
 		if (applyLag != -1 || clearLagTimes)
 			walsnd->applyLag = applyLag;
+		walsnd->reply_timestamp = reply_timestamp;
 		SpinLockRelease(&walsnd->mutex);
 	}
 
@@ -2273,6 +2282,7 @@ InitWalSenderSlot(void)
 			walsnd->applyLag = -1;
 			walsnd->state = WALSNDSTATE_STARTUP;
 			walsnd->latch = &MyProc->procLatch;
+			walsnd->reply_timestamp = 0;
 			SpinLockRelease(&walsnd->mutex);
 			/* don't need the lock anymore */
 			MyWalSnd = (WalSnd *) walsnd;
@@ -3191,7 +3201,7 @@ offset_to_interval(TimeOffset offset)
 Datum
 pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
 {
-#define PG_STAT_GET_WAL_SENDERS_COLS	11
+#define PG_STAT_GET_WAL_SENDERS_COLS	12
 	ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
 	TupleDesc	tupdesc;
 	Tuplestorestate *tupstore;
@@ -3245,6 +3255,7 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
 		int			priority;
 		int			pid;
 		WalSndState state;
+		TimestampTz	reply_timestamp;
 		Datum		values[PG_STAT_GET_WAL_SENDERS_COLS];
 		bool		nulls[PG_STAT_GET_WAL_SENDERS_COLS];
 
@@ -3264,6 +3275,7 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
 		flushLag = walsnd->flushLag;
 		applyLag = walsnd->applyLag;
 		priority = walsnd->sync_standby_priority;
+		reply_timestamp = walsnd->reply_timestamp;
 		SpinLockRelease(&walsnd->mutex);
 
 		memset(nulls, 0, sizeof(nulls));
@@ -3340,6 +3352,11 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
 					CStringGetTextDatum("sync") : CStringGetTextDatum("quorum");
 			else
 				values[10] = CStringGetTextDatum("potential");
+
+			if (reply_timestamp == 0)
+				nulls[11] = true;
+			else
+				values[11] = TimestampTzGetDatum(reply_timestamp);
 		}
 
 		tuplestore_putvalues(tupstore, tupdesc, values, nulls);
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index a146510..ef262a2 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -5181,9 +5181,9 @@
   proname => 'pg_stat_get_wal_senders', prorows => '10', proisstrict => 'f',
   proretset => 't', provolatile => 's', proparallel => 'r',
   prorettype => 'record', proargtypes => '',
-  proallargtypes => '{int4,text,pg_lsn,pg_lsn,pg_lsn,pg_lsn,interval,interval,interval,int4,text}',
-  proargmodes => '{o,o,o,o,o,o,o,o,o,o,o}',
-  proargnames => '{pid,state,sent_lsn,write_lsn,flush_lsn,replay_lsn,write_lag,flush_lag,replay_lag,sync_priority,sync_state}',
+  proallargtypes => '{int4,text,pg_lsn,pg_lsn,pg_lsn,pg_lsn,interval,interval,interval,int4,text,timestamptz}',
+  proargmodes => '{o,o,o,o,o,o,o,o,o,o,o,o}',
+  proargnames => '{pid,state,sent_lsn,write_lsn,flush_lsn,replay_lsn,write_lag,flush_lag,replay_lag,sync_priority,sync_state,reply_time}',
   prosrc => 'pg_stat_get_wal_senders' },
 { oid => '3317', descr => 'statistics: information about WAL receiver',
   proname => 'pg_stat_get_wal_receiver', proisstrict => 'f', provolatile => 's',
diff --git a/src/include/replication/walsender_private.h b/src/include/replication/walsender_private.h
index 4b90477..dc53314 100644
--- a/src/include/replication/walsender_private.h
+++ b/src/include/replication/walsender_private.h
@@ -75,6 +75,11 @@ typedef struct WalSnd
 	 * SyncRepLock.
 	 */
 	int			sync_standby_priority;
+
+	/*
+	 * The timestamp of reply message when created by the standby.
+	*/
+	TimestampTz	reply_timestamp;
 } WalSnd;
 
 extern WalSnd *MyWalSnd;
diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out
index 744d501..6fa62d0 100644
--- a/src/test/regress/expected/rules.out
+++ b/src/test/regress/expected/rules.out
@@ -1861,9 +1861,10 @@ pg_stat_replication| SELECT s.pid,
     w.flush_lag,
     w.replay_lag,
     w.sync_priority,
-    w.sync_state
+    w.sync_state,
+    w.reply_time
    FROM ((pg_stat_get_activity(NULL::integer) s(datid, pid, usesysid, application_name, state, query, wait_event_type, wait_event, xact_start, query_start, backend_start, state_change, client_addr, client_hostname, client_port, backend_xid, backend_xmin, backend_type, ssl, sslversion, sslcipher, sslbits, sslcompression, sslclientdn)
-     JOIN pg_stat_get_wal_senders() w(pid, state, sent_lsn, write_lsn, flush_lsn, replay_lsn, write_lag, flush_lag, replay_lag, sync_priority, sync_state) ON ((s.pid = w.pid)))
+     JOIN pg_stat_get_wal_senders() w(pid, state, sent_lsn, write_lsn, flush_lsn, replay_lsn, write_lag, flush_lag, replay_lag, sync_priority, sync_state, reply_time) ON ((s.pid = w.pid)))
      LEFT JOIN pg_authid u ON ((s.usesysid = u.oid)));
 pg_stat_ssl| SELECT s.pid,
     s.ssl,
-- 
1.8.3.1

