On Mon, Oct 10, 2022 at 03:22:15PM +0530, Bharath Rupireddy wrote:
> Some comments on v4-0002:

Thanks for taking a look.

> 1. You might want to use PG_INT64_MAX instead of INT64_MAX for portability?

Yes, I used PG_INT64_MAX in v5.

> 2. With the below change, the time walreceiver spends in
> XLogWalRcvSendReply() is also included for XLogWalRcvSendHSFeedback
> right? I think it's a problem given that XLogWalRcvSendReply() can
> take a while. Earlier, this wasn't the case, each function calculating
> 'now' separately. Any reason for changing this behaviour? I know that
> GetCurrentTimestamp(); isn't cheaper all the time, but here it might
> result in a change in the behaviour.

Yes, if XLogWalRcvSendReply() takes a long time, we might defer sending the
hot_standby_feedback message until a later time.  The only reason I changed
this was to avoid extra calls to GetCurrentTimestamp(), which might be
expensive on some platforms.  Outside of the code snippet you pointed out,
I think WalReceiverMain() has a similar problem.  That being said, I'm
generally skeptical that this sort of thing is detrimental given the
current behavior (i.e., wake up every 100ms), the usual values of these
GUCs (i.e., tens of seconds), and the fact that any tasks that are
inappropriately skipped will typically be retried in the next iteration of
the loop.
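
To make the trade-off in point 2 concrete, here is a small standalone model. It is not walreceiver code, and every name in it (periodic_task, maybe_run, loop_iteration) is hypothetical. The loop samples the clock once per iteration and shares that sample between two tasks, so a slow first task can make the second task miss a deadline that passed while the first one ran. The model shows that the deferred task is not lost. Its deadline has already passed, so the next nap is zero and it runs on the following iteration, with its next deadline computed from that iteration's clock sample.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Microseconds, like TimestampTz; all names here are hypothetical. */
typedef int64_t usec_t;

/* A periodic task that is due once the clock reaches next_due. */
typedef struct
{
    usec_t next_due;
    usec_t interval;
    int    runs;
} periodic_task;

/* Run the task if it is due according to the caller's clock sample. */
static bool
maybe_run(periodic_task *t, usec_t now)
{
    if (now < t->next_due)
        return false;
    t->runs++;
    t->next_due = now + t->interval;
    return true;
}

/*
 * One loop iteration that samples the clock once and shares that sample
 * between two tasks.  Running 'reply' costs 'reply_cost' microseconds of
 * real time, but the shared sample is not refreshed afterwards.  Returns
 * the real time at the end of the iteration.
 */
static usec_t
loop_iteration(periodic_task *reply, periodic_task *feedback,
               usec_t real_now, usec_t reply_cost)
{
    usec_t now = real_now;      /* one clock sample per iteration */

    assert(reply_cost >= 0);
    if (maybe_run(reply, now))
        real_now += reply_cost; /* time passes, but 'now' stays stale */
    maybe_run(feedback, now);   /* may miss a deadline that just passed */
    return real_now;
}
```

For example, with the reply due at time 0, the feedback due at 500 microseconds, and a reply that takes 1000 microseconds, the first iteration sends only the reply. The second iteration, starting at 1000 microseconds, sends the feedback.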

> 3. I understand that TimestampTz type is treated as microseconds.
> Would you mind having a comment in below places to say why we're
> multiplying with 1000 or 1000000 here? Also, do we need 1000L or
> 1000000L or type cast to
> TimestampTz?

I used INT64CONST() in v5, as that seemed the most portable, but I stopped
short of adding comments to explain all the conversions.  IMO the
conversions are relatively obvious, and the units of the GUCs can be easily
seen in guc_tables.c.
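
For reference, the conversions in question are milliseconds to microseconds for wal_receiver_timeout (times 1000) and seconds to microseconds for wal_receiver_status_interval (times 1000000), since TimestampTz counts microseconds. The sketch below uses INT64_C() from <stdint.h> as a stand-in for PostgreSQL's INT64CONST(); the helper names are hypothetical. The 64-bit constant is what matters. A plain int multiplication such as sec * 1000000 overflows for any interval above 2147 seconds (about 36 minutes), while multiplying by a 64-bit constant promotes the GUC value first.

```c
#include <assert.h>
#include <stdint.h>

/* Microseconds, like PostgreSQL's TimestampTz (an int64). */
typedef int64_t usec_t;

/*
 * wal_receiver_timeout is in milliseconds: 1 ms = 1000 us.  INT64_C()
 * stands in for INT64CONST(); it forces 64-bit arithmetic instead of int.
 */
static usec_t
ms_to_usec(int ms)
{
    assert(ms >= 0);
    return ms * INT64_C(1000);
}

/* wal_receiver_status_interval is in seconds: 1 s = 1000000 us. */
static usec_t
sec_to_usec(int sec)
{
    assert(sec >= 0);
    return sec * INT64_C(1000000);
}
```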

> 4. How about simplifying WalRcvComputeNextWakeup() something like below?

Other than saving a couple lines of code, IMO this doesn't meaningfully
simplify the function or improve readability.

> 5. Can we move below code snippets to respective static functions for
> better readability and code reuse?
> This:
> +                /* Find the soonest wakeup time, to limit our nap. */
> +                nextWakeup = INT64_MAX;
> +                for (int i = 0; i < NUM_WALRCV_WAKEUPS; ++i)
> +                    nextWakeup = Min(state.wakeup[i], nextWakeup);
> +                nap = Max(0, (nextWakeup - now + 999) / 1000);
> 
> And this:
> 
> +                    now = GetCurrentTimestamp();
> +                    for (int i = 0; i < NUM_WALRCV_WAKEUPS; ++i)
> +                        WalRcvComputeNextWakeup(&state, i, now);

This did cross my mind, but I opted to avoid creating new functions because
1) they aren't likely to be reused very much, and 2) I actually think it
might hurt readability by forcing developers to traipse around the file to
figure out what these functions are actually doing.  It's not like it would
save many lines of code, either.
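
For readers following the nap computation quoted in point 5, here is a standalone sketch of what it does: take the soonest of the per-reason deadlines, then round the remaining time up to whole milliseconds so the loop does not wake slightly early. The names are hypothetical, and the clamp to INT_MAX is an addition in this sketch, not something the patch does. It is included because the wait timeout is an int, while nextWakeup can be PG_INT64_MAX when every wakeup is disabled; in that case (nextWakeup - now + 999) / 1000 is far larger than INT_MAX.

```c
#include <assert.h>
#include <limits.h>
#include <stdint.h>

/* Microseconds, like TimestampTz; INT64_MAX means "never". */
typedef int64_t usec_t;

/* Hypothetical enum with the same shape as WalRcvWakeupReason. */
enum { WAKEUP_TERMINATE, WAKEUP_PING, WAKEUP_REPLY, WAKEUP_HSFEEDBACK,
       NUM_WAKEUPS };

/* Return the soonest of the per-reason deadlines. */
static usec_t
soonest_wakeup(const usec_t wakeup[NUM_WAKEUPS])
{
    usec_t next = INT64_MAX;

    for (int i = 0; i < NUM_WAKEUPS; ++i)
        if (wakeup[i] < next)
            next = wakeup[i];
    return next;
}

/*
 * Milliseconds to sleep until 'next', rounded up and clamped to
 * [0, INT_MAX] because the wait timeout is an int.
 */
static int
nap_ms(usec_t now, usec_t next)
{
    usec_t diff;

    assert(now >= 0);           /* so next - now cannot overflow */
    if (next <= now)
        return 0;
    diff = next - now;
    if (diff > (usec_t) INT_MAX * 1000)
        return INT_MAX;
    return (int) ((diff + 999) / 1000);
}
```

For example, 1500 microseconds until the next deadline yields a 2 ms nap. A deadline that has already passed yields 0, and if every deadline is "never" the result is INT_MAX instead of a truncated value.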

-- 
Nathan Bossart
Amazon Web Services: https://aws.amazon.com
From 9d274eecef8e66e60b34d14c24459e70e782cf86 Mon Sep 17 00:00:00 2001
From: Nathan Bossart <nathandboss...@gmail.com>
Date: Wed, 5 Oct 2022 10:31:35 -0700
Subject: [PATCH v5 1/2] Move WAL receivers' non-shared state to a new struct.

This is preparatory work for a follow-up change that will revamp
the wakeup mechanism for periodic tasks that WAL receivers must
perform.
---
 src/backend/replication/walreceiver.c | 90 ++++++++++++++-------------
 1 file changed, 48 insertions(+), 42 deletions(-)

diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c
index 6cbb67c92a..89985c54cf 100644
--- a/src/backend/replication/walreceiver.c
+++ b/src/backend/replication/walreceiver.c
@@ -116,6 +116,14 @@ static struct
 	XLogRecPtr	Flush;			/* last byte + 1 flushed in the standby */
 }			LogstreamResult;
 
+/*
+ * A struct to keep track of non-shared state.
+ */
+typedef struct WalRcvInfo
+{
+	TimeLineID	startpointTLI;
+} WalRcvInfo;
+
 static StringInfoData reply_message;
 static StringInfoData incoming_message;
 
@@ -123,12 +131,12 @@ static StringInfoData incoming_message;
 static void WalRcvFetchTimeLineHistoryFiles(TimeLineID first, TimeLineID last);
 static void WalRcvWaitForStartPosition(XLogRecPtr *startpoint, TimeLineID *startpointTLI);
 static void WalRcvDie(int code, Datum arg);
-static void XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len,
-								 TimeLineID tli);
-static void XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr,
-							TimeLineID tli);
-static void XLogWalRcvFlush(bool dying, TimeLineID tli);
-static void XLogWalRcvClose(XLogRecPtr recptr, TimeLineID tli);
+static void XLogWalRcvProcessMsg(WalRcvInfo *state, unsigned char type,
+								 char *buf, Size len);
+static void XLogWalRcvWrite(WalRcvInfo *state, char *buf, Size nbytes,
+							XLogRecPtr recptr);
+static void XLogWalRcvFlush(WalRcvInfo *state, bool dying);
+static void XLogWalRcvClose(WalRcvInfo *state, XLogRecPtr recptr);
 static void XLogWalRcvSendReply(bool force, bool requestReply);
 static void XLogWalRcvSendHSFeedback(bool immed);
 static void ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime);
@@ -175,7 +183,6 @@ WalReceiverMain(void)
 	char		slotname[NAMEDATALEN];
 	bool		is_temp_slot;
 	XLogRecPtr	startpoint;
-	TimeLineID	startpointTLI;
 	TimeLineID	primaryTLI;
 	bool		first_stream;
 	WalRcvData *walrcv = WalRcv;
@@ -185,6 +192,7 @@ WalReceiverMain(void)
 	char	   *err;
 	char	   *sender_host = NULL;
 	int			sender_port = 0;
+	WalRcvInfo	state = {0};
 
 	/*
 	 * WalRcv should be set up already (if we are a backend, we inherit this
@@ -238,7 +246,7 @@ WalReceiverMain(void)
 	strlcpy(slotname, (char *) walrcv->slotname, NAMEDATALEN);
 	is_temp_slot = walrcv->is_temp_slot;
 	startpoint = walrcv->receiveStart;
-	startpointTLI = walrcv->receiveStartTLI;
+	state.startpointTLI = walrcv->receiveStartTLI;
 
 	/*
 	 * At most one of is_temp_slot and slotname can be set; otherwise,
@@ -258,7 +266,7 @@ WalReceiverMain(void)
 	pg_atomic_write_u64(&WalRcv->writtenUpto, 0);
 
 	/* Arrange to clean up at walreceiver exit */
-	on_shmem_exit(WalRcvDie, PointerGetDatum(&startpointTLI));
+	on_shmem_exit(WalRcvDie, PointerGetDatum(&state));
 
 	/* Properly accept or ignore signals the postmaster might send us */
 	pqsignal(SIGHUP, SignalHandlerForConfigReload); /* set flag to read config
@@ -345,11 +353,11 @@ WalReceiverMain(void)
 		 * Confirm that the current timeline of the primary is the same or
 		 * ahead of ours.
 		 */
-		if (primaryTLI < startpointTLI)
+		if (primaryTLI < state.startpointTLI)
 			ereport(ERROR,
 					(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
 					 errmsg("highest timeline %u of the primary is behind recovery timeline %u",
-							primaryTLI, startpointTLI)));
+							primaryTLI, state.startpointTLI)));
 
 		/*
 		 * Get any missing history files. We do this always, even when we're
@@ -361,7 +369,7 @@ WalReceiverMain(void)
 		 * but let's avoid the confusion of timeline id collisions where we
 		 * can.
 		 */
-		WalRcvFetchTimeLineHistoryFiles(startpointTLI, primaryTLI);
+		WalRcvFetchTimeLineHistoryFiles(state.startpointTLI, primaryTLI);
 
 		/*
 		 * Create temporary replication slot if requested, and update slot
@@ -396,17 +404,17 @@ WalReceiverMain(void)
 		options.logical = false;
 		options.startpoint = startpoint;
 		options.slotname = slotname[0] != '\0' ? slotname : NULL;
-		options.proto.physical.startpointTLI = startpointTLI;
+		options.proto.physical.startpointTLI = state.startpointTLI;
 		if (walrcv_startstreaming(wrconn, &options))
 		{
 			if (first_stream)
 				ereport(LOG,
 						(errmsg("started streaming WAL from primary at %X/%X on timeline %u",
-								LSN_FORMAT_ARGS(startpoint), startpointTLI)));
+								LSN_FORMAT_ARGS(startpoint), state.startpointTLI)));
 			else
 				ereport(LOG,
 						(errmsg("restarted WAL streaming at %X/%X on timeline %u",
-								LSN_FORMAT_ARGS(startpoint), startpointTLI)));
+								LSN_FORMAT_ARGS(startpoint), state.startpointTLI)));
 			first_stream = false;
 
 			/* Initialize LogstreamResult and buffers for processing messages */
@@ -464,8 +472,8 @@ WalReceiverMain(void)
 							 */
 							last_recv_timestamp = GetCurrentTimestamp();
 							ping_sent = false;
-							XLogWalRcvProcessMsg(buf[0], &buf[1], len - 1,
-												 startpointTLI);
+							XLogWalRcvProcessMsg(&state, buf[0], &buf[1],
+												 len - 1);
 						}
 						else if (len == 0)
 							break;
@@ -474,7 +482,7 @@ WalReceiverMain(void)
 							ereport(LOG,
 									(errmsg("replication terminated by primary server"),
 									 errdetail("End of WAL reached on timeline %u at %X/%X.",
-											   startpointTLI,
+											   state.startpointTLI,
 											   LSN_FORMAT_ARGS(LogstreamResult.Write))));
 							endofwal = true;
 							break;
@@ -490,7 +498,7 @@ WalReceiverMain(void)
 					 * let the startup process and primary server know about
 					 * them.
 					 */
-					XLogWalRcvFlush(false, startpointTLI);
+					XLogWalRcvFlush(&state, false);
 				}
 
 				/* Check if we need to exit the streaming loop. */
@@ -596,12 +604,12 @@ WalReceiverMain(void)
 			 * know about when we began streaming, fetch its timeline history
 			 * file now.
 			 */
-			WalRcvFetchTimeLineHistoryFiles(startpointTLI, primaryTLI);
+			WalRcvFetchTimeLineHistoryFiles(state.startpointTLI, primaryTLI);
 		}
 		else
 			ereport(LOG,
 					(errmsg("primary server contains no more WAL on requested timeline %u",
-							startpointTLI)));
+							state.startpointTLI)));
 
 		/*
 		 * End of WAL reached on the requested timeline. Close the last
@@ -611,7 +619,7 @@ WalReceiverMain(void)
 		{
 			char		xlogfname[MAXFNAMELEN];
 
-			XLogWalRcvFlush(false, startpointTLI);
+			XLogWalRcvFlush(&state, false);
 			XLogFileName(xlogfname, recvFileTLI, recvSegNo, wal_segment_size);
 			if (close(recvFile) != 0)
 				ereport(PANIC,
@@ -631,7 +639,7 @@ WalReceiverMain(void)
 		recvFile = -1;
 
 		elog(DEBUG1, "walreceiver ended streaming and awaits new instructions");
-		WalRcvWaitForStartPosition(&startpoint, &startpointTLI);
+		WalRcvWaitForStartPosition(&startpoint, &state.startpointTLI);
 	}
 	/* not reached */
 }
@@ -779,12 +787,10 @@ static void
 WalRcvDie(int code, Datum arg)
 {
 	WalRcvData *walrcv = WalRcv;
-	TimeLineID *startpointTLI_p = (TimeLineID *) DatumGetPointer(arg);
-
-	Assert(*startpointTLI_p != 0);
+	WalRcvInfo *state = (WalRcvInfo *) DatumGetPointer(arg);
 
 	/* Ensure that all WAL records received are flushed to disk */
-	XLogWalRcvFlush(true, *startpointTLI_p);
+	XLogWalRcvFlush(state, true);
 
 	/* Mark ourselves inactive in shared memory */
 	SpinLockAcquire(&walrcv->mutex);
@@ -814,7 +820,7 @@ WalRcvDie(int code, Datum arg)
  * Accept the message from XLOG stream, and process it.
  */
 static void
-XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len, TimeLineID tli)
+XLogWalRcvProcessMsg(WalRcvInfo *state, unsigned char type, char *buf, Size len)
 {
 	int			hdrlen;
 	XLogRecPtr	dataStart;
@@ -844,7 +850,7 @@ XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len, TimeLineID tli)
 
 				buf += hdrlen;
 				len -= hdrlen;
-				XLogWalRcvWrite(buf, len, dataStart, tli);
+				XLogWalRcvWrite(state, buf, len, dataStart);
 				break;
 			}
 		case 'k':				/* Keepalive */
@@ -881,12 +887,12 @@ XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len, TimeLineID tli)
  * Write XLOG data to disk.
  */
 static void
-XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr, TimeLineID tli)
+XLogWalRcvWrite(WalRcvInfo *state, char *buf, Size nbytes, XLogRecPtr recptr)
 {
 	int			startoff;
 	int			byteswritten;
 
-	Assert(tli != 0);
+	Assert(state->startpointTLI != 0);
 
 	while (nbytes > 0)
 	{
@@ -894,14 +900,14 @@ XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr, TimeLineID tli)
 
 		/* Close the current segment if it's completed */
 		if (recvFile >= 0 && !XLByteInSeg(recptr, recvSegNo, wal_segment_size))
-			XLogWalRcvClose(recptr, tli);
+			XLogWalRcvClose(state, recptr);
 
 		if (recvFile < 0)
 		{
 			/* Create/use new log file */
 			XLByteToSeg(recptr, recvSegNo, wal_segment_size);
-			recvFile = XLogFileInit(recvSegNo, tli);
-			recvFileTLI = tli;
+			recvFile = XLogFileInit(recvSegNo, state->startpointTLI);
+			recvFileTLI = state->startpointTLI;
 		}
 
 		/* Calculate the start offset of the received logs */
@@ -954,7 +960,7 @@ XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr, TimeLineID tli)
 	 * segment is received and written.
 	 */
 	if (recvFile >= 0 && !XLByteInSeg(recptr, recvSegNo, wal_segment_size))
-		XLogWalRcvClose(recptr, tli);
+		XLogWalRcvClose(state, recptr);
 }
 
 /*
@@ -964,15 +970,15 @@ XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr, TimeLineID tli)
  * an error, so we skip sending a reply in that case.
  */
 static void
-XLogWalRcvFlush(bool dying, TimeLineID tli)
+XLogWalRcvFlush(WalRcvInfo *state, bool dying)
 {
-	Assert(tli != 0);
+	Assert(state->startpointTLI != 0);
 
 	if (LogstreamResult.Flush < LogstreamResult.Write)
 	{
 		WalRcvData *walrcv = WalRcv;
 
-		issue_xlog_fsync(recvFile, recvSegNo, tli);
+		issue_xlog_fsync(recvFile, recvSegNo, state->startpointTLI);
 
 		LogstreamResult.Flush = LogstreamResult.Write;
 
@@ -982,7 +988,7 @@ XLogWalRcvFlush(bool dying, TimeLineID tli)
 		{
 			walrcv->latestChunkStart = walrcv->flushedUpto;
 			walrcv->flushedUpto = LogstreamResult.Flush;
-			walrcv->receivedTLI = tli;
+			walrcv->receivedTLI = state->startpointTLI;
 		}
 		SpinLockRelease(&walrcv->mutex);
 
@@ -1019,18 +1025,18 @@ XLogWalRcvFlush(bool dying, TimeLineID tli)
  * Create an archive notification file since the segment is known completed.
  */
 static void
-XLogWalRcvClose(XLogRecPtr recptr, TimeLineID tli)
+XLogWalRcvClose(WalRcvInfo *state, XLogRecPtr recptr)
 {
 	char		xlogfname[MAXFNAMELEN];
 
 	Assert(recvFile >= 0 && !XLByteInSeg(recptr, recvSegNo, wal_segment_size));
-	Assert(tli != 0);
+	Assert(state->startpointTLI != 0);
 
 	/*
 	 * fsync() and close current file before we switch to next one. We would
 	 * otherwise have to reopen this file to fsync it later
 	 */
-	XLogWalRcvFlush(false, tli);
+	XLogWalRcvFlush(state, false);
 
 	XLogFileName(xlogfname, recvFileTLI, recvSegNo, wal_segment_size);
 
-- 
2.25.1

From 3c2f635b48372717453fa4f5cfe04b5097ea6184 Mon Sep 17 00:00:00 2001
From: Thomas Munro <thomas.mu...@gmail.com>
Date: Thu, 27 Jan 2022 21:43:17 +1300
Subject: [PATCH v5 2/2] Suppress useless wakeups in walreceiver.

Instead of waking up 10 times per second to check for various timeout
conditions, keep track of when we next have periodic work to do.

Reviewed-by: Kyotaro Horiguchi, Bharath Rupireddy
Discussion: https://postgr.es/m/CA%2BhUKGJGhX4r2LPUE3Oy9BX71Eum6PBcS8L3sJpScR9oKaTVaA%40mail.gmail.com
---
 src/backend/replication/walreceiver.c | 203 ++++++++++++++++----------
 1 file changed, 125 insertions(+), 78 deletions(-)

diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c
index 89985c54cf..51a1916770 100644
--- a/src/backend/replication/walreceiver.c
+++ b/src/backend/replication/walreceiver.c
@@ -95,8 +95,6 @@ bool		hot_standby_feedback;
 static WalReceiverConn *wrconn = NULL;
 WalReceiverFunctionsType *WalReceiverFunctions = NULL;
 
-#define NAPTIME_PER_CYCLE 100	/* max sleep time between cycles (100ms) */
-
 /*
  * These variables are used similarly to openLogFile/SegNo,
  * but for walreceiver to write the XLOG. recvFileTLI is the TimeLineID
@@ -116,12 +114,25 @@ static struct
 	XLogRecPtr	Flush;			/* last byte + 1 flushed in the standby */
 }			LogstreamResult;
 
+/*
+ * Reasons to wake up and perform periodic tasks.
+ */
+typedef enum WalRcvWakeupReason
+{
+	WALRCV_WAKEUP_TERMINATE,
+	WALRCV_WAKEUP_PING,
+	WALRCV_WAKEUP_REPLY,
+	WALRCV_WAKEUP_HSFEEDBACK,
+	NUM_WALRCV_WAKEUPS
+} WalRcvWakeupReason;
+
 /*
  * A struct to keep track of non-shared state.
  */
 typedef struct WalRcvInfo
 {
 	TimeLineID	startpointTLI;
+	TimestampTz	wakeup[NUM_WALRCV_WAKEUPS];
 } WalRcvInfo;
 
 static StringInfoData reply_message;
@@ -137,9 +148,13 @@ static void XLogWalRcvWrite(WalRcvInfo *state, char *buf, Size nbytes,
 							XLogRecPtr recptr);
 static void XLogWalRcvFlush(WalRcvInfo *state, bool dying);
 static void XLogWalRcvClose(WalRcvInfo *state, XLogRecPtr recptr);
-static void XLogWalRcvSendReply(bool force, bool requestReply);
-static void XLogWalRcvSendHSFeedback(bool immed);
+static void XLogWalRcvSendReply(WalRcvInfo *state, TimestampTz now, bool force,
+								bool requestReply);
+static void XLogWalRcvSendHSFeedback(WalRcvInfo *state, TimestampTz now,
+									 bool immed);
 static void ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime);
+static void WalRcvComputeNextWakeup(WalRcvInfo *state,
+									WalRcvWakeupReason reason, TimestampTz now);
 
 /*
  * Process any interrupts the walreceiver process may have received.
@@ -186,9 +201,7 @@ WalReceiverMain(void)
 	TimeLineID	primaryTLI;
 	bool		first_stream;
 	WalRcvData *walrcv = WalRcv;
-	TimestampTz last_recv_timestamp;
-	TimestampTz starttime;
-	bool		ping_sent;
+	TimestampTz now;
 	char	   *err;
 	char	   *sender_host = NULL;
 	int			sender_port = 0;
@@ -200,7 +213,7 @@ WalReceiverMain(void)
 	 */
 	Assert(walrcv != NULL);
 
-	starttime = GetCurrentTimestamp();
+	now = GetCurrentTimestamp();
 
 	/*
 	 * Mark walreceiver as running in shared memory.
@@ -256,7 +269,7 @@ WalReceiverMain(void)
 
 	/* Initialise to a sanish value */
 	walrcv->lastMsgSendTime =
-		walrcv->lastMsgReceiptTime = walrcv->latestWalEndTime = starttime;
+		walrcv->lastMsgReceiptTime = walrcv->latestWalEndTime = now;
 
 	/* Report the latch to use to awaken this process */
 	walrcv->latch = &MyProc->procLatch;
@@ -422,9 +435,10 @@ WalReceiverMain(void)
 			initStringInfo(&reply_message);
 			initStringInfo(&incoming_message);
 
-			/* Initialize the last recv timestamp */
-			last_recv_timestamp = GetCurrentTimestamp();
-			ping_sent = false;
+			/* Initialize nap wakeup times. */
+			now = GetCurrentTimestamp();
+			for (int i = 0; i < NUM_WALRCV_WAKEUPS; ++i)
+				WalRcvComputeNextWakeup(&state, i, now);
 
 			/* Loop until end-of-streaming or error */
 			for (;;)
@@ -434,6 +448,8 @@ WalReceiverMain(void)
 				bool		endofwal = false;
 				pgsocket	wait_fd = PGINVALID_SOCKET;
 				int			rc;
+				TimestampTz	nextWakeup;
+				int			nap;
 
 				/*
 				 * Exit walreceiver if we're not in recovery. This should not
@@ -451,11 +467,15 @@ WalReceiverMain(void)
 				{
 					ConfigReloadPending = false;
 					ProcessConfigFile(PGC_SIGHUP);
-					XLogWalRcvSendHSFeedback(true);
+					now = GetCurrentTimestamp();
+					for (int i = 0; i < NUM_WALRCV_WAKEUPS; ++i)
+						WalRcvComputeNextWakeup(&state, i, now);
+					XLogWalRcvSendHSFeedback(&state, now, true);
 				}
 
 				/* See if we can read data immediately */
 				len = walrcv_receive(wrconn, &buf, &wait_fd);
+				now = GetCurrentTimestamp();
 				if (len != 0)
 				{
 					/*
@@ -467,11 +487,15 @@ WalReceiverMain(void)
 						if (len > 0)
 						{
 							/*
-							 * Something was received from primary, so reset
-							 * timeout
+							 * Something was received from primary, so adjust
+							 * the ping and terminate wakeup times.
 							 */
-							last_recv_timestamp = GetCurrentTimestamp();
-							ping_sent = false;
+							WalRcvComputeNextWakeup(&state,
+													WALRCV_WAKEUP_TERMINATE,
+													now);
+							WalRcvComputeNextWakeup(&state,
+													WALRCV_WAKEUP_PING,
+													now);
 							XLogWalRcvProcessMsg(&state, buf[0], &buf[1],
 												 len - 1);
 						}
@@ -488,10 +512,11 @@ WalReceiverMain(void)
 							break;
 						}
 						len = walrcv_receive(wrconn, &buf, &wait_fd);
+						now = GetCurrentTimestamp();
 					}
 
 					/* Let the primary know that we received some data. */
-					XLogWalRcvSendReply(false, false);
+					XLogWalRcvSendReply(&state, now, false, false);
 
 					/*
 					 * If we've written some records, flush them to disk and
@@ -505,6 +530,12 @@ WalReceiverMain(void)
 				if (endofwal)
 					break;
 
+				/* Find the soonest wakeup time, to limit our nap. */
+				nextWakeup = PG_INT64_MAX;
+				for (int i = 0; i < NUM_WALRCV_WAKEUPS; ++i)
+					nextWakeup = Min(state.wakeup[i], nextWakeup);
+				nap = Max(0, (nextWakeup - now + 999) / 1000);
+
 				/*
 				 * Ideally we would reuse a WaitEventSet object repeatedly
 				 * here to avoid the overheads of WaitLatchOrSocket on epoll
@@ -521,8 +552,9 @@ WalReceiverMain(void)
 									   WL_EXIT_ON_PM_DEATH | WL_SOCKET_READABLE |
 									   WL_TIMEOUT | WL_LATCH_SET,
 									   wait_fd,
-									   NAPTIME_PER_CYCLE,
+									   nap,
 									   WAIT_EVENT_WAL_RECEIVER_MAIN);
+				now = GetCurrentTimestamp();
 				if (rc & WL_LATCH_SET)
 				{
 					ResetLatch(MyLatch);
@@ -538,7 +570,7 @@ WalReceiverMain(void)
 						 */
 						walrcv->force_reply = false;
 						pg_memory_barrier();
-						XLogWalRcvSendReply(true, false);
+						XLogWalRcvSendReply(&state, now, true, false);
 					}
 				}
 				if (rc & WL_TIMEOUT)
@@ -558,38 +590,23 @@ WalReceiverMain(void)
 					 * Check if time since last receive from primary has
 					 * reached the configured limit.
 					 */
-					if (wal_receiver_timeout > 0)
-					{
-						TimestampTz now = GetCurrentTimestamp();
-						TimestampTz timeout;
-
-						timeout =
-							TimestampTzPlusMilliseconds(last_recv_timestamp,
-														wal_receiver_timeout);
-
-						if (now >= timeout)
-							ereport(ERROR,
-									(errcode(ERRCODE_CONNECTION_FAILURE),
-									 errmsg("terminating walreceiver due to timeout")));
+					if (now >= state.wakeup[WALRCV_WAKEUP_TERMINATE])
+						ereport(ERROR,
+								(errcode(ERRCODE_CONNECTION_FAILURE),
+								 errmsg("terminating walreceiver due to timeout")));
 
-						/*
-						 * We didn't receive anything new, for half of
-						 * receiver replication timeout. Ping the server.
-						 */
-						if (!ping_sent)
-						{
-							timeout = TimestampTzPlusMilliseconds(last_recv_timestamp,
-																  (wal_receiver_timeout / 2));
-							if (now >= timeout)
-							{
-								requestReply = true;
-								ping_sent = true;
-							}
-						}
+					/*
+					 * We didn't receive anything new, for half of receiver
+					 * replication timeout. Ping the server.
+					 */
+					if (now >= state.wakeup[WALRCV_WAKEUP_PING])
+					{
+						requestReply = true;
+						state.wakeup[WALRCV_WAKEUP_PING] = PG_INT64_MAX;
 					}
 
-					XLogWalRcvSendReply(requestReply, requestReply);
-					XLogWalRcvSendHSFeedback(false);
+					XLogWalRcvSendReply(&state, now, requestReply, requestReply);
+					XLogWalRcvSendHSFeedback(&state, now, false);
 				}
 			}
 
@@ -872,7 +889,7 @@ XLogWalRcvProcessMsg(WalRcvInfo *state, unsigned char type, char *buf, Size len)
 
 				/* If the primary requested a reply, send one immediately */
 				if (replyRequested)
-					XLogWalRcvSendReply(true, false);
+					XLogWalRcvSendReply(state, GetCurrentTimestamp(), true, false);
 				break;
 			}
 		default:
@@ -1010,8 +1027,10 @@ XLogWalRcvFlush(WalRcvInfo *state, bool dying)
 		/* Also let the primary know that we made some progress */
 		if (!dying)
 		{
-			XLogWalRcvSendReply(false, false);
-			XLogWalRcvSendHSFeedback(false);
+			TimestampTz now = GetCurrentTimestamp();
+
+			XLogWalRcvSendReply(state, now, false, false);
+			XLogWalRcvSendHSFeedback(state, now, false);
 		}
 	}
 }
@@ -1077,13 +1096,12 @@ XLogWalRcvClose(WalRcvInfo *state, XLogRecPtr recptr)
  * wal_receiver_timeout.
  */
 static void
-XLogWalRcvSendReply(bool force, bool requestReply)
+XLogWalRcvSendReply(WalRcvInfo *state, TimestampTz now, bool force,
+					bool requestReply)
 {
 	static XLogRecPtr writePtr = 0;
 	static XLogRecPtr flushPtr = 0;
 	XLogRecPtr	applyPtr;
-	static TimestampTz sendTime = 0;
-	TimestampTz now;
 
 	/*
 	 * If the user doesn't want status to be reported to the primary, be sure
@@ -1092,9 +1110,6 @@ XLogWalRcvSendReply(bool force, bool requestReply)
 	if (!force && wal_receiver_status_interval <= 0)
 		return;
 
-	/* Get current timestamp. */
-	now = GetCurrentTimestamp();
-
 	/*
 	 * We can compare the write and flush positions to the last message we
 	 * sent without taking any lock, but the apply position requires a spin
@@ -1107,10 +1122,11 @@ XLogWalRcvSendReply(bool force, bool requestReply)
 	if (!force
 		&& writePtr == LogstreamResult.Write
 		&& flushPtr == LogstreamResult.Flush
-		&& !TimestampDifferenceExceeds(sendTime, now,
-									   wal_receiver_status_interval * 1000))
+		&& now < state->wakeup[WALRCV_WAKEUP_REPLY])
 		return;
-	sendTime = now;
+
+	/* Make sure we wake up when it's time to send another reply. */
+	WalRcvComputeNextWakeup(state, WALRCV_WAKEUP_REPLY, now);
 
 	/* Construct a new message */
 	writePtr = LogstreamResult.Write;
@@ -1122,7 +1138,7 @@ XLogWalRcvSendReply(bool force, bool requestReply)
 	pq_sendint64(&reply_message, writePtr);
 	pq_sendint64(&reply_message, flushPtr);
 	pq_sendint64(&reply_message, applyPtr);
-	pq_sendint64(&reply_message, GetCurrentTimestamp());
+	pq_sendint64(&reply_message, now);
 	pq_sendbyte(&reply_message, requestReply ? 1 : 0);
 
 	/* Send it */
@@ -1146,16 +1162,14 @@ XLogWalRcvSendReply(bool force, bool requestReply)
  * send a feedback message explicitly setting InvalidTransactionId).
  */
 static void
-XLogWalRcvSendHSFeedback(bool immed)
+XLogWalRcvSendHSFeedback(WalRcvInfo *state, TimestampTz now, bool immed)
 {
-	TimestampTz now;
 	FullTransactionId nextFullXid;
 	TransactionId nextXid;
 	uint32		xmin_epoch,
 				catalog_xmin_epoch;
 	TransactionId xmin,
 				catalog_xmin;
-	static TimestampTz sendTime = 0;
 
 	/* initially true so we always send at least one feedback message */
 	static bool primary_has_standby_xmin = true;
@@ -1168,19 +1182,12 @@ XLogWalRcvSendHSFeedback(bool immed)
 		!primary_has_standby_xmin)
 		return;
 
-	/* Get current timestamp. */
-	now = GetCurrentTimestamp();
+	/* Send feedback at most once per wal_receiver_status_interval. */
+	if (!immed && now < state->wakeup[WALRCV_WAKEUP_HSFEEDBACK])
+		return;
 
-	if (!immed)
-	{
-		/*
-		 * Send feedback at most once per wal_receiver_status_interval.
-		 */
-		if (!TimestampDifferenceExceeds(sendTime, now,
-										wal_receiver_status_interval * 1000))
-			return;
-		sendTime = now;
-	}
+	/* Make sure we wake up when it's time to send feedback again. */
+	WalRcvComputeNextWakeup(state, WALRCV_WAKEUP_HSFEEDBACK, now);
 
 	/*
 	 * If Hot Standby is not yet accepting connections there is nothing to
@@ -1228,7 +1235,7 @@ XLogWalRcvSendHSFeedback(bool immed)
 	/* Construct the message and send it. */
 	resetStringInfo(&reply_message);
 	pq_sendbyte(&reply_message, 'h');
-	pq_sendint64(&reply_message, GetCurrentTimestamp());
+	pq_sendint64(&reply_message, now);
 	pq_sendint32(&reply_message, xmin);
 	pq_sendint32(&reply_message, xmin_epoch);
 	pq_sendint32(&reply_message, catalog_xmin);
@@ -1291,6 +1298,46 @@ ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime)
 	}
 }
 
+/*
+ * Compute the next wakeup time for a given wakeup reason.  Can be called to
+ * initialize a wakeup time, to adjust it for the next wakeup, or to
+ * reinitialize it when GUCs have changed.
+ */
+static void
+WalRcvComputeNextWakeup(WalRcvInfo *state, WalRcvWakeupReason reason,
+						TimestampTz now)
+{
+	switch (reason)
+	{
+	case WALRCV_WAKEUP_TERMINATE:
+		if (wal_receiver_timeout <= 0)
+			state->wakeup[reason] = PG_INT64_MAX;
+		else
+			state->wakeup[reason] = now + wal_receiver_timeout * INT64CONST(1000);
+		break;
+	case WALRCV_WAKEUP_PING:
+		if (wal_receiver_timeout <= 0)
+			state->wakeup[reason] = PG_INT64_MAX;
+		else
+			state->wakeup[reason] = now + (wal_receiver_timeout / 2) * INT64CONST(1000);
+		break;
+	case WALRCV_WAKEUP_HSFEEDBACK:
+		if (!hot_standby_feedback || wal_receiver_status_interval <= 0)
+			state->wakeup[reason] = PG_INT64_MAX;
+		else
+			state->wakeup[reason] = now + wal_receiver_status_interval * INT64CONST(1000000);
+		break;
+	case WALRCV_WAKEUP_REPLY:
+		if (wal_receiver_status_interval <= 0)
+			state->wakeup[reason] = PG_INT64_MAX;
+		else
+			state->wakeup[reason] = now + wal_receiver_status_interval * INT64CONST(1000000);
+		break;
+	default:
+		break;
+	}
+}
+
 /*
  * Wake up the walreceiver main loop.
  *
-- 
2.25.1
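
The idea in the second patch's commit message, tracking when periodic work is next due instead of waking 10 times per second, can be illustrated with a small standalone model. It is not the patch's code: the names are hypothetical, and it models only the two status-interval-driven reasons (reply and hot standby feedback), leaving out the wal_receiver_timeout-driven terminate and ping deadlines. A disabled reason gets a deadline of "never" (INT64_MAX), and an idle loop sleeps exactly until the soonest deadline.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

typedef int64_t usec_t;         /* microseconds, like TimestampTz */
#define NEVER INT64_MAX         /* deadline for a disabled reason */

/* Hypothetical settings modeled on the GUCs; interval is in seconds. */
typedef struct
{
    int  status_interval_s;     /* <= 0 disables periodic messages */
    bool hs_feedback;           /* like hot_standby_feedback */
} settings;

enum { W_REPLY, W_HSFEEDBACK, NUM_W };

/* Compute the next deadline for one reason, given the current time. */
static usec_t
next_wakeup(int reason, const settings *s, usec_t now)
{
    switch (reason)
    {
        case W_REPLY:
            if (s->status_interval_s <= 0)
                return NEVER;
            return now + s->status_interval_s * INT64_C(1000000);
        case W_HSFEEDBACK:
            if (!s->hs_feedback || s->status_interval_s <= 0)
                return NEVER;
            return now + s->status_interval_s * INT64_C(1000000);
    }
    return NEVER;
}

/*
 * Count how often an idle loop wakes up within 'horizon' microseconds
 * when it sleeps until the soonest deadline instead of polling.
 */
static int
count_idle_wakeups(const settings *s, usec_t horizon)
{
    usec_t wakeup[NUM_W];
    usec_t now = 0;
    int    wakeups = 0;

    assert(horizon >= 0);
    for (int i = 0; i < NUM_W; ++i)
        wakeup[i] = next_wakeup(i, s, now);

    for (;;)
    {
        usec_t next = NEVER;

        for (int i = 0; i < NUM_W; ++i)
            if (wakeup[i] < next)
                next = wakeup[i];
        if (next > horizon)
            break;
        now = next;             /* sleep until the soonest deadline */
        wakeups++;
        /* Do the due work, then schedule each due reason again. */
        for (int i = 0; i < NUM_W; ++i)
            if (wakeup[i] <= now)
                wakeup[i] = next_wakeup(i, s, now);
    }
    return wakeups;
}
```

With a 10-second status interval, this model wakes 6 times in an idle minute, compared with 600 wakeups for a fixed 100 ms poll; with the interval set to 0 it never wakes at all.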
