From 3b05b248fa9c3b1f0d6b35bfe933ad96947d0300 Mon Sep 17 00:00:00 2001
From: Robert Haas <rhaas@postgresql.org>
Date: Fri, 29 Oct 2021 16:57:09 -0400
Subject: [PATCH 04/11] walreceiver.c: Don't rely on the global variable
 ThisTimeLineID.

Instead, pass the TLI around explicitly, as a function parameter.
Since this calls a few xlog.c functions that used ThisTimeLineID,
it was necessary to also change those functions to take a
TimeLineID as a parameter.
---
 src/backend/access/transam/xlog.c     | 52 ++++++++++++++++----------
 src/backend/replication/walreceiver.c | 54 ++++++++++++++++-----------
 src/include/access/xlog.h             |  4 +-
 3 files changed, 66 insertions(+), 44 deletions(-)

diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 6070e0f3b3..91ea1139d8 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -917,7 +917,8 @@ static void AdvanceXLInsertBuffer(XLogRecPtr upto, bool opportunistic);
 static bool XLogCheckpointNeeded(XLogSegNo new_segno);
 static void XLogWrite(XLogwrtRqst WriteRqst, bool flexible);
 static bool InstallXLogFileSegment(XLogSegNo *segno, char *tmppath,
-								   bool find_free, XLogSegNo max_segno);
+								   bool find_free, XLogSegNo max_segno,
+								   TimeLineID tli);
 static int	XLogFileRead(XLogSegNo segno, int emode, TimeLineID tli,
 						 XLogSource source, bool notfoundOk);
 static int	XLogFileReadAnyTLI(XLogSegNo segno, int emode, XLogSource source);
@@ -2507,7 +2508,7 @@ XLogWrite(XLogwrtRqst WriteRqst, bool flexible)
 							wal_segment_size);
 
 			/* create/use new log file */
-			openLogFile = XLogFileInit(openLogSegNo);
+			openLogFile = XLogFileInit(openLogSegNo, ThisTimeLineID);
 			ReserveExternalFD();
 		}
 
@@ -2622,7 +2623,7 @@ XLogWrite(XLogwrtRqst WriteRqst, bool flexible)
 			 */
 			if (finishing_seg)
 			{
-				issue_xlog_fsync(openLogFile, openLogSegNo);
+				issue_xlog_fsync(openLogFile, openLogSegNo, ThisTimeLineID);
 
 				/* signal that we need to wakeup walsenders later */
 				WalSndWakeupRequest();
@@ -2693,7 +2694,7 @@ XLogWrite(XLogwrtRqst WriteRqst, bool flexible)
 				ReserveExternalFD();
 			}
 
-			issue_xlog_fsync(openLogFile, openLogSegNo);
+			issue_xlog_fsync(openLogFile, openLogSegNo, ThisTimeLineID);
 		}
 
 		/* signal that we need to wakeup walsenders later */
