Alvaro Herrera <alvhe...@2ndquadrant.com> wrote:

> Hi Antonin, could you please rebase again?

Attached.

-- 
Antonin Houska
Web: https://www.cybertec-postgresql.com

>From 7b15d7bbdae13c743ddeae10b8ff79e9b02d8243 Mon Sep 17 00:00:00 2001
From: Antonin Houska <a...@cybertec.at>
Date: Mon, 9 Sep 2019 11:53:54 +0200
Subject: [PATCH 1/4] Remove TLI from some argument lists.

The timeline information is available to caller via XLogReaderState. Now that
XLogRead() is gonna be (sometimes) responsible for determining the TLI, it
would have to be added the (TimeLineID *) argument too, just to be consistent
with the current coding style. Since XLogRead() updates also other
position-specific fields of XLogReaderState, it seems simpler if we remove the
output argument from XLogPageReadCB and always report the TLI via
XLogReaderState.
---
 src/backend/access/transam/xlog.c              |  7 +++----
 src/backend/access/transam/xlogreader.c        |  6 +++---
 src/backend/access/transam/xlogutils.c         | 11 ++++++-----
 src/backend/replication/logical/logicalfuncs.c |  4 ++--
 src/backend/replication/walsender.c            |  2 +-
 src/bin/pg_rewind/parsexlog.c                  |  9 ++++-----
 src/bin/pg_waldump/pg_waldump.c                |  2 +-
 src/include/access/xlogreader.h                |  8 +++-----
 src/include/access/xlogutils.h                 |  5 ++---
 src/include/replication/logicalfuncs.h         |  2 +-
 10 files changed, 26 insertions(+), 30 deletions(-)

diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 6876537b62..cd948dbefc 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -885,8 +885,7 @@ static int	XLogFileRead(XLogSegNo segno, int emode, TimeLineID tli,
 						 int source, bool notfoundOk);
 static int	XLogFileReadAnyTLI(XLogSegNo segno, int emode, int source);
 static int	XLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr,
-						 int reqLen, XLogRecPtr targetRecPtr, char *readBuf,
-						 TimeLineID *readTLI);
+						 int reqLen, XLogRecPtr targetRecPtr, char *readBuf);
 static bool WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess,
 										bool fetching_ckpt, XLogRecPtr tliRecPtr);
 static int	emode_for_corrupt_record(int emode, XLogRecPtr RecPtr);
@@ -11523,7 +11522,7 @@ CancelBackup(void)
  */
 static int
 XLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, int reqLen,
-			 XLogRecPtr targetRecPtr, char *readBuf, TimeLineID *readTLI)
+			 XLogRecPtr targetRecPtr, char *readBuf)
 {
 	XLogPageReadPrivate *private =
 	(XLogPageReadPrivate *) xlogreader->private_data;
@@ -11640,7 +11639,7 @@ retry:
 	Assert(targetPageOff == readOff);
 	Assert(reqLen <= readLen);
 
-	*readTLI = curFileTLI;
+	xlogreader->readPageTLI = curFileTLI;
 
 	/*
 	 * Check the page header immediately, so that we can retry immediately if
diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c
index a66e3324b1..2184f4291d 100644
--- a/src/backend/access/transam/xlogreader.c
+++ b/src/backend/access/transam/xlogreader.c
@@ -559,7 +559,7 @@ ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr, int reqLen)
 
 		readLen = state->read_page(state, targetSegmentPtr, XLOG_BLCKSZ,
 								   state->currRecPtr,
-								   state->readBuf, &state->readPageTLI);
+								   state->readBuf);
 		if (readLen < 0)
 			goto err;
 
@@ -577,7 +577,7 @@ ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr, int reqLen)
 	 */
 	readLen = state->read_page(state, pageptr, Max(reqLen, SizeOfXLogShortPHD),
 							   state->currRecPtr,
-							   state->readBuf, &state->readPageTLI);
+							   state->readBuf);
 	if (readLen < 0)
 		goto err;
 
@@ -596,7 +596,7 @@ ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr, int reqLen)
 	{
 		readLen = state->read_page(state, pageptr, XLogPageHeaderSize(hdr),
 								   state->currRecPtr,
-								   state->readBuf, &state->readPageTLI);
+								   state->readBuf);
 		if (readLen < 0)
 			goto err;
 	}
diff --git a/src/backend/access/transam/xlogutils.c b/src/backend/access/transam/xlogutils.c
index 1fc39333f1..680bed8278 100644
--- a/src/backend/access/transam/xlogutils.c
+++ b/src/backend/access/transam/xlogutils.c
@@ -909,12 +909,12 @@ XLogReadDetermineTimeline(XLogReaderState *state, XLogRecPtr wantPage, uint32 wa
  */
 int
 read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr,
-					 int reqLen, XLogRecPtr targetRecPtr, char *cur_page,
-					 TimeLineID *pageTLI)
+					 int reqLen, XLogRecPtr targetRecPtr, char *cur_page)
 {
 	XLogRecPtr	read_upto,
 				loc;
 	int			count;
+	TimeLineID	pageTLI;
 
 	loc = targetPagePtr + reqLen;
 
@@ -934,7 +934,7 @@ read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr,
 		else
 			read_upto = GetXLogReplayRecPtr(&ThisTimeLineID);
 
-		*pageTLI = ThisTimeLineID;
+		pageTLI = ThisTimeLineID;
 
 		/*
 		 * Check which timeline to get the record from.
@@ -991,7 +991,7 @@ read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr,
 			 * nothing cares so long as the timeline doesn't go backwards.  We
 			 * should read the page header instead; FIXME someday.
 			 */
-			*pageTLI = state->currTLI;
+			pageTLI = state->currTLI;
 
 			/* No need to wait on a historical timeline */
 			break;
@@ -1022,8 +1022,9 @@ read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr,
 	 * as 'count', read the whole page anyway. It's guaranteed to be
 	 * zero-padded up to the page boundary if it's incomplete.
 	 */
-	XLogRead(cur_page, state->wal_segment_size, *pageTLI, targetPagePtr,
+	XLogRead(cur_page, state->wal_segment_size, pageTLI, targetPagePtr,
 			 XLOG_BLCKSZ);
+	state->readPageTLI = pageTLI;
 
 	/* number of valid bytes in the buffer */
 	return count;
diff --git a/src/backend/replication/logical/logicalfuncs.c b/src/backend/replication/logical/logicalfuncs.c
index d974400d6e..d1cf80d441 100644
--- a/src/backend/replication/logical/logicalfuncs.c
+++ b/src/backend/replication/logical/logicalfuncs.c
@@ -116,10 +116,10 @@ check_permissions(void)
 
 int
 logical_read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr,
-							 int reqLen, XLogRecPtr targetRecPtr, char *cur_page, TimeLineID *pageTLI)
+							 int reqLen, XLogRecPtr targetRecPtr, char *cur_page)
 {
 	return read_local_xlog_page(state, targetPagePtr, reqLen,
-								targetRecPtr, cur_page, pageTLI);
+								targetRecPtr, cur_page);
 }
 
 /*
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 23870a25a5..28d8c31af8 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -763,7 +763,7 @@ StartReplication(StartReplicationCmd *cmd)
  */
 static int
 logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen,
-					   XLogRecPtr targetRecPtr, char *cur_page, TimeLineID *pageTLI)
+					   XLogRecPtr targetRecPtr, char *cur_page)
 {
 	XLogRecPtr	flushptr;
 	int			count;
diff --git a/src/bin/pg_rewind/parsexlog.c b/src/bin/pg_rewind/parsexlog.c
index 63c3879ead..0a89f9c02a 100644
--- a/src/bin/pg_rewind/parsexlog.c
+++ b/src/bin/pg_rewind/parsexlog.c
@@ -47,10 +47,10 @@ typedef struct XLogPageReadPrivate
 	int			tliIndex;
 } XLogPageReadPrivate;
 
+
 static int	SimpleXLogPageRead(XLogReaderState *xlogreader,
 							   XLogRecPtr targetPagePtr,
-							   int reqLen, XLogRecPtr targetRecPtr, char *readBuf,
-							   TimeLineID *pageTLI);
+							   int reqLen, XLogRecPtr targetRecPtr, char *readBuf);
 
 /*
  * Read WAL from the datadir/pg_wal, starting from 'startpoint' on timeline
@@ -237,8 +237,7 @@ findLastCheckpoint(const char *datadir, XLogRecPtr forkptr, int tliIndex,
 /* XLogReader callback function, to read a WAL page */
 static int
 SimpleXLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr,
-				   int reqLen, XLogRecPtr targetRecPtr, char *readBuf,
-				   TimeLineID *pageTLI)
+				   int reqLen, XLogRecPtr targetRecPtr, char *readBuf)
 {
 	XLogPageReadPrivate *private = (XLogPageReadPrivate *) xlogreader->private_data;
 	uint32		targetPageOff;
@@ -321,7 +320,7 @@ SimpleXLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr,
 
 	Assert(targetSegNo == xlogreadsegno);
 
-	*pageTLI = targetHistory[private->tliIndex].tli;
+	xlogreader->readPageTLI = targetHistory[private->tliIndex].tli;
 	return XLOG_BLCKSZ;
 }
 
