From b4d9f62026bf53c95f547cbc69bb747f5e69b447 Mon Sep 17 00:00:00 2001
From: Peter Smith <peter.b.smith@fujitsu.com>
Date: Wed, 11 Aug 2021 13:22:36 +1000
Subject: [PATCH v1] WIP - count keepalives by Kyotaro

---
 src/backend/access/transam/xlogreader.c |  5 +++
 src/backend/replication/walsender.c     | 67 +++++++++++++++++++++++++++++++++
 2 files changed, 72 insertions(+)

diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c
index 5cf74e1..bc42f13 100644
--- a/src/backend/access/transam/xlogreader.c
+++ b/src/backend/access/transam/xlogreader.c
@@ -574,6 +574,7 @@ err:
  * We fetch the page from a reader-local cache if we know we have the required
  * data and if there hasn't been any error since caching the data.
  */
+int hogestate = -1;
 static int
 ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr, int reqLen)
 {
@@ -608,6 +609,7 @@ ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr, int reqLen)
 	{
 		XLogRecPtr	targetSegmentPtr = pageptr - targetPageOff;
 
+		hogestate = pageptr + XLOG_BLCKSZ - state->currRecPtr;
 		readLen = state->routine.page_read(state, targetSegmentPtr, XLOG_BLCKSZ,
 										   state->currRecPtr,
 										   state->readBuf);
@@ -626,6 +628,7 @@ ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr, int reqLen)
 	 * First, read the requested data length, but at least a short page header
 	 * so that we can validate it.
 	 */
+	hogestate = pageptr + Max(reqLen, SizeOfXLogShortPHD) - state->currRecPtr;
 	readLen = state->routine.page_read(state, pageptr, Max(reqLen, SizeOfXLogShortPHD),
 									   state->currRecPtr,
 									   state->readBuf);
@@ -645,6 +648,7 @@ ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr, int reqLen)
 	/* still not enough */
 	if (readLen < XLogPageHeaderSize(hdr))
 	{
+		hogestate = pageptr + XLogPageHeaderSize(hdr) - state->currRecPtr;
 		readLen = state->routine.page_read(state, pageptr, XLogPageHeaderSize(hdr),
 										   state->currRecPtr,
 										   state->readBuf);
@@ -652,6 +656,7 @@ ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr, int reqLen)
 			goto err;
 	}
 
+	hogestate = -1;
 	/*
 	 * Now that we know we have the full header, validate it.
 	 */
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 3ca2a11..d8051c3 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -1371,17 +1371,50 @@ WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId
  * if we detect a shutdown request (either from postmaster or client)
  * we will return early, so caller must always check.
  */
+unsigned long counts[32768][3] = {{0}};
+unsigned long lagw[32768] = {0};
+unsigned long lagf[32768] = {0};
+unsigned long nrec = 0;
+void PrintCounts(void);
+void
+PrintCounts(void)
+{
+	int i = 0;
+	ereport(LOG, (errmsg ("Total records: %lu", nrec), errhidestmt(true)));
+	nrec = 0;
+
+	for (i = 0 ; i < 32768 ; i++)
+	{
+		if (counts[i][0] + counts[i][1] + counts[i][2] > 0)
+		{
+			unsigned long wl = 0, fl = 0;
+			if (counts[i][1] > 0)
+			{
+				wl = lagw[i] / counts[i][0];
+				fl = lagf[i] / counts[i][0];
+			
+				ereport(LOG, (errmsg ("%5d: %5lu / %5lu / %5lu: %7lu / %7lu",
+									  i, counts[i][1], counts[i][2], counts[i][0], wl, fl), errhidestmt(true)));
+			}
+			counts[i][0] = counts[i][1] = counts[i][2] = lagw[i] = lagf[i] = 0;
+		}
+	}
+}
+
 static XLogRecPtr
 WalSndWaitForWal(XLogRecPtr loc)
 {
 	int			wakeEvents;
 	static XLogRecPtr RecentFlushPtr = InvalidXLogRecPtr;
+	extern int hogestate;
+	bool		lagtaken = false;
 
 	/*
 	 * Fast path to avoid acquiring the spinlock in case we already know we
 	 * have enough WAL available. This is particularly interesting if we're
 	 * far behind.
 	 */
+	counts[hogestate][0]++;
 	if (RecentFlushPtr != InvalidXLogRecPtr &&
 		loc <= RecentFlushPtr)
 		return RecentFlushPtr;
@@ -1447,7 +1480,39 @@ WalSndWaitForWal(XLogRecPtr loc)
 		if (MyWalSnd->flush < sentPtr &&
 			MyWalSnd->write < sentPtr &&
 			!waiting_for_ping_response)
+		{
+			if (hogestate >= 0)
+			{
+				counts[hogestate][1]++;
+				if (!lagtaken)
+				{
+					lagf[hogestate] += sentPtr - MyWalSnd->flush;
+					lagw[hogestate] += sentPtr - MyWalSnd->write;
+					lagtaken = true;
+				}
+			}
+//			ereport(LOG, (errmsg ("KA[%lu/%lu/%lu]: %X/%X %X/%X %X/%X %d: %ld",
+//											  ka, na, ka + na,
+//											  LSN_FORMAT_ARGS(MyWalSnd->flush),
+//											  LSN_FORMAT_ARGS(MyWalSnd->write),
+//											  LSN_FORMAT_ARGS(sentPtr),
+//											  waiting_for_ping_response,
+//											  sentPtr - MyWalSnd->write)));
 			WalSndKeepalive(false);
+		}
+		else
+		{
+			if (hogestate >= 0)
+				counts[hogestate][2]++;
+
+//			ereport(LOG, (errmsg ("kap[%lu/%lu/%lu]: %X/%X %X/%X %X/%X %d: %ld",
+//											  ka, na, ka + na,
+//											  LSN_FORMAT_ARGS(MyWalSnd->flush),
+//											  LSN_FORMAT_ARGS(MyWalSnd->write),
+//											  LSN_FORMAT_ARGS(sentPtr),
+//											  waiting_for_ping_response,
+//											  sentPtr - MyWalSnd->write)));
+		}
 
 		/* check whether we're done */
 		if (loc <= RecentFlushPtr)
@@ -2851,6 +2916,7 @@ XLogSendLogical(void)
 {
 	XLogRecord *record;
 	char	   *errm;
+	extern unsigned long nrec;
 
 	/*
 	 * We'll use the current flush point to determine whether we've caught up.
@@ -2868,6 +2934,7 @@ XLogSendLogical(void)
 	 */
 	WalSndCaughtUp = false;
 
+	nrec++;
 	record = XLogReadRecord(logical_decoding_ctx->reader, &errm);
 
 	/* xlog record was invalid */
-- 
1.8.3.1