@@ -3285,7 +3286,8 @@ XLogNeedsFlush(XLogRecPtr record)
  * succeed.  (This is weird, but it's efficient for the callers.)
  */
 static int
-XLogFileInitInternal(XLogSegNo logsegno, bool *added, char *path)
+XLogFileInitInternal(XLogSegNo logsegno, TimeLineID logtli,
+					 bool *added, char *path)
 {
 	char		tmppath[MAXPGPATH];
 	PGAlignedXLogBlock zbuffer;
@@ -3294,7 +3296,9 @@ XLogFileInitInternal(XLogSegNo logsegno, bool *added, char *path)
 	int			fd;
 	int			save_errno;
 
-	XLogFilePath(path, ThisTimeLineID, logsegno, wal_segment_size);
+	Assert(logtli != 0);
+
+	XLogFilePath(path, logtli, logsegno, wal_segment_size);
 
 	/*
 	 * Try to use existent file (checkpoint maker may have created it already)
@@ -3438,7 +3442,8 @@ XLogFileInitInternal(XLogSegNo logsegno, bool *added, char *path)
 	 * CheckPointSegments.
 	 */
 	max_segno = logsegno + CheckPointSegments;
-	if (InstallXLogFileSegment(&installed_segno, tmppath, true, max_segno))
+	if (InstallXLogFileSegment(&installed_segno, tmppath, true, max_segno,
+							   logtli))
 	{
 		*added = true;
 		elog(DEBUG2, "done creating and filling new WAL file");
@@ -3470,13 +3475,15 @@ XLogFileInitInternal(XLogSegNo logsegno, bool *added, char *path)
  * in a critical section.
  */
 int
-XLogFileInit(XLogSegNo logsegno)
+XLogFileInit(XLogSegNo logsegno, TimeLineID logtli)
 {
 	bool		ignore_added;
 	char		path[MAXPGPATH];
 	int			fd;
 
-	fd = XLogFileInitInternal(logsegno, &ignore_added, path);
+	Assert(logtli != 0);
+
+	fd = XLogFileInitInternal(logsegno, logtli, &ignore_added, path);
 	if (fd >= 0)
 		return fd;
 
@@ -3618,7 +3625,7 @@ XLogFileCopy(XLogSegNo destsegno, TimeLineID srcTLI, XLogSegNo srcsegno,
 	/*
 	 * Now move the segment into place with its final name.
 	 */
-	if (!InstallXLogFileSegment(&destsegno, tmppath, false, 0))
+	if (!InstallXLogFileSegment(&destsegno, tmppath, false, 0, ThisTimeLineID))
 		elog(ERROR, "InstallXLogFileSegment should not have failed");
 }
 
@@ -3642,18 +3649,22 @@ XLogFileCopy(XLogSegNo destsegno, TimeLineID srcTLI, XLogSegNo srcsegno,
  * free slot is found between *segno and max_segno. (Ignored when find_free
  * is false.)
  *
+ * tli: The timeline on which the new segment should be installed.
+ *
  * Returns true if the file was installed successfully.  false indicates that
  * max_segno limit was exceeded, the startup process has disabled this
  * function for now, or an error occurred while renaming the file into place.
  */
 static bool
 InstallXLogFileSegment(XLogSegNo *segno, char *tmppath,
-					   bool find_free, XLogSegNo max_segno)
+					   bool find_free, XLogSegNo max_segno, TimeLineID tli)
 {
 	char		path[MAXPGPATH];
 	struct stat stat_buf;
 
-	XLogFilePath(path, ThisTimeLineID, *segno, wal_segment_size);
+	Assert(tli != 0);
+
+	XLogFilePath(path, tli, *segno, wal_segment_size);
 
 	LWLockAcquire(ControlFileLock, LW_EXCLUSIVE);
 	if (!XLogCtl->InstallXLogFileSegmentActive)
@@ -3679,7 +3690,7 @@ InstallXLogFileSegment(XLogSegNo *segno, char *tmppath,
 				return false;
 			}
 			(*segno)++;
-			XLogFilePath(path, ThisTimeLineID, *segno, wal_segment_size);
+			XLogFilePath(path, tli, *segno, wal_segment_size);
 		}
 	}
 
@@ -3976,7 +3987,7 @@ PreallocXlogFiles(XLogRecPtr endptr)
 	if (offset >= (uint32) (0.75 * wal_segment_size))
 	{
 		_logSegNo++;
-		lf = XLogFileInitInternal(_logSegNo, &added, path);
+		lf = XLogFileInitInternal(_logSegNo, ThisTimeLineID, &added, path);
 		if (lf >= 0)
 			close(lf);
 		if (added)
@@ -4255,7 +4266,7 @@ RemoveXlogFile(const char *segname, XLogSegNo recycleSegNo,
 		XLogCtl->InstallXLogFileSegmentActive &&	/* callee rechecks this */
 		lstat(path, &statbuf) == 0 && S_ISREG(statbuf.st_mode) &&
 		InstallXLogFileSegment(endlogSegNo, path,
-							   true, recycleSegNo))
+							   true, recycleSegNo, ThisTimeLineID))
 	{
 		ereport(DEBUG2,
 				(errmsg_internal("recycled write-ahead log file \"%s\"",
@@ -5390,7 +5401,7 @@ BootStrapXLOG(void)
 	record->xl_crc = crc;
 
 	/* Create first XLOG segment file */
-	openLogFile = XLogFileInit(1);
+	openLogFile = XLogFileInit(1, ThisTimeLineID);
 
 	/*
 	 * We needn't bother with Reserve/ReleaseExternalFD here, since we'll
@@ -5698,7 +5709,7 @@ exitArchiveRecovery(TimeLineID endTLI, XLogRecPtr endOfLog)
 		 */
 		int			fd;
 
-		fd = XLogFileInit(startLogSegNo);
+		fd = XLogFileInit(startLogSegNo, ThisTimeLineID);
 
 		if (close(fd) != 0)
 		{
@@ -10858,11 +10869,13 @@ assign_xlog_sync_method(int new_sync_method, void *extra)
  * 'segno' is for error reporting purposes.
  */
 void
-issue_xlog_fsync(int fd, XLogSegNo segno)
+issue_xlog_fsync(int fd, XLogSegNo segno, TimeLineID tli)
 {
 	char	   *msg = NULL;
 	instr_time	start;
 
+	Assert(tli != 0);
+
 	/*
 	 * Quick exit if fsync is disabled or write() has already synced the WAL
 	 * file.
@@ -10911,8 +10924,7 @@ issue_xlog_fsync(int fd, XLogSegNo segno)
 		char		xlogfname[MAXFNAMELEN];
 		int			save_errno = errno;
 
-		XLogFileName(xlogfname, ThisTimeLineID, segno,
-					 wal_segment_size);
+		XLogFileName(xlogfname, tli, segno, wal_segment_size);
 		errno = save_errno;
 		ereport(PANIC,
 				(errcode_for_file_access(),
diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c
index b90e5ca98e..7a7eb3784e 100644
--- a/src/backend/replication/walreceiver.c
+++ b/src/backend/replication/walreceiver.c
@@ -122,10 +122,12 @@ static StringInfoData incoming_message;
 static void WalRcvFetchTimeLineHistoryFiles(TimeLineID first, TimeLineID last);
 static void WalRcvWaitForStartPosition(XLogRecPtr *startpoint, TimeLineID *startpointTLI);
 static void WalRcvDie(int code, Datum arg);
-static void XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len);
-static void XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr);
-static void XLogWalRcvFlush(bool dying);
-static void XLogWalRcvClose(XLogRecPtr recptr);
+static void XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len,
+								 TimeLineID tli);
+static void XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr,
+							TimeLineID tli);
+static void XLogWalRcvFlush(bool dying, TimeLineID tli);
+static void XLogWalRcvClose(XLogRecPtr recptr, TimeLineID tli);
 static void XLogWalRcvSendReply(bool force, bool requestReply);
 static void XLogWalRcvSendHSFeedback(bool immed);
 static void ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime);
@@ -255,7 +257,7 @@ WalReceiverMain(void)
 	pg_atomic_write_u64(&WalRcv->writtenUpto, 0);
 
 	/* Arrange to clean up at walreceiver exit */
-	on_shmem_exit(WalRcvDie, 0);
+	on_shmem_exit(WalRcvDie, PointerGetDatum(&startpointTLI));
 
 	/* Properly accept or ignore signals the postmaster might send us */
 	pqsignal(SIGHUP, SignalHandlerForConfigReload); /* set flag to read config
@@ -394,7 +396,6 @@ WalReceiverMain(void)
 		options.startpoint = startpoint;
 		options.slotname = slotname[0] != '\0' ? slotname : NULL;
 		options.proto.physical.startpointTLI = startpointTLI;
-		ThisTimeLineID = startpointTLI;
 		if (walrcv_startstreaming(wrconn, &options))
 		{
 			if (first_stream)
@@ -462,7 +463,8 @@ WalReceiverMain(void)
 							 */
 							last_recv_timestamp = GetCurrentTimestamp();
 							ping_sent = false;
-							XLogWalRcvProcessMsg(buf[0], &buf[1], len - 1);
+							XLogWalRcvProcessMsg(buf[0], &buf[1], len - 1,
+												 startpointTLI);
 						}
 						else if (len == 0)
 							break;
@@ -487,7 +489,7 @@ WalReceiverMain(void)
 					 * let the startup process and primary server know about
 					 * them.
 					 */
-					XLogWalRcvFlush(false);
+					XLogWalRcvFlush(false, startpointTLI);
 				}
 
 				/* Check if we need to exit the streaming loop. */
@@ -608,7 +610,7 @@ WalReceiverMain(void)
 		{
 			char		xlogfname[MAXFNAMELEN];
 
-			XLogWalRcvFlush(false);
+			XLogWalRcvFlush(false, startpointTLI);
 			XLogFileName(xlogfname, recvFileTLI, recvSegNo, wal_segment_size);
 			if (close(recvFile) != 0)
 				ereport(PANIC,
@@ -776,9 +778,12 @@ static void
 WalRcvDie(int code, Datum arg)
 {
 	WalRcvData *walrcv = WalRcv;
+	TimeLineID *startpointTLI_p = (TimeLineID *) DatumGetPointer(arg);
+
+	Assert(*startpointTLI_p != 0);
 
 	/* Ensure that all WAL records received are flushed to disk */
-	XLogWalRcvFlush(true);
+	XLogWalRcvFlush(true, *startpointTLI_p);
 
 	/* Mark ourselves inactive in shared memory */
 	SpinLockAcquire(&walrcv->mutex);
@@ -808,7 +813,7 @@ WalRcvDie(int code, Datum arg)
  * Accept the message from XLOG stream, and process it.
  */
 static void
-XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len)
+XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len, TimeLineID tli)
 {
 	int			hdrlen;
 	XLogRecPtr	dataStart;
@@ -838,7 +843,7 @@ XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len)
 
 				buf += hdrlen;
 				len -= hdrlen;
-				XLogWalRcvWrite(buf, len, dataStart);
+				XLogWalRcvWrite(buf, len, dataStart, tli);
 				break;
 			}
 		case 'k':				/* Keepalive */
@@ -875,25 +880,27 @@ XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len)
  * Write XLOG data to disk.
  */
 static void
-XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr)
+XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr, TimeLineID tli)
 {
 	int			startoff;
 	int			byteswritten;
 
+	Assert(tli != 0);
+
 	while (nbytes > 0)
 	{
 		int			segbytes;
 
 		/* Close the current segment if it's completed */
 		if (recvFile >= 0 && !XLByteInSeg(recptr, recvSegNo, wal_segment_size))
-			XLogWalRcvClose(recptr);
+			XLogWalRcvClose(recptr, tli);
 
 		if (recvFile < 0)
 		{
 			/* Create/use new log file */
 			XLByteToSeg(recptr, recvSegNo, wal_segment_size);
-			recvFile = XLogFileInit(recvSegNo);
-			recvFileTLI = ThisTimeLineID;
+			recvFile = XLogFileInit(recvSegNo, tli);
+			recvFileTLI = tli;
 		}
 
 		/* Calculate the start offset of the received logs */
@@ -946,7 +953,7 @@ XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr)
 	 * segment is received and written.
 	 */
 	if (recvFile >= 0 && !XLByteInSeg(recptr, recvSegNo, wal_segment_size))
-		XLogWalRcvClose(recptr);
+		XLogWalRcvClose(recptr, tli);
 }
 
 /*
@@ -956,13 +963,15 @@ XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr)
  * an error, so we skip sending a reply in that case.
  */
 static void
-XLogWalRcvFlush(bool dying)
+XLogWalRcvFlush(bool dying, TimeLineID tli)
 {
+	Assert(tli != 0);
+
 	if (LogstreamResult.Flush < LogstreamResult.Write)
 	{
 		WalRcvData *walrcv = WalRcv;
 
-		issue_xlog_fsync(recvFile, recvSegNo);
+		issue_xlog_fsync(recvFile, recvSegNo, tli);
 
 		LogstreamResult.Flush = LogstreamResult.Write;
 
@@ -972,7 +981,7 @@ XLogWalRcvFlush(bool dying)
 		{
 			walrcv->latestChunkStart = walrcv->flushedUpto;
 			walrcv->flushedUpto = LogstreamResult.Flush;
-			walrcv->receivedTLI = ThisTimeLineID;
+			walrcv->receivedTLI = tli;
 		}
 		SpinLockRelease(&walrcv->mutex);
 
@@ -1009,17 +1018,18 @@ XLogWalRcvFlush(bool dying)
  * Create an archive notification file since the segment is known completed.
  */
 static void
-XLogWalRcvClose(XLogRecPtr recptr)
+XLogWalRcvClose(XLogRecPtr recptr, TimeLineID tli)
 {
 	char		xlogfname[MAXFNAMELEN];
 
 	Assert(recvFile >= 0 && !XLByteInSeg(recptr, recvSegNo, wal_segment_size));
+	Assert(tli != 0);
 
 	/*
 	 * fsync() and close current file before we switch to next one. We would
 	 * otherwise have to reopen this file to fsync it later
 	 */
-	XLogWalRcvFlush(false);
+	XLogWalRcvFlush(false, tli);
 
 	XLogFileName(xlogfname, recvFileTLI, recvSegNo, wal_segment_size);
 
diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h
index 72417f44b6..22f1d37fb2 100644
--- a/src/include/access/xlog.h
+++ b/src/include/access/xlog.h
@@ -262,7 +262,7 @@ extern XLogRecPtr XLogInsertRecord(struct XLogRecData *rdata,
 extern void XLogFlush(XLogRecPtr RecPtr);
 extern bool XLogBackgroundFlush(void);
 extern bool XLogNeedsFlush(XLogRecPtr RecPtr);
-extern int	XLogFileInit(XLogSegNo segno);
+extern int	XLogFileInit(XLogSegNo segno, TimeLineID tli);
 extern int	XLogFileOpen(XLogSegNo segno);
 
 extern void CheckXLogRemoved(XLogSegNo segno, TimeLineID tli);
@@ -274,7 +274,7 @@ extern void xlog_redo(XLogReaderState *record);
 extern void xlog_desc(StringInfo buf, XLogReaderState *record);
 extern const char *xlog_identify(uint8 info);
 
-extern void issue_xlog_fsync(int fd, XLogSegNo segno);
+extern void issue_xlog_fsync(int fd, XLogSegNo segno, TimeLineID tli);
 
 extern bool RecoveryInProgress(void);
 extern RecoveryState GetRecoveryState(void);
-- 
2.24.3 (Apple Git-128)