diff --git a/src/bin/pg_waldump/pg_waldump.c b/src/bin/pg_waldump/pg_waldump.c
index b95d467805..40c64a0bbf 100644
--- a/src/bin/pg_waldump/pg_waldump.c
+++ b/src/bin/pg_waldump/pg_waldump.c
@@ -423,7 +423,7 @@ XLogDumpXLogRead(const char *directory, TimeLineID timeline_id,
  */
 static int
 XLogDumpReadPage(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen,
-				 XLogRecPtr targetPtr, char *readBuff, TimeLineID *curFileTLI)
+				 XLogRecPtr targetPtr, char *readBuff)
 {
 	XLogDumpPrivate *private = state->private_data;
 	int			count = XLOG_BLCKSZ;
diff --git a/src/include/access/xlogreader.h b/src/include/access/xlogreader.h
index 735b1bd2fd..d64a9ad82f 100644
--- a/src/include/access/xlogreader.h
+++ b/src/include/access/xlogreader.h
@@ -38,8 +38,7 @@ typedef int (*XLogPageReadCB) (XLogReaderState *xlogreader,
 							   XLogRecPtr targetPagePtr,
 							   int reqLen,
 							   XLogRecPtr targetRecPtr,
-							   char *readBuf,
-							   TimeLineID *pageTLI);
+							   char *readBuf);
 
 typedef struct
 {
@@ -99,9 +98,8 @@ struct XLogReaderState
 	 * actual WAL record it's interested in.  In that case, targetRecPtr can
 	 * be used to determine which timeline to read the page from.
 	 *
-	 * The callback shall set *pageTLI to the TLI of the file the page was
-	 * read from.  It is currently used only for error reporting purposes, to
-	 * reconstruct the name of the WAL file where an error occurred.
+	 * The callback shall set ->readPageTLI to the TLI of the file the page
+	 * was read from.
 	 */
 	XLogPageReadCB read_page;
 
diff --git a/src/include/access/xlogutils.h b/src/include/access/xlogutils.h
index 4105b59904..4fb305bafd 100644
--- a/src/include/access/xlogutils.h
+++ b/src/include/access/xlogutils.h
@@ -47,12 +47,11 @@ extern Buffer XLogReadBufferExtended(RelFileNode rnode, ForkNumber forknum,
 extern Relation CreateFakeRelcacheEntry(RelFileNode rnode);
 extern void FreeFakeRelcacheEntry(Relation fakerel);
 
+
 extern int	read_local_xlog_page(XLogReaderState *state,
 								 XLogRecPtr targetPagePtr, int reqLen,
-								 XLogRecPtr targetRecPtr, char *cur_page,
-								 TimeLineID *pageTLI);
+								 XLogRecPtr targetRecPtr, char *cur_page);
 
 extern void XLogReadDetermineTimeline(XLogReaderState *state,
 									  XLogRecPtr wantPage, uint32 wantLength);
-
 #endif
diff --git a/src/include/replication/logicalfuncs.h b/src/include/replication/logicalfuncs.h
index a9c178a9e6..012096f183 100644
--- a/src/include/replication/logicalfuncs.h
+++ b/src/include/replication/logicalfuncs.h
@@ -14,6 +14,6 @@
 extern int	logical_read_local_xlog_page(XLogReaderState *state,
 										 XLogRecPtr targetPagePtr,
 										 int reqLen, XLogRecPtr targetRecPtr,
-										 char *cur_page, TimeLineID *pageTLI);
+										 char *cur_page);
 
 #endif
-- 
2.22.0

>From a9c5a09eda9c63ab39db7336b04863bead352846 Mon Sep 17 00:00:00 2001
From: Antonin Houska <a...@cybertec.at>
Date: Mon, 9 Sep 2019 11:53:54 +0200
Subject: [PATCH 2/4] Introduce XLogSegment structure.

---
 src/backend/access/transam/xlog.c       |  6 +-
 src/backend/access/transam/xlogreader.c | 57 +++++++++++-------
 src/backend/access/transam/xlogutils.c  | 20 +++----
 src/backend/replication/walsender.c     | 78 ++++++++++++-------------
 src/bin/pg_rewind/parsexlog.c           |  2 +-
 src/bin/pg_waldump/pg_waldump.c         |  3 +
 src/include/access/xlogreader.h         | 32 ++++++----
 7 files changed, 111 insertions(+), 87 deletions(-)

diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index cd948dbefc..c5bfda74f0 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -4295,7 +4295,7 @@ ReadRecord(XLogReaderState *xlogreader, XLogRecPtr RecPtr, int emode,
 			XLByteToSeg(xlogreader->latestPagePtr, segno, wal_segment_size);
 			offset = XLogSegmentOffset(xlogreader->latestPagePtr,
 									   wal_segment_size);
-			XLogFileName(fname, xlogreader->readPageTLI, segno,
+			XLogFileName(fname, xlogreader->seg.tli, segno,
 						 wal_segment_size);
 			ereport(emode_for_corrupt_record(emode,
 											 RecPtr ? RecPtr : EndRecPtr),
@@ -7354,7 +7354,7 @@ StartupXLOG(void)
 	 * and we were reading the old WAL from a segment belonging to a higher
 	 * timeline.
 	 */
-	EndOfLogTLI = xlogreader->readPageTLI;
+	EndOfLogTLI = xlogreader->seg.tli;
 
 	/*
 	 * Complain if we did not roll forward far enough to render the backup
@@ -11639,7 +11639,7 @@ retry:
 	Assert(targetPageOff == readOff);
 	Assert(reqLen <= readLen);
 
-	xlogreader->readPageTLI = curFileTLI;
+	xlogreader->seg.tli = curFileTLI;
 
 	/*
 	 * Check the page header immediately, so that we can retry immediately if
diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c
index 2184f4291d..7b4ec81493 100644
--- a/src/backend/access/transam/xlogreader.c
+++ b/src/backend/access/transam/xlogreader.c
@@ -96,7 +96,9 @@ XLogReaderAllocate(int wal_segment_size, XLogPageReadCB pagereadfunc,
 		return NULL;
 	}
 
-	state->wal_segment_size = wal_segment_size;
+	/* Initialize segment pointer. */
+	XLogSegmentInit(&state->seg, wal_segment_size);
+
 	state->read_page = pagereadfunc;
 	/* system_identifier initialized to zeroes above */
 	state->private_data = private_data;
@@ -490,8 +492,8 @@ XLogReadRecord(XLogReaderState *state, XLogRecPtr RecPtr, char **errormsg)
 		(record->xl_info & ~XLR_INFO_MASK) == XLOG_SWITCH)
 	{
 		/* Pretend it extends to end of segment */
-		state->EndRecPtr += state->wal_segment_size - 1;
-		state->EndRecPtr -= XLogSegmentOffset(state->EndRecPtr, state->wal_segment_size);
+		state->EndRecPtr += state->seg.size - 1;
+		state->EndRecPtr -= XLogSegmentOffset(state->EndRecPtr, state->seg.size);
 	}
 
 	if (DecodeXLogRecord(state, record, errormsg))
@@ -533,12 +535,12 @@ ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr, int reqLen)
 
 	Assert((pageptr % XLOG_BLCKSZ) == 0);
 
-	XLByteToSeg(pageptr, targetSegNo, state->wal_segment_size);
-	targetPageOff = XLogSegmentOffset(pageptr, state->wal_segment_size);
+	XLByteToSeg(pageptr, targetSegNo, state->seg.size);
+	targetPageOff = XLogSegmentOffset(pageptr, state->seg.size);
 
 	/* check whether we have all the requested data already */
-	if (targetSegNo == state->readSegNo && targetPageOff == state->readOff &&
-		reqLen <= state->readLen)
+	if (targetSegNo == state->seg.num &&
+		targetPageOff == state->seg.off && reqLen <= state->readLen)
 		return state->readLen;
 
 	/*
@@ -553,7 +555,7 @@ ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr, int reqLen)
 	 * record is.  This is so that we can check the additional identification
 	 * info that is present in the first page's "long" header.
 	 */
-	if (targetSegNo != state->readSegNo && targetPageOff != 0)
+	if (targetSegNo != state->seg.num && targetPageOff != 0)
 	{
 		XLogRecPtr	targetSegmentPtr = pageptr - targetPageOff;
 
@@ -608,8 +610,8 @@ ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr, int reqLen)
 		goto err;
 
 	/* update read state information */
-	state->readSegNo = targetSegNo;
-	state->readOff = targetPageOff;
+	state->seg.num = targetSegNo;
+	state->seg.off = targetPageOff;
 	state->readLen = readLen;
 
 	return readLen;
@@ -625,8 +627,8 @@ err:
 static void
 XLogReaderInvalReadState(XLogReaderState *state)
 {
-	state->readSegNo = 0;
-	state->readOff = 0;
+	state->seg.num = 0;
+	state->seg.off = 0;
 	state->readLen = 0;
 }
 
@@ -745,16 +747,16 @@ XLogReaderValidatePageHeader(XLogReaderState *state, XLogRecPtr recptr,
 
 	Assert((recptr % XLOG_BLCKSZ) == 0);
 
-	XLByteToSeg(recptr, segno, state->wal_segment_size);
-	offset = XLogSegmentOffset(recptr, state->wal_segment_size);
+	XLByteToSeg(recptr, segno, state->seg.size);
+	offset = XLogSegmentOffset(recptr, state->seg.size);
 
-	XLogSegNoOffsetToRecPtr(segno, offset, state->wal_segment_size, recaddr);
+	XLogSegNoOffsetToRecPtr(segno, offset, state->seg.size, recaddr);
 
 	if (hdr->xlp_magic != XLOG_PAGE_MAGIC)
 	{
 		char		fname[MAXFNAMELEN];
 
-		XLogFileName(fname, state->readPageTLI, segno, state->wal_segment_size);
+		XLogFileName(fname, state->seg.tli, segno, state->seg.size);
 
 		report_invalid_record(state,
 							  "invalid magic number %04X in log segment %s, offset %u",
@@ -768,7 +770,7 @@ XLogReaderValidatePageHeader(XLogReaderState *state, XLogRecPtr recptr,
 	{
 		char		fname[MAXFNAMELEN];
 
-		XLogFileName(fname, state->readPageTLI, segno, state->wal_segment_size);
+		XLogFileName(fname, state->seg.tli, segno, state->seg.size);
 
 		report_invalid_record(state,
 							  "invalid info bits %04X in log segment %s, offset %u",
@@ -791,7 +793,7 @@ XLogReaderValidatePageHeader(XLogReaderState *state, XLogRecPtr recptr,
 								  (unsigned long long) state->system_identifier);
 			return false;
 		}
-		else if (longhdr->xlp_seg_size != state->wal_segment_size)
+		else if (longhdr->xlp_seg_size != state->seg.size)
 		{
 			report_invalid_record(state,
 								  "WAL file is from different database system: incorrect segment size in page header");
@@ -808,7 +810,7 @@ XLogReaderValidatePageHeader(XLogReaderState *state, XLogRecPtr recptr,
 	{
 		char		fname[MAXFNAMELEN];
 
-		XLogFileName(fname, state->readPageTLI, segno, state->wal_segment_size);
+		XLogFileName(fname, state->seg.tli, segno, state->seg.size);
 
 		/* hmm, first page of file doesn't have a long header? */
 		report_invalid_record(state,
@@ -828,7 +830,7 @@ XLogReaderValidatePageHeader(XLogReaderState *state, XLogRecPtr recptr,
 	{
 		char		fname[MAXFNAMELEN];
 
-		XLogFileName(fname, state->readPageTLI, segno, state->wal_segment_size);
+		XLogFileName(fname, state->seg.tli, segno, state->seg.size);
 
 		report_invalid_record(state,
 							  "unexpected pageaddr %X/%X in log segment %s, offset %u",
@@ -853,7 +855,7 @@ XLogReaderValidatePageHeader(XLogReaderState *state, XLogRecPtr recptr,
 		{
 			char		fname[MAXFNAMELEN];
 
-			XLogFileName(fname, state->readPageTLI, segno, state->wal_segment_size);
+			XLogFileName(fname, state->seg.tli, segno, state->seg.size);
 
 			report_invalid_record(state,
 								  "out-of-sequence timeline ID %u (after %u) in log segment %s, offset %u",
@@ -997,6 +999,19 @@ out:
 
 #endif							/* FRONTEND */
 
+/*
+ * Initialize the passed segment pointer.
+ */
+void
+XLogSegmentInit(XLogSegment *seg, int size)
+{
+	seg->file = -1;
+	seg->num = 0;
+	seg->off = 0;
+	seg->tli = 0;
+	seg->dir = NULL;
+	seg->size = size;
+}
 
 /* ----------------------------------------
  * Functions for decoding the data and block references in a record.
diff --git a/src/backend/access/transam/xlogutils.c b/src/backend/access/transam/xlogutils.c
index 680bed8278..424bb06919 100644
--- a/src/backend/access/transam/xlogutils.c
+++ b/src/backend/access/transam/xlogutils.c
@@ -802,8 +802,8 @@ XLogRead(char *buf, int segsize, TimeLineID tli, XLogRecPtr startptr,
 void
 XLogReadDetermineTimeline(XLogReaderState *state, XLogRecPtr wantPage, uint32 wantLength)
 {
-	const XLogRecPtr lastReadPage = state->readSegNo *
-	state->wal_segment_size + state->readOff;
+	const XLogRecPtr lastReadPage = state->seg.num *
+	state->seg.size + state->seg.off;
 
 	Assert(wantPage != InvalidXLogRecPtr && wantPage % XLOG_BLCKSZ == 0);
 	Assert(wantLength <= XLOG_BLCKSZ);
@@ -847,8 +847,8 @@ XLogReadDetermineTimeline(XLogReaderState *state, XLogRecPtr wantPage, uint32 wa
 	if (state->currTLIValidUntil != InvalidXLogRecPtr &&
 		state->currTLI != ThisTimeLineID &&
 		state->currTLI != 0 &&
-		((wantPage + wantLength) / state->wal_segment_size) <
-		(state->currTLIValidUntil / state->wal_segment_size))
+		((wantPage + wantLength) / state->seg.size) <
+		(state->currTLIValidUntil / state->seg.size))
 		return;
 
 	/*
@@ -870,11 +870,11 @@ XLogReadDetermineTimeline(XLogReaderState *state, XLogRecPtr wantPage, uint32 wa
 		 */
 		List	   *timelineHistory = readTimeLineHistory(ThisTimeLineID);
 
-		XLogRecPtr	endOfSegment = (((wantPage / state->wal_segment_size) + 1)
-									* state->wal_segment_size) - 1;
+		XLogRecPtr	endOfSegment = (((wantPage / state->seg.size) + 1)
+									* state->seg.size) - 1;
 
-		Assert(wantPage / state->wal_segment_size ==
-			   endOfSegment / state->wal_segment_size);
+		Assert(wantPage / state->seg.size ==
+			   endOfSegment / state->seg.size);
 
 		/*
 		 * Find the timeline of the last LSN on the segment containing
@@ -1022,9 +1022,9 @@ read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr,
 	 * as 'count', read the whole page anyway. It's guaranteed to be
 	 * zero-padded up to the page boundary if it's incomplete.
 	 */
-	XLogRead(cur_page, state->wal_segment_size, pageTLI, targetPagePtr,
+	XLogRead(cur_page, state->seg.size, state->seg.tli, targetPagePtr,
 			 XLOG_BLCKSZ);
-	state->readPageTLI = pageTLI;
+	state->seg.tli = pageTLI;
 
 	/* number of valid bytes in the buffer */
 	return count;
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 28d8c31af8..f5630a63cf 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -128,16 +128,7 @@ bool		log_replication_commands = false;
  */
 bool		wake_wal_senders = false;
 
-/*
- * These variables are used similarly to openLogFile/SegNo/Off,
- * but for walsender to read the XLOG.
- */
-static int	sendFile = -1;
-static XLogSegNo sendSegNo = 0;
-static uint32 sendOff = 0;
-
-/* Timeline ID of the currently open file */
-static TimeLineID curFileTimeLine = 0;
+static XLogSegment *sendSeg = NULL;
 
 /*
  * These variables keep track of the state of the timeline we're currently
@@ -285,6 +276,10 @@ InitWalSender(void)
 
 	/* Initialize empty timestamp buffer for lag tracking. */
 	lag_tracker = MemoryContextAllocZero(TopMemoryContext, sizeof(LagTracker));
+
+	/* Make sure we can remember the current read position in XLOG. */
+	sendSeg = (XLogSegment *) MemoryContextAlloc(TopMemoryContext, sizeof(XLogSegment));
+	XLogSegmentInit(sendSeg, wal_segment_size);
 }
 
 /*
@@ -301,10 +296,10 @@ WalSndErrorCleanup(void)
 	ConditionVariableCancelSleep();
 	pgstat_report_wait_end();
 
-	if (sendFile >= 0)
+	if (sendSeg->file >= 0)
 	{
-		close(sendFile);
-		sendFile = -1;
+		close(sendSeg->file);
+		sendSeg->file = -1;
 	}
 
 	if (MyReplicationSlot != NULL)
@@ -2384,15 +2379,16 @@ retry:
 
 		startoff = XLogSegmentOffset(recptr, wal_segment_size);
 
-		if (sendFile < 0 || !XLByteInSeg(recptr, sendSegNo, wal_segment_size))
+		if (sendSeg->file < 0 ||
+			!XLByteInSeg(recptr, sendSeg->num, sendSeg->size))
 		{
 			char		path[MAXPGPATH];
 
 			/* Switch to another logfile segment */
-			if (sendFile >= 0)
-				close(sendFile);
+			if (sendSeg->file >= 0)
+				close(sendSeg->file);
 
-			XLByteToSeg(recptr, sendSegNo, wal_segment_size);
+			XLByteToSeg(recptr, sendSeg->num, sendSeg->size);
 
 			/*-------
 			 * When reading from a historic timeline, and there is a timeline
@@ -2420,20 +2416,20 @@ retry:
 			 * used portion of the old segment is copied to the new file.
 			 *-------
 			 */
-			curFileTimeLine = sendTimeLine;
+			sendSeg->tli = sendTimeLine;
 			if (sendTimeLineIsHistoric)
 			{
 				XLogSegNo	endSegNo;
 
 				XLByteToSeg(sendTimeLineValidUpto, endSegNo, wal_segment_size);
-				if (sendSegNo == endSegNo)
-					curFileTimeLine = sendTimeLineNextTLI;
+				if (sendSeg->num == endSegNo)
+					sendSeg->tli = sendTimeLineNextTLI;
 			}
 
-			XLogFilePath(path, curFileTimeLine, sendSegNo, wal_segment_size);
+			XLogFilePath(path, sendSeg->tli, sendSeg->num, wal_segment_size);
 
-			sendFile = BasicOpenFile(path, O_RDONLY | PG_BINARY);
-			if (sendFile < 0)
+			sendSeg->file = BasicOpenFile(path, O_RDONLY | PG_BINARY);
+			if (sendSeg->file < 0)
 			{
 				/*
 				 * If the file is not found, assume it's because the standby
@@ -2444,26 +2440,26 @@ retry:
 					ereport(ERROR,
 							(errcode_for_file_access(),
 							 errmsg("requested WAL segment %s has already been removed",
-									XLogFileNameP(curFileTimeLine, sendSegNo))));
+									XLogFileNameP(sendSeg->tli, sendSeg->num))));
 				else
 					ereport(ERROR,
 							(errcode_for_file_access(),
 							 errmsg("could not open file \"%s\": %m",
 									path)));
 			}
-			sendOff = 0;
+			sendSeg->off = 0;
 		}
 
 		/* Need to seek in the file? */
-		if (sendOff != startoff)
+		if (sendSeg->off != startoff)
 		{
-			if (lseek(sendFile, (off_t) startoff, SEEK_SET) < 0)
+			if (lseek(sendSeg->file, (off_t) startoff, SEEK_SET) < 0)
 				ereport(ERROR,
 						(errcode_for_file_access(),
 						 errmsg("could not seek in log segment %s to offset %u: %m",
-								XLogFileNameP(curFileTimeLine, sendSegNo),
+								XLogFileNameP(sendSeg->tli, sendSeg->num),
 								startoff)));
-			sendOff = startoff;
+			sendSeg->off = startoff;
 		}
 
 		/* How many bytes are within this segment? */
@@ -2473,29 +2469,29 @@ retry:
 			segbytes = nbytes;
 
 		pgstat_report_wait_start(WAIT_EVENT_WAL_READ);
-		readbytes = read(sendFile, p, segbytes);
+		readbytes = read(sendSeg->file, p, segbytes);
 		pgstat_report_wait_end();
 		if (readbytes < 0)
 		{
 			ereport(ERROR,
 					(errcode_for_file_access(),
 					 errmsg("could not read from log segment %s, offset %u, length %zu: %m",
-							XLogFileNameP(curFileTimeLine, sendSegNo),
-							sendOff, (Size) segbytes)));
+							XLogFileNameP(sendSeg->tli, sendSeg->num),
+							sendSeg->off, (Size) segbytes)));
 		}
 		else if (readbytes == 0)
 		{
 			ereport(ERROR,
 					(errcode(ERRCODE_DATA_CORRUPTED),
 					 errmsg("could not read from log segment %s, offset %u: read %d of %zu",
-							XLogFileNameP(curFileTimeLine, sendSegNo),
-							sendOff, readbytes, (Size) segbytes)));
+							XLogFileNameP(sendSeg->tli, sendSeg->num),
+							sendSeg->off, readbytes, (Size) segbytes)));
 		}
 
 		/* Update state for read */
 		recptr += readbytes;
 
-		sendOff += readbytes;
+		sendSeg->off += readbytes;
 		nbytes -= readbytes;
 		p += readbytes;
 	}
@@ -2526,10 +2522,10 @@ retry:
 		walsnd->needreload = false;
 		SpinLockRelease(&walsnd->mutex);
 
-		if (reload && sendFile >= 0)
+		if (reload && sendSeg->file >= 0)
 		{
-			close(sendFile);
-			sendFile = -1;
+			close(sendSeg->file);
+			sendSeg->file = -1;
 
 			goto retry;
 		}
@@ -2695,9 +2691,9 @@ XLogSendPhysical(void)
 	if (sendTimeLineIsHistoric && sendTimeLineValidUpto <= sentPtr)
 	{
 		/* close the current file. */
-		if (sendFile >= 0)
-			close(sendFile);
-		sendFile = -1;
+		if (sendSeg->file >= 0)
+			close(sendSeg->file);
+		sendSeg->file = -1;
 
 		/* Send CopyDone */
 		pq_putmessage_noblock('c', NULL, 0);
diff --git a/src/bin/pg_rewind/parsexlog.c b/src/bin/pg_rewind/parsexlog.c
index 0a89f9c02a..e663fec6a7 100644
--- a/src/bin/pg_rewind/parsexlog.c
+++ b/src/bin/pg_rewind/parsexlog.c
@@ -320,7 +320,7 @@ SimpleXLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr,
 
 	Assert(targetSegNo == xlogreadsegno);
 
-	xlogreader->readPageTLI = targetHistory[private->tliIndex].tli;
+	xlogreader->seg.tli = targetHistory[private->tliIndex].tli;
 	return XLOG_BLCKSZ;
 }
 
diff --git a/src/bin/pg_waldump/pg_waldump.c b/src/bin/pg_waldump/pg_waldump.c
index 40c64a0bbf..a16793bb8b 100644
--- a/src/bin/pg_waldump/pg_waldump.c
+++ b/src/bin/pg_waldump/pg_waldump.c
@@ -1105,6 +1105,9 @@ main(int argc, char **argv)
 	if (!xlogreader_state)
 		fatal_error("out of memory");
 
+	/* Finalize the segment pointer. */
+	xlogreader_state->seg.dir = private.inpath;
+
 	/* first find a valid recptr to start from */
 	first_record = XLogFindNextRecord(xlogreader_state, private.startptr);
 
diff --git a/src/include/access/xlogreader.h b/src/include/access/xlogreader.h
index d64a9ad82f..c2724fff74 100644
--- a/src/include/access/xlogreader.h
+++ b/src/include/access/xlogreader.h
@@ -31,6 +31,20 @@
 
 #include "access/xlogrecord.h"
 
+/*
+ * Position in XLOG file while reading it.
+ */
+typedef struct XLogSegment
+{
+	int			file;			/* segment file descriptor */
+	XLogSegNo	num;			/* segment number */
+	uint32		off;			/* offset in the segment */
+	TimeLineID	tli;			/* timeline ID of the currently open file */
+
+	char	   *dir;			/* directory (only needed by frontends) */
+	int			size;			/* segment size */
+} XLogSegment;
+
 typedef struct XLogReaderState XLogReaderState;
 
 /* Function type definition for the read_page callback */
@@ -76,11 +90,6 @@ struct XLogReaderState
 	 * ----------------------------------------
 	 */
 
-	/*
-	 * Segment size of the to-be-parsed data (mandatory).
-	 */
-	int			wal_segment_size;
-
 	/*
 	 * Data input callback (mandatory).
 	 *
@@ -98,8 +107,8 @@ struct XLogReaderState
 	 * actual WAL record it's interested in.  In that case, targetRecPtr can
 	 * be used to determine which timeline to read the page from.
 	 *
-	 * The callback shall set ->readPageTLI to the TLI of the file the page
-	 * was read from.
+	 * The callback shall set ->seg.tli to the TLI of the file the page was
+	 * read from.
 	 */
 	XLogPageReadCB read_page;
 
@@ -154,10 +163,8 @@ struct XLogReaderState
 	char	   *readBuf;
 	uint32		readLen;
 
-	/* last read segment, segment offset, TLI for data currently in readBuf */
-	XLogSegNo	readSegNo;
-	uint32		readOff;
-	TimeLineID	readPageTLI;
+	/* last read XLOG position for data currently in readBuf */
+	XLogSegment seg;
 
 	/*
 	 * beginning of prior page read, and its TLI.  Doesn't necessarily
@@ -217,6 +224,9 @@ extern bool XLogReaderValidatePageHeader(XLogReaderState *state,
 #ifdef FRONTEND
 extern XLogRecPtr XLogFindNextRecord(XLogReaderState *state, XLogRecPtr RecPtr);
 #endif							/* FRONTEND */
+
+extern void XLogSegmentInit(XLogSegment *seg, int size);
+
 /* Functions for decoding an XLogRecord */
 
 extern bool DecodeXLogRecord(XLogReaderState *state, XLogRecord *record,
-- 
2.22.0

>From 01716ffe400dc61cbd37acdca4c03a79b0b4e117 Mon Sep 17 00:00:00 2001
From: Antonin Houska <a...@cybertec.at>
Date: Mon, 9 Sep 2019 11:53:55 +0200
Subject: [PATCH 3/4] Use only xlogreader.c:XLogRead()

The implementations in xlogutils.c and walsender.c are just renamed now, to be
removed by the following diff.
---
 src/backend/access/transam/xlogreader.c | 128 ++++++++++++++++++++++++
 src/backend/access/transam/xlogutils.c  |  40 ++++++--
 src/backend/replication/walsender.c     | 125 ++++++++++++++++++++++-
 src/bin/pg_waldump/pg_waldump.c         |  59 ++++++++++-
 src/include/access/xlogreader.h         |  19 ++++
 5 files changed, 359 insertions(+), 12 deletions(-)

diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c
index 7b4ec81493..2a80bc5823 100644
--- a/src/backend/access/transam/xlogreader.c
+++ b/src/backend/access/transam/xlogreader.c
@@ -17,6 +17,8 @@
  */
 #include "postgres.h"
 
+#include <unistd.h>
+
 #include "access/transam.h"
 #include "access/xlogrecord.h"
 #include "access/xlog_internal.h"
@@ -27,6 +29,7 @@
 
 #ifndef FRONTEND
 #include "miscadmin.h"
+#include "pgstat.h"
 #include "utils/memutils.h"
 #endif
 
@@ -1011,7 +1014,132 @@ XLogSegmentInit(XLogSegment *seg, int size)
 	seg->tli = 0;
 	seg->dir = NULL;
 	seg->size = size;
+	seg->last_req = 0;
+}
+
+/*
+ * Read 'count' bytes from WAL into 'buf', starting at location 'startptr'. If
+ * tli is passed, get the data from timeline *tli. 'pos' is the current
+ * position in the XLOG file and openSegment is a callback that opens the next
+ * segment for reading.
+ *
+ * Returns true if the call succeeded, false if it failed. Caller should check
+ * errno in the case of failure. seg->last_req might also be useful for error
+ * messages.
+ *
+ * XXX probably this should be improved to suck data directly from the
+ * WAL buffers when possible.
+ */
+bool
+XLogRead(char *buf, XLogRecPtr startptr, Size count,
+		 TimeLineID *tli, XLogSegment *seg, XLogOpenSegment openSegment)
+{
+	char	   *p;
+	XLogRecPtr	recptr;
+	Size		nbytes;
+
+	p = buf;
+	recptr = startptr;
+	nbytes = count;
+
+	while (nbytes > 0)
+	{
+		int			readbytes;
+
+		seg->off = XLogSegmentOffset(recptr, seg->size);
+
+		if (seg->file < 0 ||
+			!XLByteInSeg(recptr, seg->num, seg->size) ||
+			(tli != NULL && *tli != seg->tli))
+		{
+			XLogSegNo	nextSegNo;
+
+			/* Switch to another logfile segment */
+			if (seg->file >= 0)
+				close(seg->file);
+
+			XLByteToSeg(recptr, nextSegNo, seg->size);
+
+			/* Open the next segment in the caller's way. */
+			openSegment(nextSegNo, tli, seg);
+
+			/*
+			 * If the function is called by the XLOG reader, the reader will
+			 * eventually set both "num" and "off". However we need to care
+			 * about them too because the function can also be used directly,
+			 * see walsender.c.
+			 */
+			seg->num = nextSegNo;
+			seg->off = 0;
+		}
+
+		/* How many bytes are within this segment? */
+		if (nbytes > (seg->size - seg->off))
+			seg->last_req = seg->size - seg->off;
+		else
+			seg->last_req = nbytes;
+
+#ifndef FRONTEND
+		pgstat_report_wait_start(WAIT_EVENT_WAL_READ);
+#endif
+
+		/*
+		 * Failure to read the data does not necessarily imply non-zero errno.
+		 * Set it to zero so that caller can distinguish the failure that does
+		 * not affect errno.
+		 */
+		errno = 0;
+
+		readbytes = pg_pread(seg->file, p, seg->last_req, seg->off);
+
+#ifndef FRONTEND
+		pgstat_report_wait_end();
+#endif
+
+		if (readbytes <= 0)
+			return false;
+
+		/* Update state for read */
+		recptr += readbytes;
+		nbytes -= readbytes;
+		p += readbytes;
+
+		/*
+		 * If the function is called by the XLOG reader, the reader will
+		 * eventually set this field. However we need to care about it too
+		 * because the function can also be used directly (see walsender.c).
+		 */
+		seg->off += readbytes;
+	}
+
+	return true;
+}
+
+#ifndef FRONTEND
+/*
+ * Backend-specific code to handle errors encountered by XLogRead().
+ */
+void
+XLogReadProcessError(XLogSegment *seg)
+{
+	if (errno != 0)
+	{
+		ereport(ERROR,
+				(errcode_for_file_access(),
+				 errmsg("could not read from log segment %s, offset %u, length %zu: %m",
+						XLogFileNameP(seg->tli, seg->num), seg->off,
+						(Size) seg->last_req)));
+	}
+	else
+	{
+		ereport(ERROR,
+				(errcode(ERRCODE_DATA_CORRUPTED),
+				 errmsg("could not read from log segment %s, offset %u: length %zu",
+						XLogFileNameP(seg->tli, seg->num), seg->off,
+						(Size) seg->last_req)));
+	}
 }
+#endif
 
 /* ----------------------------------------
  * Functions for decoding the data and block references in a record.
diff --git a/src/backend/access/transam/xlogutils.c b/src/backend/access/transam/xlogutils.c
index 424bb06919..83b014e04c 100644
--- a/src/backend/access/transam/xlogutils.c
+++ b/src/backend/access/transam/xlogutils.c
@@ -653,8 +653,8 @@ XLogTruncateRelation(RelFileNode rnode, ForkNumber forkNum,
  * frontend).  Probably these should be merged at some point.
  */
 static void
-XLogRead(char *buf, int segsize, TimeLineID tli, XLogRecPtr startptr,
-		 Size count)
+XLogReadOld(char *buf, int segsize, TimeLineID tli, XLogRecPtr startptr,
+			Size count)
 {
 	char	   *p;
 	XLogRecPtr	recptr;
@@ -896,6 +896,35 @@ XLogReadDetermineTimeline(XLogReaderState *state, XLogRecPtr wantPage, uint32 wa
 	}
 }
 
+/*
+ * Callback for XLogRead() to open the next segment.
+ */
+static void
+read_local_xlog_page_open_segment(XLogSegNo nextSegNo, TimeLineID *tli,
+								  XLogSegment *seg)
+{
+	char		path[MAXPGPATH];
+
+	XLogFilePath(path, *tli, nextSegNo, seg->size);
+	seg->file = BasicOpenFile(path, O_RDONLY | PG_BINARY);
+
+	if (seg->file < 0)
+	{
+		if (errno == ENOENT)
+			ereport(ERROR,
+					(errcode_for_file_access(),
+					 errmsg("requested WAL segment %s has already been removed",
+							path)));
+		else
+			ereport(ERROR,
+					(errcode_for_file_access(),
+					 errmsg("could not open file \"%s\": %m",
+							path)));
+	}
+
+	seg->tli = *tli;
+}
+
 /*
  * read_page callback for reading local xlog files
  *
@@ -1022,10 +1051,9 @@ read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr,
 	 * as 'count', read the whole page anyway. It's guaranteed to be
 	 * zero-padded up to the page boundary if it's incomplete.
 	 */
-	XLogRead(cur_page, state->seg.size, state->seg.tli, targetPagePtr,
-			 XLOG_BLCKSZ);
-	state->seg.tli = pageTLI;
-
+	if (!XLogRead(cur_page, targetPagePtr, XLOG_BLCKSZ, &pageTLI,
+				  &state->seg, read_local_xlog_page_open_segment))
+		XLogReadProcessError(&state->seg);
 	/* number of valid bytes in the buffer */
 	return count;
 }
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index f5630a63cf..0685c320b4 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -247,7 +247,9 @@ static void LagTrackerWrite(XLogRecPtr lsn, TimestampTz local_flush_time);
 static TimeOffset LagTrackerRead(int head, XLogRecPtr lsn, TimestampTz now);
 static bool TransactionIdInRecentPast(TransactionId xid, uint32 epoch);
 
-static void XLogRead(char *buf, XLogRecPtr startptr, Size count);
+static void WalSndOpenSegment(XLogSegNo nextSegNo, TimeLineID *tli,
+							  XLogSegment *seg);
+static void XLogReadOld(char *buf, XLogRecPtr startptr, Size count);
 
 
 /* Initialize walsender process before entering the main command loop */
@@ -782,7 +784,9 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req
 		count = flushptr - targetPagePtr;	/* part of the page available */
 
 	/* now actually read the data, we know it's there */
-	XLogRead(cur_page, targetPagePtr, XLOG_BLCKSZ);
+	if (!XLogRead(cur_page, targetPagePtr, XLOG_BLCKSZ, NULL, sendSeg,
+				  WalSndOpenSegment))
+		XLogReadProcessError(sendSeg);
 
 	return count;
 }
@@ -2359,7 +2363,7 @@ WalSndKill(int code, Datum arg)
  * more than one.
  */
 static void
-XLogRead(char *buf, XLogRecPtr startptr, Size count)
+XLogReadOld(char *buf, XLogRecPtr startptr, Size count)
 {
 	char	   *p;
 	XLogRecPtr	recptr;
@@ -2532,6 +2536,76 @@ retry:
 	}
 }
 
+/*
+ * Callback for XLogRead() to open the next segment.
+ */
+void
+WalSndOpenSegment(XLogSegNo nextSegNo, TimeLineID *tli, XLogSegment *seg)
+{
+	char		path[MAXPGPATH];
+
+	/*
+	 * The timeline is determined below, caller should not do anything about
+	 * it.
+	 */
+	Assert(tli == NULL);
+
+	/*-------
+	 * When reading from a historic timeline, and there is a timeline switch
+	 * within this segment, read from the WAL segment belonging to the new
+	 * timeline.
+	 *
+	 * For example, imagine that this server is currently on timeline 5, and
+	 * we're streaming timeline 4. The switch from timeline 4 to 5 happened at
+	 * 0/13002088. In pg_wal, we have these files:
+	 *
+	 * ...
+	 * 000000040000000000000012
+	 * 000000040000000000000013
+	 * 000000050000000000000013
+	 * 000000050000000000000014
+	 * ...
+	 *
+	 * In this situation, when requested to send the WAL from segment 0x13, on
+	 * timeline 4, we read the WAL from file 000000050000000000000013. Archive
+	 * recovery prefers files from newer timelines, so if the segment was
+	 * restored from the archive on this server, the file belonging to the old
+	 * timeline, 000000040000000000000013, might not exist. Their contents are
+	 * equal up to the switchpoint, because at a timeline switch, the used
+	 * portion of the old segment is copied to the new file.  -------
+	 */
+	seg->tli = sendTimeLine;
+	if (sendTimeLineIsHistoric)
+	{
+		XLogSegNo	endSegNo;
+
+		XLByteToSeg(sendTimeLineValidUpto, endSegNo, seg->size);
+		if (seg->num == endSegNo)
+			seg->tli = sendTimeLineNextTLI;
+	}
+
+	XLogFilePath(path, seg->tli, nextSegNo, seg->size);
+	seg->file = BasicOpenFile(path, O_RDONLY | PG_BINARY);
+
+	if (seg->file < 0)
+	{
+		/*
+		 * If the file is not found, assume it's because the standby asked for
+		 * a too old WAL segment that has already been removed or recycled.
+		 */
+		if (errno == ENOENT)
+			ereport(ERROR,
+					(errcode_for_file_access(),
+					 errmsg("requested WAL segment %s has already been removed",
+							XLogFileNameP(seg->tli, seg->num))));
+		else
+			ereport(ERROR,
+					(errcode_for_file_access(),
+					 errmsg("could not open file \"%s\": %m",
+							path)));
+	}
+}
+
 /*
  * Send out the WAL in its normal physical/stored form.
  *
@@ -2549,6 +2623,7 @@ XLogSendPhysical(void)
 	XLogRecPtr	startptr;
 	XLogRecPtr	endptr;
 	Size		nbytes;
+	XLogSegNo	segno;
 
 	/* If requested switch the WAL sender to the stopping state. */
 	if (got_STOPPING)
@@ -2764,7 +2839,49 @@ XLogSendPhysical(void)
 	 * calls.
 	 */
 	enlargeStringInfo(&output_message, nbytes);
-	XLogRead(&output_message.data[output_message.len], startptr, nbytes);
+
+retry:
+	if (!XLogRead(&output_message.data[output_message.len], startptr, nbytes,
+				  NULL,			/* WalSndOpenSegment will determine TLI */
+				  sendSeg,
+				  WalSndOpenSegment))
+		XLogReadProcessError(sendSeg);
+
+	/*
+	 * After reading into the buffer, check that what we read was valid. We do
+	 * this after reading, because even though the segment was present when we
+	 * opened it, it might get recycled or removed while we read it. The
+	 * read() succeeds in that case, but the data we tried to read might
+	 * already have been overwritten with new WAL records.
+	 */
+	XLByteToSeg(startptr, segno, wal_segment_size);
+	CheckXLogRemoved(segno, ThisTimeLineID);
+
+	/*
+	 * During recovery, the currently-open WAL file might be replaced with the
+	 * file of the same name retrieved from archive. So we always need to
+	 * check what we read was valid after reading into the buffer. If it's
+	 * invalid, we try to open and read the file again.
+	 */
+	if (am_cascading_walsender)
+	{
+		WalSnd	   *walsnd = MyWalSnd;
+		bool		reload;
+
+		SpinLockAcquire(&walsnd->mutex);
+		reload = walsnd->needreload;
+		walsnd->needreload = false;
+		SpinLockRelease(&walsnd->mutex);
+
+		if (reload && sendSeg->file >= 0)
+		{
+			close(sendSeg->file);
+			sendSeg->file = -1;
+
+			goto retry;
+		}
+	}
+
 	output_message.len += nbytes;
 	output_message.data[output_message.len] = '\0';
 
diff --git a/src/bin/pg_waldump/pg_waldump.c b/src/bin/pg_waldump/pg_waldump.c
index a16793bb8b..3e09519f88 100644
--- a/src/bin/pg_waldump/pg_waldump.c
+++ b/src/bin/pg_waldump/pg_waldump.c
@@ -296,6 +296,45 @@ identify_target_directory(XLogDumpPrivate *private, char *directory,
 		fatal_error("could not find any WAL file");
 }
 
+static void
+XLogDumpOpenSegment(XLogSegNo nextSegNo, TimeLineID *tli, XLogSegment *seg)
+{
+	char		fname[MAXPGPATH];
+	int			tries;
+
+	XLogFileName(fname, *tli, nextSegNo, seg->size);
+
+	/*
+	 * In follow mode there is a short period of time after the server has
+	 * written the end of the previous file before the new file is available.
+	 * So we loop for 5 seconds looking for the file to appear before giving
+	 * up.
+	 */
+	for (tries = 0; tries < 10; tries++)
+	{
+		seg->file = open_file_in_directory(seg->dir, fname);
+		if (seg->file >= 0)
+			break;
+		if (errno == ENOENT)
+		{
+			int			save_errno = errno;
+
+			/* File not there yet, try again */
+			pg_usleep(500 * 1000);
+
+			errno = save_errno;
+			continue;
+		}
+		/* Any other error, fall through and fail */
+		break;
+	}
+
+	if (seg->file < 0)
+		fatal_error("could not find file \"%s\": %s",
+					fname, strerror(errno));
+	seg->tli = *tli;
+}
+
 /*
  * Read count bytes from a segment file in the specified directory, for the
  * given timeline, containing the specified record pointer; store the data in
@@ -441,8 +480,24 @@ XLogDumpReadPage(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen,
 		}
 	}
 
-	XLogDumpXLogRead(private->inpath, private->timeline, targetPagePtr,
-					 readBuff, count);
+	if (!XLogRead(readBuff, targetPagePtr, count, &private->timeline,
+				  &state->seg, XLogDumpOpenSegment))
+	{
+		XLogSegment *seg = &state->seg;
+		int			err = errno;
+		char		fname[MAXPGPATH];
+		int			save_errno = errno;
+
+		XLogFileName(fname, seg->tli, seg->num, seg->size);
+		errno = save_errno;
+
+		if (errno != 0)
+			fatal_error("could not read from log file %s, offset %u, length %zu: %s",
+						fname, seg->off, (Size) seg->last_req, strerror(err));
+		else
+			fatal_error("could not read from log file %s, offset %u: length: %zu",
+						fname, seg->off, (Size) seg->last_req);
+	}
 
 	return count;
 }
diff --git a/src/include/access/xlogreader.h b/src/include/access/xlogreader.h
index c2724fff74..4731023ccc 100644
--- a/src/include/access/xlogreader.h
+++ b/src/include/access/xlogreader.h
@@ -43,6 +43,7 @@ typedef struct XLogSegment
 
 	char	   *dir;			/* directory (only needed by frontends) */
 	int			size;			/* segment size */
+	int			last_req;		/* the amount of data requested last time */
 } XLogSegment;
 
 typedef struct XLogReaderState XLogReaderState;
@@ -225,7 +226,25 @@ extern bool XLogReaderValidatePageHeader(XLogReaderState *state,
 extern XLogRecPtr XLogFindNextRecord(XLogReaderState *state, XLogRecPtr RecPtr);
 #endif							/* FRONTEND */
 
+/*
+ * Callback to open the specified XLOG segment nextSegNo in timeline *tli for
+ * reading, and assign the descriptor to ->file. BasicOpenFile() is the
+ * preferred way to open the segment file in backend code, whereas open(2)
+ * should be used in frontend.
+ *
+ * If NULL is passed for tli, the callback must determine the timeline
+ * itself. In any case it's supposed to eventually set ->tli.
+ */
+typedef void (*XLogOpenSegment) (XLogSegNo nextSegNo, TimeLineID *tli,
+								 XLogSegment *seg);
+
 extern void XLogSegmentInit(XLogSegment *seg, int size);
+extern bool XLogRead(char *buf, XLogRecPtr startptr, Size count,
+					 TimeLineID *tli, XLogSegment *seg,
+					 XLogOpenSegment openSegment);
+#ifndef FRONTEND
+void		XLogReadProcessError(XLogSegment *seg);
+#endif
 
 /* Functions for decoding an XLogRecord */
 
-- 
2.22.0

>From e926fb9d4b28c630693eb80123a82d5cefb06920 Mon Sep 17 00:00:00 2001
From: Antonin Houska <a...@cybertec.at>
Date: Mon, 9 Sep 2019 11:53:55 +0200
Subject: [PATCH 4/4] Remove the old implemenations of XLogRead().

Done in a separate patch because the diff looks harder to read if one function
(XLogRead) is removed and another one (the XLogOpenSegment callback) is added
nearby at the same time (the addition and removal of code can get mixed in the
diff).
---
 src/backend/access/transam/xlogutils.c | 125 -----------------
 src/backend/replication/walsender.c    | 186 -------------------------
 src/bin/pg_waldump/pg_waldump.c        | 123 ----------------
 3 files changed, 434 deletions(-)

diff --git a/src/backend/access/transam/xlogutils.c b/src/backend/access/transam/xlogutils.c
index 83b014e04c..5b0ba39f17 100644
--- a/src/backend/access/transam/xlogutils.c
+++ b/src/backend/access/transam/xlogutils.c
@@ -17,14 +17,11 @@
  */
 #include "postgres.h"
 
-#include <unistd.h>
-
 #include "access/timeline.h"
 #include "access/xlog.h"
 #include "access/xlog_internal.h"
 #include "access/xlogutils.h"
 #include "miscadmin.h"
-#include "pgstat.h"
 #include "storage/smgr.h"
 #include "utils/guc.h"
 #include "utils/hsearch.h"
@@ -639,128 +636,6 @@ XLogTruncateRelation(RelFileNode rnode, ForkNumber forkNum,
 	forget_invalid_pages(rnode, forkNum, nblocks);
 }
 
-/*
- * Read 'count' bytes from WAL into 'buf', starting at location 'startptr'
- * in timeline 'tli'.
- *
- * Will open, and keep open, one WAL segment stored in the static file
- * descriptor 'sendFile'. This means if XLogRead is used once, there will
- * always be one descriptor left open until the process ends, but never
- * more than one.
- *
- * XXX This is very similar to pg_waldump's XLogDumpXLogRead and to XLogRead
- * in walsender.c but for small differences (such as lack of elog() in
- * frontend).  Probably these should be merged at some point.
- */
-static void
-XLogReadOld(char *buf, int segsize, TimeLineID tli, XLogRecPtr startptr,
-			Size count)
-{
-	char	   *p;
-	XLogRecPtr	recptr;
-	Size		nbytes;
-
-	/* state maintained across calls */
-	static int	sendFile = -1;
-	static XLogSegNo sendSegNo = 0;
-	static TimeLineID sendTLI = 0;
-	static uint32 sendOff = 0;
-
-	Assert(segsize == wal_segment_size);
-
-	p = buf;
-	recptr = startptr;
-	nbytes = count;
-
-	while (nbytes > 0)
-	{
-		uint32		startoff;
-		int			segbytes;
-		int			readbytes;
-
-		startoff = XLogSegmentOffset(recptr, segsize);
-
-		/* Do we need to switch to a different xlog segment? */
-		if (sendFile < 0 || !XLByteInSeg(recptr, sendSegNo, segsize) ||
-			sendTLI != tli)
-		{
-			char		path[MAXPGPATH];
-
-			if (sendFile >= 0)
-				close(sendFile);
-
-			XLByteToSeg(recptr, sendSegNo, segsize);
-
-			XLogFilePath(path, tli, sendSegNo, segsize);
-
-			sendFile = BasicOpenFile(path, O_RDONLY | PG_BINARY);
-
-			if (sendFile < 0)
-			{
-				if (errno == ENOENT)
-					ereport(ERROR,
-							(errcode_for_file_access(),
-							 errmsg("requested WAL segment %s has already been removed",
-									path)));
-				else
-					ereport(ERROR,
-							(errcode_for_file_access(),
-							 errmsg("could not open file \"%s\": %m",
-									path)));
-			}
-			sendOff = 0;
-			sendTLI = tli;
-		}
-
-		/* Need to seek in the file? */
-		if (sendOff != startoff)
-		{
-			if (lseek(sendFile, (off_t) startoff, SEEK_SET) < 0)
-			{
-				char		path[MAXPGPATH];
-				int			save_errno = errno;
-
-				XLogFilePath(path, tli, sendSegNo, segsize);
-				errno = save_errno;
-				ereport(ERROR,
-						(errcode_for_file_access(),
-						 errmsg("could not seek in log segment %s to offset %u: %m",
-								path, startoff)));
-			}
-			sendOff = startoff;
-		}
-
-		/* How many bytes are within this segment? */
-		if (nbytes > (segsize - startoff))
-			segbytes = segsize - startoff;
-		else
-			segbytes = nbytes;
-
-		pgstat_report_wait_start(WAIT_EVENT_WAL_READ);
-		readbytes = read(sendFile, p, segbytes);
-		pgstat_report_wait_end();
-		if (readbytes <= 0)
-		{
-			char		path[MAXPGPATH];
-			int			save_errno = errno;
-
-			XLogFilePath(path, tli, sendSegNo, segsize);
-			errno = save_errno;
-			ereport(ERROR,
-					(errcode_for_file_access(),
-					 errmsg("could not read from log segment %s, offset %u, length %lu: %m",
-							path, sendOff, (unsigned long) segbytes)));
-		}
-
-		/* Update state for read */
-		recptr += readbytes;
-
-		sendOff += readbytes;
-		nbytes -= readbytes;
-		p += readbytes;
-	}
-}
-
 /*
  * Determine which timeline to read an xlog page from and set the
  * XLogReaderState's currTLI to that timeline ID.
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 0685c320b4..072d46d853 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -249,7 +249,6 @@ static bool TransactionIdInRecentPast(TransactionId xid, uint32 epoch);
 
 static void WalSndOpenSegment(XLogSegNo nextSegNo, TimeLineID *tli,
 							  XLogSegment *seg);
-static void XLogReadOld(char *buf, XLogRecPtr startptr, Size count);
 
 
 /* Initialize walsender process before entering the main command loop */
@@ -2351,191 +2350,6 @@ WalSndKill(int code, Datum arg)
 	SpinLockRelease(&walsnd->mutex);
 }
 
-/*
- * Read 'count' bytes from WAL into 'buf', starting at location 'startptr'
- *
- * XXX probably this should be improved to suck data directly from the
- * WAL buffers when possible.
- *
- * Will open, and keep open, one WAL segment stored in the global file
- * descriptor sendFile. This means if XLogRead is used once, there will
- * always be one descriptor left open until the process ends, but never
- * more than one.
- */
-static void
-XLogReadOld(char *buf, XLogRecPtr startptr, Size count)
-{
-	char	   *p;
-	XLogRecPtr	recptr;
-	Size		nbytes;
-	XLogSegNo	segno;
-
-retry:
-	p = buf;
-	recptr = startptr;
-	nbytes = count;
-
-	while (nbytes > 0)
-	{
-		uint32		startoff;
-		int			segbytes;
-		int			readbytes;
-
-		startoff = XLogSegmentOffset(recptr, wal_segment_size);
-
-		if (sendSeg->file < 0 ||
-			!XLByteInSeg(recptr, sendSeg->num, sendSeg->size))
-		{
-			char		path[MAXPGPATH];
-
-			/* Switch to another logfile segment */
-			if (sendSeg->file >= 0)
-				close(sendSeg->file);
-
-			XLByteToSeg(recptr, sendSeg->num, sendSeg->size);
-
-			/*-------
-			 * When reading from a historic timeline, and there is a timeline
-			 * switch within this segment, read from the WAL segment belonging
-			 * to the new timeline.
-			 *
-			 * For example, imagine that this server is currently on timeline
-			 * 5, and we're streaming timeline 4. The switch from timeline 4
-			 * to 5 happened at 0/13002088. In pg_wal, we have these files:
-			 *
-			 * ...
-			 * 000000040000000000000012
-			 * 000000040000000000000013
-			 * 000000050000000000000013
-			 * 000000050000000000000014
-			 * ...
-			 *
-			 * In this situation, when requested to send the WAL from
-			 * segment 0x13, on timeline 4, we read the WAL from file
-			 * 000000050000000000000013. Archive recovery prefers files from
-			 * newer timelines, so if the segment was restored from the
-			 * archive on this server, the file belonging to the old timeline,
-			 * 000000040000000000000013, might not exist. Their contents are
-			 * equal up to the switchpoint, because at a timeline switch, the
-			 * used portion of the old segment is copied to the new file.
-			 *-------
-			 */
-			sendSeg->tli = sendTimeLine;
-			if (sendTimeLineIsHistoric)
-			{
-				XLogSegNo	endSegNo;
-
-				XLByteToSeg(sendTimeLineValidUpto, endSegNo, wal_segment_size);
-				if (sendSeg->num == endSegNo)
-					sendSeg->tli = sendTimeLineNextTLI;
-			}
-
-			XLogFilePath(path, sendSeg->tli, sendSeg->num, wal_segment_size);
-
-			sendSeg->file = BasicOpenFile(path, O_RDONLY | PG_BINARY);
-			if (sendSeg->file < 0)
-			{
-				/*
-				 * If the file is not found, assume it's because the standby
-				 * asked for a too old WAL segment that has already been
-				 * removed or recycled.
-				 */
-				if (errno == ENOENT)
-					ereport(ERROR,
-							(errcode_for_file_access(),
-							 errmsg("requested WAL segment %s has already been removed",
-									XLogFileNameP(sendSeg->tli, sendSeg->num))));
-				else
-					ereport(ERROR,
-							(errcode_for_file_access(),
-							 errmsg("could not open file \"%s\": %m",
-									path)));
-			}
-			sendSeg->off = 0;
-		}
-
-		/* Need to seek in the file? */
-		if (sendSeg->off != startoff)
-		{
-			if (lseek(sendSeg->file, (off_t) startoff, SEEK_SET) < 0)
-				ereport(ERROR,
-						(errcode_for_file_access(),
-						 errmsg("could not seek in log segment %s to offset %u: %m",
-								XLogFileNameP(sendSeg->tli, sendSeg->num),
-								startoff)));
-			sendSeg->off = startoff;
-		}
-
-		/* How many bytes are within this segment? */
-		if (nbytes > (wal_segment_size - startoff))
-			segbytes = wal_segment_size - startoff;
-		else
-			segbytes = nbytes;
-
-		pgstat_report_wait_start(WAIT_EVENT_WAL_READ);
-		readbytes = read(sendSeg->file, p, segbytes);
-		pgstat_report_wait_end();
-		if (readbytes < 0)
-		{
-			ereport(ERROR,
-					(errcode_for_file_access(),
-					 errmsg("could not read from log segment %s, offset %u, length %zu: %m",
-							XLogFileNameP(sendSeg->tli, sendSeg->num),
-							sendSeg->off, (Size) segbytes)));
-		}
-		else if (readbytes == 0)
-		{
-			ereport(ERROR,
-					(errcode(ERRCODE_DATA_CORRUPTED),
-					 errmsg("could not read from log segment %s, offset %u: read %d of %zu",
-							XLogFileNameP(sendSeg->tli, sendSeg->num),
-							sendSeg->off, readbytes, (Size) segbytes)));
-		}
-
-		/* Update state for read */
-		recptr += readbytes;
-
-		sendSeg->off += readbytes;
-		nbytes -= readbytes;
-		p += readbytes;
-	}
-
-	/*
-	 * After reading into the buffer, check that what we read was valid. We do
-	 * this after reading, because even though the segment was present when we
-	 * opened it, it might get recycled or removed while we read it. The
-	 * read() succeeds in that case, but the data we tried to read might
-	 * already have been overwritten with new WAL records.
-	 */
-	XLByteToSeg(startptr, segno, wal_segment_size);
-	CheckXLogRemoved(segno, ThisTimeLineID);
-
-	/*
-	 * During recovery, the currently-open WAL file might be replaced with the
-	 * file of the same name retrieved from archive. So we always need to
-	 * check what we read was valid after reading into the buffer. If it's
-	 * invalid, we try to open and read the file again.
-	 */
-	if (am_cascading_walsender)
-	{
-		WalSnd	   *walsnd = MyWalSnd;
-		bool		reload;
-
-		SpinLockAcquire(&walsnd->mutex);
-		reload = walsnd->needreload;
-		walsnd->needreload = false;
-		SpinLockRelease(&walsnd->mutex);
-
-		if (reload && sendSeg->file >= 0)
-		{
-			close(sendSeg->file);
-			sendSeg->file = -1;
-
-			goto retry;
-		}
-	}
-}
-
 /*
  * Callback for XLogRead() to open the next segment.
  */
diff --git a/src/bin/pg_waldump/pg_waldump.c b/src/bin/pg_waldump/pg_waldump.c
index 3e09519f88..cd44090d36 100644
--- a/src/bin/pg_waldump/pg_waldump.c
+++ b/src/bin/pg_waldump/pg_waldump.c
@@ -14,7 +14,6 @@
 
 #include <dirent.h>
 #include <sys/stat.h>
-#include <unistd.h>
 
 #include "access/xlogreader.h"
 #include "access/xlogrecord.h"
@@ -335,128 +334,6 @@ XLogDumpOpenSegment(XLogSegNo nextSegNo, TimeLineID *tli, XLogSegment *seg)
 	seg->tli = *tli;
 }
 
-/*
- * Read count bytes from a segment file in the specified directory, for the
- * given timeline, containing the specified record pointer; store the data in
- * the passed buffer.
- */
-static void
-XLogDumpXLogRead(const char *directory, TimeLineID timeline_id,
-				 XLogRecPtr startptr, char *buf, Size count)
-{
-	char	   *p;
-	XLogRecPtr	recptr;
-	Size		nbytes;
-
-	static int	sendFile = -1;
-	static XLogSegNo sendSegNo = 0;
-	static uint32 sendOff = 0;
-
-	p = buf;
-	recptr = startptr;
-	nbytes = count;
-
-	while (nbytes > 0)
-	{
-		uint32		startoff;
-		int			segbytes;
-		int			readbytes;
-
-		startoff = XLogSegmentOffset(recptr, WalSegSz);
-
-		if (sendFile < 0 || !XLByteInSeg(recptr, sendSegNo, WalSegSz))
-		{
-			char		fname[MAXFNAMELEN];
-			int			tries;
-
-			/* Switch to another logfile segment */
-			if (sendFile >= 0)
-				close(sendFile);
-
-			XLByteToSeg(recptr, sendSegNo, WalSegSz);
-
-			XLogFileName(fname, timeline_id, sendSegNo, WalSegSz);
-
-			/*
-			 * In follow mode there is a short period of time after the server
-			 * has written the end of the previous file before the new file is
-			 * available. So we loop for 5 seconds looking for the file to
-			 * appear before giving up.
-			 */
-			for (tries = 0; tries < 10; tries++)
-			{
-				sendFile = open_file_in_directory(directory, fname);
-				if (sendFile >= 0)
-					break;
-				if (errno == ENOENT)
-				{
-					int			save_errno = errno;
-
-					/* File not there yet, try again */
-					pg_usleep(500 * 1000);
-
-					errno = save_errno;
-					continue;
-				}
-				/* Any other error, fall through and fail */
-				break;
-			}
-
-			if (sendFile < 0)
-				fatal_error("could not find file \"%s\": %s",
-							fname, strerror(errno));
-			sendOff = 0;
-		}
-
-		/* Need to seek in the file? */
-		if (sendOff != startoff)
-		{
-			if (lseek(sendFile, (off_t) startoff, SEEK_SET) < 0)
-			{
-				int			err = errno;
-				char		fname[MAXPGPATH];
-
-				XLogFileName(fname, timeline_id, sendSegNo, WalSegSz);
-
-				fatal_error("could not seek in log file %s to offset %u: %s",
-							fname, startoff, strerror(err));
-			}
-			sendOff = startoff;
-		}
-
-		/* How many bytes are within this segment? */
-		if (nbytes > (WalSegSz - startoff))
-			segbytes = WalSegSz - startoff;
-		else
-			segbytes = nbytes;
-
-		readbytes = read(sendFile, p, segbytes);
-		if (readbytes <= 0)
-		{
-			int			err = errno;
-			char		fname[MAXPGPATH];
-			int			save_errno = errno;
-
-			XLogFileName(fname, timeline_id, sendSegNo, WalSegSz);
-			errno = save_errno;
-
-			if (readbytes < 0)
-				fatal_error("could not read from log file %s, offset %u, length %d: %s",
-							fname, sendOff, segbytes, strerror(err));
-			else if (readbytes == 0)
-				fatal_error("could not read from log file %s, offset %u: read %d of %zu",
-							fname, sendOff, readbytes, (Size) segbytes);
-		}
-
-		/* Update state for read */
-		recptr += readbytes;
-
-		sendOff += readbytes;
-		nbytes -= readbytes;
-		p += readbytes;
-	}
-}
-
 /*
  * XLogReader read_page callback
  */
-- 
2.22.0

Reply via email to