Alvaro Herrera <alvhe...@2ndquadrant.com> wrote:

> On 2019-Sep-24, Antonin Houska wrote:
> 
> > Alvaro Herrera <alvhe...@2ndquadrant.com> wrote:
> 
> > > If you don't have any strong dislikes for these changes, I'll push this
> > > part and let you rebase the remains on top.
> > 
> > No objections here.
> 
> OK, pushed.  Please rebase the other parts.

Thanks!

> I made one small adjustment: in read_local_xlog_page() there was one
> *readTLI output parameter that was being changed to a local variable
> plus later assignment to the output struct member; I changed the code to
> continue to assign directly to the output variable instead.  There was
> an error case in which the TLI was not assigned to; I suppose this
> doesn't really change things (we don't examine the TLI in that case, do
> we?), but it seemed dangerous to leave like that.

I used the local variable to make some expressions simpler, but missed the
fact that this way I can leave the ws_tli field unassigned if the function
returns prematurely. Now that I look closer, I see that it can be a problem -
in the case of ERROR, XLogReadRecord() does reset the state, but it does not
reset the TLI:

err:
        /*
         * Invalidate the read state. We might read from a different source
         * after failure.
         */
        XLogReaderInvalReadState(state);

Thus the TLI appears to be important even on ERROR, and what you've done is
correct. Thanks for fixing that.
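
To illustrate the hazard in isolation, here is a minimal sketch. It is not
the xlogutils.c code: DemoSegment and both read_page_* functions are
hypothetical names, and TimeLineID is redeclared only so the example stands
alone. It shows how an early return can leave the output field stale when the
value is staged in a local variable first.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

typedef uint32_t TimeLineID;	/* stand-in for the PostgreSQL typedef */

/* Hypothetical struct, loosely modeled on WALOpenSegment. */
typedef struct DemoSegment
{
	TimeLineID	ws_tli;
} DemoSegment;

/* Risky variant: the output field is written only on the success path. */
static bool
read_page_local_var(DemoSegment *seg, TimeLineID wanted, bool fail)
{
	TimeLineID	tli = wanted;

	assert(seg != NULL);

	if (fail)
		return false;			/* seg->ws_tli keeps whatever it held before */

	seg->ws_tli = tli;
	return true;
}

/* Safer variant: assign the output field before any early exit. */
static bool
read_page_direct(DemoSegment *seg, TimeLineID wanted, bool fail)
{
	assert(seg != NULL);

	seg->ws_tli = wanted;

	if (fail)
		return false;			/* caller still sees the TLI we tried to use */

	return true;
}
```

With the first variant, a caller that inspects ws_tli after a failure sees the
value from an earlier call; with the second it sees the TLI of the failed
attempt.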

One comment on the remaining part of the series:

Before this refactoring, the walsender.c:XLogRead() function contained these
lines

       /*
        * After reading into the buffer, check that what we read was valid. We
        * do this after reading, because even though the segment was present
        * when we opened it, it might get recycled or removed while we read it.
        * The read() succeeds in that case, but the data we tried to read might
        * already have been overwritten with new WAL records.
        */
       XLByteToSeg(startptr, segno, segcxt->ws_segsize);
       CheckXLogRemoved(segno, ThisTimeLineID);

but they don't fit into the new, generic implementation, so I copied these
lines to the two places right after the calls to the new XLogRead(). However, I
was not sure whether ThisTimeLineID was ever correct here. It seems the
original walsender.c:XLogRead() implementation did not update ThisTimeLineID
(and therefore the new callback WalSndSegmentOpen() does not either), so both
logical_read_xlog_page() and XLogSendPhysical() could read the data from
another (historic) timeline. I think we should check the segment we really
read data from:

        CheckXLogRemoved(segno, sendSeg->ws_tli);
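
As a rough illustration of why the timeline of the file actually opened can
differ from the current one, the sketch below recomputes the example from the
WalSndSegmentOpen() comment (switch from timeline 4 to 5 at 0/13002088). The
demo_* helpers are hypothetical stand-ins, not PostgreSQL functions; the
arithmetic is assumed to match XLByteToSeg() and XLogFileName(), and a 16MB
segment size is assumed.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>
#include <stdio.h>

typedef uint32_t TimeLineID;	/* stand-ins for the PostgreSQL typedefs */
typedef uint64_t XLogRecPtr;
typedef uint64_t XLogSegNo;

/* Which segment holds this LSN? (same idea as XLByteToSeg) */
static XLogSegNo
demo_byte_to_seg(XLogRecPtr ptr, uint32_t segsize)
{
	assert(segsize > 0);
	return ptr / segsize;
}

/*
 * Pick the timeline of the file to open: on a historic timeline, the
 * segment containing the switch point is read from the next timeline.
 */
static TimeLineID
demo_pick_tli(XLogSegNo segno, TimeLineID send_tli, bool historic,
			  XLogRecPtr valid_upto, TimeLineID next_tli, uint32_t segsize)
{
	if (historic && segno == demo_byte_to_seg(valid_upto, segsize))
		return next_tli;
	return send_tli;
}

/* Build the 24-character segment file name for (tli, segno). */
static void
demo_file_name(char *buf, size_t len, TimeLineID tli, XLogSegNo segno,
			   uint32_t segsize)
{
	uint64_t	per_id = UINT64_C(0x100000000) / segsize;

	snprintf(buf, len, "%08X%08X%08X",
			 (unsigned int) tli,
			 (unsigned int) (segno / per_id),
			 (unsigned int) (segno % per_id));
}
```

For a request on timeline 4 that lands in segment 0x13, demo_pick_tli()
returns 5 and the file name becomes 000000050000000000000013, so a check
against any other timeline would look at a file we never read.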

The rebased code is attached.

-- 
Antonin Houska
Web: https://www.cybertec-postgresql.com

From c8017f48a186be9e0f879573b0c0880e6cc7df44 Mon Sep 17 00:00:00 2001
From: Antonin Houska <a...@cybertec.at>
Date: Thu, 26 Sep 2019 13:51:53 +0200
Subject: [PATCH 1/2] Use only xlogreader.c:XLogRead()

The implementations in xlogutils.c and walsender.c are just renamed now, to be
removed by the following diff.
---
 src/backend/access/transam/xlogreader.c | 153 ++++++++++++++++++++++++
 src/backend/access/transam/xlogutils.c  |  45 ++++++-
 src/backend/replication/walsender.c     | 138 ++++++++++++++++++++-
 src/bin/pg_waldump/pg_waldump.c         |  60 +++++++++-
 src/include/access/xlogreader.h         |  47 ++++++++
 5 files changed, 433 insertions(+), 10 deletions(-)

diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c
index c8b0d2303d..a58c5d1633 100644
--- a/src/backend/access/transam/xlogreader.c
+++ b/src/backend/access/transam/xlogreader.c
@@ -17,6 +17,8 @@
  */
 #include "postgres.h"
 
+#include <unistd.h>
+
 #include "access/transam.h"
 #include "access/xlogrecord.h"
 #include "access/xlog_internal.h"
@@ -27,6 +29,7 @@
 
 #ifndef FRONTEND
 #include "miscadmin.h"
+#include "pgstat.h"
 #include "utils/memutils.h"
 #endif
 
@@ -1016,6 +1019,156 @@ out:
 
 #endif							/* FRONTEND */
 
+/*
+ * Read 'count' bytes from WAL into 'buf', starting at location 'startptr'. If
+ * tli_p is passed, get the data from timeline *tli_p. 'pos' is the current
+ * position in the XLOG file and openSegment is a callback that opens the next
+ * segment for reading.
+ *
+ * Returns error information if the data could not be read or NULL if
+ * succeeded.
+ *
+ * XXX probably this should be improved to suck data directly from the
+ * WAL buffers when possible.
+ */
+XLogReadError *
+XLogRead(char *buf, XLogRecPtr startptr, Size count, TimeLineID *tli_p,
+		 WALOpenSegment *seg, WALSegmentContext *segcxt,
+		 WALSegmentOpen openSegment)
+{
+	char	   *p;
+	XLogRecPtr	recptr;
+	Size		nbytes;
+	static XLogReadError errinfo;
+
+	p = buf;
+	recptr = startptr;
+	nbytes = count;
+
+	while (nbytes > 0)
+	{
+		int			segbytes;
+		int			readbytes;
+
+		seg->ws_off = XLogSegmentOffset(recptr, segcxt->ws_segsize);
+
+		if (seg->ws_file < 0 ||
+			!XLByteInSeg(recptr, seg->ws_segno, segcxt->ws_segsize) ||
+			(tli_p != NULL && *tli_p != seg->ws_tli))
+		{
+			XLogSegNo	nextSegNo;
+			TimeLineID	tli;
+			int			file;
+
+			/* Switch to another logfile segment */
+			if (seg->ws_file >= 0)
+				close(seg->ws_file);
+
+			XLByteToSeg(recptr, nextSegNo, segcxt->ws_segsize);
+
+			/*
+			 * If we have the TLI, let's pass it to the callback. If NULL is
+			 * passed, the callback has to find the TLI itself.
+			 */
+			if (tli_p != NULL)
+				tli = *tli_p;
+
+			/* Open the next segment in the caller's way. */
+			openSegment(nextSegNo, &tli, &file, seg, segcxt);
+
+			/* Update the open segment info. */
+			seg->ws_tli = tli;
+			seg->ws_file = file;
+
+			/*
+			 * If the function is called by the XLOG reader, the reader will
+			 * eventually set both "ws_segno" and "ws_off", however the XLOG
+			 * reader is not necessarily involved. Furthermore, we need to set
+			 * the current values for this function to work.
+			 */
+			seg->ws_segno = nextSegNo;
+			seg->ws_off = XLogSegmentOffset(recptr, segcxt->ws_segsize);
+		}
+
+		/* How many bytes are within this segment? */
+		if (nbytes > (segcxt->ws_segsize - seg->ws_off))
+			segbytes = segcxt->ws_segsize - seg->ws_off;
+		else
+			segbytes = nbytes;
+
+#ifndef FRONTEND
+		pgstat_report_wait_start(WAIT_EVENT_WAL_READ);
+#endif
+
+		/*
+		 * Failure to read the data does not necessarily imply non-zero errno.
+		 * Set it to zero so that caller can distinguish the failure that does
+		 * not affect errno.
+		 */
+		errno = 0;
+
+		readbytes = pg_pread(seg->ws_file, p, segbytes, seg->ws_off);
+
+#ifndef FRONTEND
+		pgstat_report_wait_end();
+#endif
+
+		if (readbytes <= 0)
+		{
+			errinfo.read_errno = errno;
+			errinfo.readbytes = readbytes;
+			errinfo.reqbytes = segbytes;
+			errinfo.seg = seg;
+			return &errinfo;
+		}
+
+		/* Update state for read */
+		recptr += readbytes;
+		nbytes -= readbytes;
+		p += readbytes;
+
+		/*
+		 * If the function is called by the XLOG reader, the reader will
+		 * eventually set this field. However we need to care about it too
+		 * because the function can also be used directly (see walsender.c).
+		 */
+		seg->ws_off += readbytes;
+	}
+
+	return NULL;
+}
+
+#ifndef FRONTEND
+/*
+ * Backend-specific convenience code to handle read errors encountered by
+ * XLogRead().
+ */
+void
+XLogReadProcessError(XLogReadError *errinfo)
+{
+	WALOpenSegment *seg = errinfo->seg;
+
+	if (errinfo->readbytes < 0)
+	{
+		errno = errinfo->read_errno;
+		ereport(ERROR,
+				(errcode_for_file_access(),
+				 errmsg("could not read from log segment %s, offset %u, length %zu: %m",
+						XLogFileNameP(seg->ws_tli, seg->ws_segno),
+						seg->ws_off, (Size) errinfo->reqbytes)));
+	}
+	else
+	{
+		ereport(ERROR,
+				(errcode(ERRCODE_DATA_CORRUPTED),
+				 errmsg("could not read from log segment %s, offset %u: length %zu",
+						XLogFileNameP(seg->ws_tli, seg->ws_segno),
+						seg->ws_off,
+						(Size) errinfo->reqbytes)));
+	}
+}
+#endif
+
 /* ----------------------------------------
  * Functions for decoding the data and block references in a record.
  * ----------------------------------------
diff --git a/src/backend/access/transam/xlogutils.c b/src/backend/access/transam/xlogutils.c
index 5f1e5ba75d..09d42d3112 100644
--- a/src/backend/access/transam/xlogutils.c
+++ b/src/backend/access/transam/xlogutils.c
@@ -653,8 +653,8 @@ XLogTruncateRelation(RelFileNode rnode, ForkNumber forkNum,
  * frontend).  Probably these should be merged at some point.
  */
 static void
-XLogRead(char *buf, int segsize, TimeLineID tli, XLogRecPtr startptr,
-		 Size count)
+XLogReadOld(char *buf, int segsize, TimeLineID tli, XLogRecPtr startptr,
+			Size count)
 {
 	char	   *p;
 	XLogRecPtr	recptr;
@@ -896,6 +896,38 @@ XLogReadDetermineTimeline(XLogReaderState *state, XLogRecPtr wantPage, uint32 wa
 	}
 }
 
+/*
+ * Callback for XLogRead() to open the next segment.
+ */
+static void
+read_local_xlog_page_segment_open(XLogSegNo nextSegNo, TimeLineID *tli_p,
+								  int *file_p, WALOpenSegment *seg,
+								  WALSegmentContext *segcxt)
+{
+	TimeLineID	tli = *tli_p;
+	char		path[MAXPGPATH];
+	int			file;
+
+	XLogFilePath(path, tli, nextSegNo, segcxt->ws_segsize);
+	file = BasicOpenFile(path, O_RDONLY | PG_BINARY);
+
+	if (file < 0)
+	{
+		if (errno == ENOENT)
+			ereport(ERROR,
+					(errcode_for_file_access(),
+					 errmsg("requested WAL segment %s has already been removed",
+							path)));
+		else
+			ereport(ERROR,
+					(errcode_for_file_access(),
+					 errmsg("could not open file \"%s\": %m",
+							path)));
+	}
+
+	*file_p = file;
+}
+
 /*
  * read_page callback for reading local xlog files
  *
@@ -914,6 +946,7 @@ read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr,
 	XLogRecPtr	read_upto,
 				loc;
 	int			count;
+	XLogReadError *errinfo;
 
 	loc = targetPagePtr + reqLen;
 
@@ -1020,8 +1053,12 @@ read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr,
 	 * as 'count', read the whole page anyway. It's guaranteed to be
 	 * zero-padded up to the page boundary if it's incomplete.
 	 */
-	XLogRead(cur_page, state->segcxt.ws_segsize, state->seg.ws_tli, targetPagePtr,
-			 XLOG_BLCKSZ);
+	if ((errinfo = XLogRead(cur_page, targetPagePtr, XLOG_BLCKSZ,
+							&state->seg.ws_tli,
+							&state->seg,
+							&state->segcxt,
+							read_local_xlog_page_segment_open)) != NULL)
+		XLogReadProcessError(errinfo);
 
 	/* number of valid bytes in the buffer */
 	return count;
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index eb4a98cc91..dcb84693fc 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -248,9 +248,14 @@ static void LagTrackerWrite(XLogRecPtr lsn, TimestampTz local_flush_time);
 static TimeOffset LagTrackerRead(int head, XLogRecPtr lsn, TimestampTz now);
 static bool TransactionIdInRecentPast(TransactionId xid, uint32 epoch);
 
-static void XLogRead(WALSegmentContext *segcxt, char *buf, XLogRecPtr startptr, Size count);
+static void WalSndSegmentOpen(XLogSegNo nextSegNo, TimeLineID *tli_p,
+							  int *file_p, WALOpenSegment *seg,
+							  WALSegmentContext *segcxt);
 
 
+static void XLogReadOld(WALSegmentContext *segcxt, char *buf,
+						XLogRecPtr startptr, Size count);
+
 /* Initialize walsender process before entering the main command loop */
 void
 InitWalSender(void)
@@ -766,6 +771,8 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req
 {
 	XLogRecPtr	flushptr;
 	int			count;
+	XLogReadError *errinfo;
+	XLogSegNo	segno;
 
 	XLogReadDetermineTimeline(state, targetPagePtr, reqLen);
 	sendTimeLineIsHistoric = (state->currTLI != ThisTimeLineID);
@@ -786,7 +793,24 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req
 		count = flushptr - targetPagePtr;	/* part of the page available */
 
 	/* now actually read the data, we know it's there */
-	XLogRead(sendCxt, cur_page, targetPagePtr, XLOG_BLCKSZ);
+	if ((errinfo = XLogRead(cur_page,
+							targetPagePtr,
+							XLOG_BLCKSZ,
+							NULL,	/* WalSndSegmentOpen will determine TLI */
+							sendSeg,
+							sendCxt,
+							WalSndSegmentOpen)) != NULL)
+		XLogReadProcessError(errinfo);
+
+	/*
+	 * After reading into the buffer, check that what we read was valid. We do
+	 * this after reading, because even though the segment was present when we
+	 * opened it, it might get recycled or removed while we read it. The
+	 * read() succeeds in that case, but the data we tried to read might
+	 * already have been overwritten with new WAL records.
+	 */
+	XLByteToSeg(targetPagePtr, segno, sendCxt->ws_segsize);
+	CheckXLogRemoved(segno, sendSeg->ws_tli);
 
 	return count;
 }
@@ -2363,7 +2387,7 @@ WalSndKill(int code, Datum arg)
  * more than one.
  */
 static void
-XLogRead(WALSegmentContext *segcxt, char *buf, XLogRecPtr startptr, Size count)
+XLogReadOld(WALSegmentContext *segcxt, char *buf, XLogRecPtr startptr, Size count)
 {
 	char	   *p;
 	XLogRecPtr	recptr;
@@ -2536,6 +2560,71 @@ retry:
 	}
 }
 
+/*
+ * Callback for XLogRead() to open the next segment.
+ */
+void
+WalSndSegmentOpen(XLogSegNo nextSegNo, TimeLineID *tli_p, int *file_p,
+				  WALOpenSegment *seg, WALSegmentContext *segcxt)
+{
+	char		path[MAXPGPATH];
+
+	/*-------
+	 * When reading from a historic timeline, and there is a timeline switch
+	 * within this segment, read from the WAL segment belonging to the new
+	 * timeline.
+	 *
+	 * For example, imagine that this server is currently on timeline 5, and
+	 * we're streaming timeline 4. The switch from timeline 4 to 5 happened at
+	 * 0/13002088. In pg_wal, we have these files:
+	 *
+	 * ...
+	 * 000000040000000000000012
+	 * 000000040000000000000013
+	 * 000000050000000000000013
+	 * 000000050000000000000014
+	 * ...
+	 *
+	 * In this situation, when requested to send the WAL from segment 0x13, on
+	 * timeline 4, we read the WAL from file 000000050000000000000013. Archive
+	 * recovery prefers files from newer timelines, so if the segment was
+	 * restored from the archive on this server, the file belonging to the old
+	 * timeline, 000000040000000000000013, might not exist. Their contents are
+	 * equal up to the switchpoint, because at a timeline switch, the used
+	 * portion of the old segment is copied to the new file.  -------
+	 */
+	*tli_p = sendTimeLine;
+	if (sendTimeLineIsHistoric)
+	{
+		XLogSegNo	endSegNo;
+
+		XLByteToSeg(sendTimeLineValidUpto, endSegNo, segcxt->ws_segsize);
+		if (nextSegNo == endSegNo)
+			*tli_p = sendTimeLineNextTLI;
+	}
+
+	XLogFilePath(path, *tli_p, nextSegNo, segcxt->ws_segsize);
+	*file_p = BasicOpenFile(path, O_RDONLY | PG_BINARY);
+
+	if (*file_p < 0)
+	{
+		/*
+		 * If the file is not found, assume it's because the standby asked for
+		 * a too old WAL segment that has already been removed or recycled.
+		 */
+		if (errno == ENOENT)
+			ereport(ERROR,
+					(errcode_for_file_access(),
+					 errmsg("requested WAL segment %s has already been removed",
+							XLogFileNameP(*tli_p, nextSegNo))));
+		else
+			ereport(ERROR,
+					(errcode_for_file_access(),
+					 errmsg("could not open file \"%s\": %m",
+							path)));
+	}
+}
+
 /*
  * Send out the WAL in its normal physical/stored form.
  *
@@ -2553,6 +2642,8 @@ XLogSendPhysical(void)
 	XLogRecPtr	startptr;
 	XLogRecPtr	endptr;
 	Size		nbytes;
+	XLogSegNo	segno;
+	XLogReadError *errinfo;
 
 	/* If requested switch the WAL sender to the stopping state. */
 	if (got_STOPPING)
@@ -2768,7 +2859,46 @@ XLogSendPhysical(void)
 	 * calls.
 	 */
 	enlargeStringInfo(&output_message, nbytes);
-	XLogRead(sendCxt, &output_message.data[output_message.len], startptr, nbytes);
+
+retry:
+	if ((errinfo = XLogRead(&output_message.data[output_message.len],
+							startptr,
+							nbytes,
+							NULL,	/* WalSndSegmentOpen will determine TLI */
+							sendSeg,
+							sendCxt,
+							WalSndSegmentOpen)) != NULL)
+		XLogReadProcessError(errinfo);
+
+	/* See logical_read_xlog_page(). */
+	XLByteToSeg(startptr, segno, sendCxt->ws_segsize);
+	CheckXLogRemoved(segno, sendSeg->ws_tli);
+
+	/*
+	 * During recovery, the currently-open WAL file might be replaced with the
+	 * file of the same name retrieved from archive. So we always need to
+	 * check what we read was valid after reading into the buffer. If it's
+	 * invalid, we try to open and read the file again.
+	 */
+	if (am_cascading_walsender)
+	{
+		WalSnd	   *walsnd = MyWalSnd;
+		bool		reload;
+
+		SpinLockAcquire(&walsnd->mutex);
+		reload = walsnd->needreload;
+		walsnd->needreload = false;
+		SpinLockRelease(&walsnd->mutex);
+
+		if (reload && sendSeg->ws_file >= 0)
+		{
+			close(sendSeg->ws_file);
+			sendSeg->ws_file = -1;
+
+			goto retry;
+		}
+	}
+
 	output_message.len += nbytes;
 	output_message.data[output_message.len] = '\0';
 
diff --git a/src/bin/pg_waldump/pg_waldump.c b/src/bin/pg_waldump/pg_waldump.c
index b79208cd73..c97376101c 100644
--- a/src/bin/pg_waldump/pg_waldump.c
+++ b/src/bin/pg_waldump/pg_waldump.c
@@ -281,6 +281,46 @@ identify_target_directory(char *directory, char *fname)
 	return NULL;				/* not reached */
 }
 
+static void
+WALDumpOpenSegment(XLogSegNo nextSegNo, TimeLineID *tli_p, int *file_p,
+				   WALOpenSegment *seg, WALSegmentContext *segcxt)
+{
+	TimeLineID	tli = *tli_p;
+	char		fname[MAXPGPATH];
+	int			tries;
+
+	XLogFileName(fname, tli, nextSegNo, segcxt->ws_segsize);
+
+	/*
+	 * In follow mode there is a short period of time after the server has
+	 * written the end of the previous file before the new file is available.
+	 * So we loop for 5 seconds looking for the file to appear before giving
+	 * up.
+	 */
+	for (tries = 0; tries < 10; tries++)
+	{
+		*file_p = open_file_in_directory(segcxt->ws_dir, fname);
+		if (*file_p >= 0)
+			break;
+		if (errno == ENOENT)
+		{
+			int			save_errno = errno;
+
+			/* File not there yet, try again */
+			pg_usleep(500 * 1000);
+
+			errno = save_errno;
+			continue;
+		}
+		/* Any other error, fall through and fail */
+		break;
+	}
+
+	if (*file_p < 0)
+		fatal_error("could not find file \"%s\": %s",
+					fname, strerror(errno));
+}
+
 /*
  * Read count bytes from a segment file in the specified directory, for the
  * given timeline, containing the specified record pointer; store the data in
@@ -412,6 +452,7 @@ XLogDumpReadPage(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen,
 {
 	XLogDumpPrivate *private = state->private_data;
 	int			count = XLOG_BLCKSZ;
+	XLogReadError *errinfo;
 
 	if (private->endptr != InvalidXLogRecPtr)
 	{
@@ -426,8 +467,23 @@ XLogDumpReadPage(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen,
 		}
 	}
 
-	XLogDumpXLogRead(state->segcxt.ws_dir, private->timeline, targetPagePtr,
-					 readBuff, count);
+	if ((errinfo = XLogRead(readBuff, targetPagePtr, count, &private->timeline,
+							&state->seg, &state->segcxt, WALDumpOpenSegment)) != NULL)
+	{
+		WALOpenSegment *seg = errinfo->seg;
+		char		fname[MAXPGPATH];
+
+		XLogFileName(fname, seg->ws_tli, seg->ws_segno,
+					 state->segcxt.ws_segsize);
+
+		if (errinfo->read_errno != 0)
+			fatal_error("could not read from log file %s, offset %u, length %zu: %s",
+						fname, seg->ws_off, (Size) errinfo->reqbytes,
+						strerror(errinfo->read_errno));
+		else
+			fatal_error("could not read from log file %s, offset %u: length: %zu",
+						fname, seg->ws_off, (Size) errinfo->reqbytes);
+	}
 
 	return count;
 }
diff --git a/src/include/access/xlogreader.h b/src/include/access/xlogreader.h
index 1bbee386e8..bf4d105de3 100644
--- a/src/include/access/xlogreader.h
+++ b/src/include/access/xlogreader.h
@@ -218,6 +218,31 @@ extern XLogReaderState *XLogReaderAllocate(int wal_segment_size,
 extern void XLogReaderFree(XLogReaderState *state);
 
 /* Initialize supporting structures */
+/*
+ * Callback to open the specified WAL segment for reading.
+ *
+ * "nextSegNo" is the number of the segment to be opened.
+ *
+ * "tli_p" is an input/output argument. If *tli_p is valid, it's the timeline
+ * the new segment should be in. If *tli_p==InvalidTimeLineID, the callback
+ * needs to determine the timeline itself and put the result into *tli_p.
+ *
+ * "file_p" points to an address the segment file descriptor should be stored
+ * at.
+ *
+ * "seg" provides information on the currently open segment. The callback is
+ * not supposed to change this info.
+ *
+ * "segcxt" is additional information about the segment, which logically does
+ * not fit into "seg".
+ *
+ * BasicOpenFile() is the preferred way to open the segment file in backend
+ * code, whereas open(2) should be used in frontend.
+ */
+typedef void (*WALSegmentOpen) (XLogSegNo nextSegNo, TimeLineID *tli_p,
+								int *file_p, WALOpenSegment *seg,
+								WALSegmentContext *segcxt);
+
 extern void WALOpenSegmentInit(WALOpenSegment *seg, WALSegmentContext *segcxt,
 							   int segsize, const char *waldir);
 
@@ -232,6 +257,28 @@ extern bool XLogReaderValidatePageHeader(XLogReaderState *state,
 #ifdef FRONTEND
 extern XLogRecPtr XLogFindNextRecord(XLogReaderState *state, XLogRecPtr RecPtr);
 #endif							/* FRONTEND */
+/*
+ * Error information that both backend and frontend caller can process.
+ *
+ * XXX Should the name be WALReadError? If so, we probably need to rename
+ * XLogRead() and XLogReadProcessError() too.
+ */
+typedef struct XLogReadError
+{
+	int			read_errno;		/* errno set by the last read(). */
+	int			readbytes;		/* Bytes read by the last read(). */
+	int			reqbytes;		/* Bytes requested to be read. */
+	WALOpenSegment *seg;		/* Segment we tried to read from. */
+} XLogReadError;
+
+extern XLogReadError *XLogRead(char *buf, XLogRecPtr startptr,
+							   Size count, TimeLineID *tli_p,
+							   WALOpenSegment *seg, WALSegmentContext *segcxt,
+							   WALSegmentOpen openSegment);
+#ifndef FRONTEND
+void		XLogReadProcessError(XLogReadError *errinfo);
+#endif
+
 /* Functions for decoding an XLogRecord */
 
 extern bool DecodeXLogRecord(XLogReaderState *state, XLogRecord *record,
-- 
2.20.1

From cfc22166e79c9c68108e06f00a6c4125870d1c3a Mon Sep 17 00:00:00 2001
From: Antonin Houska <a...@cybertec.at>
Date: Thu, 26 Sep 2019 13:51:53 +0200
Subject: [PATCH 2/2] Remove the old implementations of XLogRead().

Done in a separate patch because the diff looks harder to read if one function
(XLogRead) is removed and another one (the WALSegmentOpen callback) is added
nearby at the same time (the addition and removal of code can get mixed in the
diff).
---
 src/backend/access/transam/xlogutils.c | 122 ----------------
 src/backend/replication/walsender.c    | 188 -------------------------
 src/bin/pg_waldump/pg_waldump.c        | 122 ----------------
 3 files changed, 432 deletions(-)

diff --git a/src/backend/access/transam/xlogutils.c b/src/backend/access/transam/xlogutils.c
index 09d42d3112..002b8f72a9 100644
--- a/src/backend/access/transam/xlogutils.c
+++ b/src/backend/access/transam/xlogutils.c
@@ -639,128 +639,6 @@ XLogTruncateRelation(RelFileNode rnode, ForkNumber forkNum,
 	forget_invalid_pages(rnode, forkNum, nblocks);
 }
 
-/*
- * Read 'count' bytes from WAL into 'buf', starting at location 'startptr'
- * in timeline 'tli'.
- *
- * Will open, and keep open, one WAL segment stored in the static file
- * descriptor 'sendFile'. This means if XLogRead is used once, there will
- * always be one descriptor left open until the process ends, but never
- * more than one.
- *
- * XXX This is very similar to pg_waldump's XLogDumpXLogRead and to XLogRead
- * in walsender.c but for small differences (such as lack of elog() in
- * frontend).  Probably these should be merged at some point.
- */
-static void
-XLogReadOld(char *buf, int segsize, TimeLineID tli, XLogRecPtr startptr,
-			Size count)
-{
-	char	   *p;
-	XLogRecPtr	recptr;
-	Size		nbytes;
-
-	/* state maintained across calls */
-	static int	sendFile = -1;
-	static XLogSegNo sendSegNo = 0;
-	static TimeLineID sendTLI = 0;
-	static uint32 sendOff = 0;
-
-	Assert(segsize == wal_segment_size);
-
-	p = buf;
-	recptr = startptr;
-	nbytes = count;
-
-	while (nbytes > 0)
-	{
-		uint32		startoff;
-		int			segbytes;
-		int			readbytes;
-
-		startoff = XLogSegmentOffset(recptr, segsize);
-
-		/* Do we need to switch to a different xlog segment? */
-		if (sendFile < 0 || !XLByteInSeg(recptr, sendSegNo, segsize) ||
-			sendTLI != tli)
-		{
-			char		path[MAXPGPATH];
-
-			if (sendFile >= 0)
-				close(sendFile);
-
-			XLByteToSeg(recptr, sendSegNo, segsize);
-
-			XLogFilePath(path, tli, sendSegNo, segsize);
-
-			sendFile = BasicOpenFile(path, O_RDONLY | PG_BINARY);
-
-			if (sendFile < 0)
-			{
-				if (errno == ENOENT)
-					ereport(ERROR,
-							(errcode_for_file_access(),
-							 errmsg("requested WAL segment %s has already been removed",
-									path)));
-				else
-					ereport(ERROR,
-							(errcode_for_file_access(),
-							 errmsg("could not open file \"%s\": %m",
-									path)));
-			}
-			sendOff = 0;
-			sendTLI = tli;
-		}
-
-		/* Need to seek in the file? */
-		if (sendOff != startoff)
-		{
-			if (lseek(sendFile, (off_t) startoff, SEEK_SET) < 0)
-			{
-				char		path[MAXPGPATH];
-				int			save_errno = errno;
-
-				XLogFilePath(path, tli, sendSegNo, segsize);
-				errno = save_errno;
-				ereport(ERROR,
-						(errcode_for_file_access(),
-						 errmsg("could not seek in log segment %s to offset %u: %m",
-								path, startoff)));
-			}
-			sendOff = startoff;
-		}
-
-		/* How many bytes are within this segment? */
-		if (nbytes > (segsize - startoff))
-			segbytes = segsize - startoff;
-		else
-			segbytes = nbytes;
-
-		pgstat_report_wait_start(WAIT_EVENT_WAL_READ);
-		readbytes = read(sendFile, p, segbytes);
-		pgstat_report_wait_end();
-		if (readbytes <= 0)
-		{
-			char		path[MAXPGPATH];
-			int			save_errno = errno;
-
-			XLogFilePath(path, tli, sendSegNo, segsize);
-			errno = save_errno;
-			ereport(ERROR,
-					(errcode_for_file_access(),
-					 errmsg("could not read from log segment %s, offset %u, length %lu: %m",
-							path, sendOff, (unsigned long) segbytes)));
-		}
-
-		/* Update state for read */
-		recptr += readbytes;
-
-		sendOff += readbytes;
-		nbytes -= readbytes;
-		p += readbytes;
-	}
-}
-
 /*
  * Determine which timeline to read an xlog page from and set the
  * XLogReaderState's currTLI to that timeline ID.
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index dcb84693fc..d7df72fb87 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -253,9 +253,6 @@ static void WalSndSegmentOpen(XLogSegNo nextSegNo, TimeLineID *tli_p,
 							  WALSegmentContext *segcxt);
 
 
-static void XLogReadOld(WALSegmentContext *segcxt, char *buf,
-						XLogRecPtr startptr, Size count);
-
 /* Initialize walsender process before entering the main command loop */
 void
 InitWalSender(void)
@@ -2375,191 +2372,6 @@ WalSndKill(int code, Datum arg)
 	SpinLockRelease(&walsnd->mutex);
 }
 
-/*
- * Read 'count' bytes from WAL into 'buf', starting at location 'startptr'
- *
- * XXX probably this should be improved to suck data directly from the
- * WAL buffers when possible.
- *
- * Will open, and keep open, one WAL segment stored in the global file
- * descriptor sendFile. This means if XLogRead is used once, there will
- * always be one descriptor left open until the process ends, but never
- * more than one.
- */
-static void
-XLogReadOld(WALSegmentContext *segcxt, char *buf, XLogRecPtr startptr, Size count)
-{
-	char	   *p;
-	XLogRecPtr	recptr;
-	Size		nbytes;
-	XLogSegNo	segno;
-
-retry:
-	p = buf;
-	recptr = startptr;
-	nbytes = count;
-
-	while (nbytes > 0)
-	{
-		uint32		startoff;
-		int			segbytes;
-		int			readbytes;
-
-		startoff = XLogSegmentOffset(recptr, segcxt->ws_segsize);
-
-		if (sendSeg->ws_file < 0 ||
-			!XLByteInSeg(recptr, sendSeg->ws_segno, segcxt->ws_segsize))
-		{
-			char		path[MAXPGPATH];
-
-			/* Switch to another logfile segment */
-			if (sendSeg->ws_file >= 0)
-				close(sendSeg->ws_file);
-
-			XLByteToSeg(recptr, sendSeg->ws_segno, segcxt->ws_segsize);
-
-			/*-------
-			 * When reading from a historic timeline, and there is a timeline
-			 * switch within this segment, read from the WAL segment belonging
-			 * to the new timeline.
-			 *
-			 * For example, imagine that this server is currently on timeline
-			 * 5, and we're streaming timeline 4. The switch from timeline 4
-			 * to 5 happened at 0/13002088. In pg_wal, we have these files:
-			 *
-			 * ...
-			 * 000000040000000000000012
-			 * 000000040000000000000013
-			 * 000000050000000000000013
-			 * 000000050000000000000014
-			 * ...
-			 *
-			 * In this situation, when requested to send the WAL from
-			 * segment 0x13, on timeline 4, we read the WAL from file
-			 * 000000050000000000000013. Archive recovery prefers files from
-			 * newer timelines, so if the segment was restored from the
-			 * archive on this server, the file belonging to the old timeline,
-			 * 000000040000000000000013, might not exist. Their contents are
-			 * equal up to the switchpoint, because at a timeline switch, the
-			 * used portion of the old segment is copied to the new file.
-			 *-------
-			 */
-			sendSeg->ws_tli = sendTimeLine;
-			if (sendTimeLineIsHistoric)
-			{
-				XLogSegNo	endSegNo;
-
-				XLByteToSeg(sendTimeLineValidUpto, endSegNo, segcxt->ws_segsize);
-				if (sendSeg->ws_segno == endSegNo)
-					sendSeg->ws_tli = sendTimeLineNextTLI;
-			}
-
-			XLogFilePath(path, sendSeg->ws_tli, sendSeg->ws_segno, segcxt->ws_segsize);
-
-			sendSeg->ws_file = BasicOpenFile(path, O_RDONLY | PG_BINARY);
-			if (sendSeg->ws_file < 0)
-			{
-				/*
-				 * If the file is not found, assume it's because the standby
-				 * asked for a too old WAL segment that has already been
-				 * removed or recycled.
-				 */
-				if (errno == ENOENT)
-					ereport(ERROR,
-							(errcode_for_file_access(),
-							 errmsg("requested WAL segment %s has already been removed",
-									XLogFileNameP(sendSeg->ws_tli, sendSeg->ws_segno))));
-				else
-					ereport(ERROR,
-							(errcode_for_file_access(),
-							 errmsg("could not open file \"%s\": %m",
-									path)));
-			}
-			sendSeg->ws_off = 0;
-		}
-
-		/* Need to seek in the file? */
-		if (sendSeg->ws_off != startoff)
-		{
-			if (lseek(sendSeg->ws_file, (off_t) startoff, SEEK_SET) < 0)
-				ereport(ERROR,
-						(errcode_for_file_access(),
-						 errmsg("could not seek in log segment %s to offset %u: %m",
-								XLogFileNameP(sendSeg->ws_tli, sendSeg->ws_segno),
-								startoff)));
-			sendSeg->ws_off = startoff;
-		}
-
-		/* How many bytes are within this segment? */
-		if (nbytes > (segcxt->ws_segsize - startoff))
-			segbytes = segcxt->ws_segsize - startoff;
-		else
-			segbytes = nbytes;
-
-		pgstat_report_wait_start(WAIT_EVENT_WAL_READ);
-		readbytes = read(sendSeg->ws_file, p, segbytes);
-		pgstat_report_wait_end();
-		if (readbytes < 0)
-		{
-			ereport(ERROR,
-					(errcode_for_file_access(),
-					 errmsg("could not read from log segment %s, offset %u, length %zu: %m",
-							XLogFileNameP(sendSeg->ws_tli, sendSeg->ws_segno),
-							sendSeg->ws_off, (Size) segbytes)));
-		}
-		else if (readbytes == 0)
-		{
-			ereport(ERROR,
-					(errcode(ERRCODE_DATA_CORRUPTED),
-					 errmsg("could not read from log segment %s, offset %u: read %d of %zu",
-							XLogFileNameP(sendSeg->ws_tli, sendSeg->ws_segno),
-							sendSeg->ws_off, readbytes, (Size) segbytes)));
-		}
-
-		/* Update state for read */
-		recptr += readbytes;
-
-		sendSeg->ws_off += readbytes;
-		nbytes -= readbytes;
-		p += readbytes;
-	}
-
-	/*
-	 * After reading into the buffer, check that what we read was valid. We do
-	 * this after reading, because even though the segment was present when we
-	 * opened it, it might get recycled or removed while we read it. The
-	 * read() succeeds in that case, but the data we tried to read might
-	 * already have been overwritten with new WAL records.
-	 */
-	XLByteToSeg(startptr, segno, segcxt->ws_segsize);
-	CheckXLogRemoved(segno, ThisTimeLineID);
-
-	/*
-	 * During recovery, the currently-open WAL file might be replaced with the
-	 * file of the same name retrieved from archive. So we always need to
-	 * check what we read was valid after reading into the buffer. If it's
-	 * invalid, we try to open and read the file again.
-	 */
-	if (am_cascading_walsender)
-	{
-		WalSnd	   *walsnd = MyWalSnd;
-		bool		reload;
-
-		SpinLockAcquire(&walsnd->mutex);
-		reload = walsnd->needreload;
-		walsnd->needreload = false;
-		SpinLockRelease(&walsnd->mutex);
-
-		if (reload && sendSeg->ws_file >= 0)
-		{
-			close(sendSeg->ws_file);
-			sendSeg->ws_file = -1;
-
-			goto retry;
-		}
-	}
-}
-
 /*
  * Callback for XLogRead() to open the next segment.
  */
diff --git a/src/bin/pg_waldump/pg_waldump.c b/src/bin/pg_waldump/pg_waldump.c
index c97376101c..57dd7df56f 100644
--- a/src/bin/pg_waldump/pg_waldump.c
+++ b/src/bin/pg_waldump/pg_waldump.c
@@ -321,128 +321,6 @@ WALDumpOpenSegment(XLogSegNo nextSegNo, TimeLineID *tli_p, int *file_p,
 					fname, strerror(errno));
 }
 
-/*
- * Read count bytes from a segment file in the specified directory, for the
- * given timeline, containing the specified record pointer; store the data in
- * the passed buffer.
- */
-static void
-XLogDumpXLogRead(const char *directory, TimeLineID timeline_id,
-				 XLogRecPtr startptr, char *buf, Size count)
-{
-	char	   *p;
-	XLogRecPtr	recptr;
-	Size		nbytes;
-
-	static int	sendFile = -1;
-	static XLogSegNo sendSegNo = 0;
-	static uint32 sendOff = 0;
-
-	p = buf;
-	recptr = startptr;
-	nbytes = count;
-
-	while (nbytes > 0)
-	{
-		uint32		startoff;
-		int			segbytes;
-		int			readbytes;
-
-		startoff = XLogSegmentOffset(recptr, WalSegSz);
-
-		if (sendFile < 0 || !XLByteInSeg(recptr, sendSegNo, WalSegSz))
-		{
-			char		fname[MAXFNAMELEN];
-			int			tries;
-
-			/* Switch to another logfile segment */
-			if (sendFile >= 0)
-				close(sendFile);
-
-			XLByteToSeg(recptr, sendSegNo, WalSegSz);
-
-			XLogFileName(fname, timeline_id, sendSegNo, WalSegSz);
-
-			/*
-			 * In follow mode there is a short period of time after the server
-			 * has written the end of the previous file before the new file is
-			 * available. So we loop for 5 seconds looking for the file to
-			 * appear before giving up.
-			 */
-			for (tries = 0; tries < 10; tries++)
-			{
-				sendFile = open_file_in_directory(directory, fname);
-				if (sendFile >= 0)
-					break;
-				if (errno == ENOENT)
-				{
-					int			save_errno = errno;
-
-					/* File not there yet, try again */
-					pg_usleep(500 * 1000);
-
-					errno = save_errno;
-					continue;
-				}
-				/* Any other error, fall through and fail */
-				break;
-			}
-
-			if (sendFile < 0)
-				fatal_error("could not find file \"%s\": %s",
-							fname, strerror(errno));
-			sendOff = 0;
-		}
-
-		/* Need to seek in the file? */
-		if (sendOff != startoff)
-		{
-			if (lseek(sendFile, (off_t) startoff, SEEK_SET) < 0)
-			{
-				int			err = errno;
-				char		fname[MAXPGPATH];
-
-				XLogFileName(fname, timeline_id, sendSegNo, WalSegSz);
-
-				fatal_error("could not seek in log file %s to offset %u: %s",
-							fname, startoff, strerror(err));
-			}
-			sendOff = startoff;
-		}
-
-		/* How many bytes are within this segment? */
-		if (nbytes > (WalSegSz - startoff))
-			segbytes = WalSegSz - startoff;
-		else
-			segbytes = nbytes;
-
-		readbytes = read(sendFile, p, segbytes);
-		if (readbytes <= 0)
-		{
-			int			err = errno;
-			char		fname[MAXPGPATH];
-			int			save_errno = errno;
-
-			XLogFileName(fname, timeline_id, sendSegNo, WalSegSz);
-			errno = save_errno;
-
-			if (readbytes < 0)
-				fatal_error("could not read from log file %s, offset %u, length %d: %s",
-							fname, sendOff, segbytes, strerror(err));
-			else if (readbytes == 0)
-				fatal_error("could not read from log file %s, offset %u: read %d of %zu",
-							fname, sendOff, readbytes, (Size) segbytes);
-		}
-
-		/* Update state for read */
-		recptr += readbytes;
-
-		sendOff += readbytes;
-		nbytes -= readbytes;
-		p += readbytes;
-	}
-}
-
 /*
  * XLogReader read_page callback
  */
-- 
2.20.1
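
For anyone reading the patch without the tree at hand: both removed loops
(walsender.c and pg_waldump.c) clamp each read() to the end of the current
segment in the same way. A minimal standalone sketch of that step follows.
seg_offset() and bytes_in_segment() are hypothetical names used only for
illustration, not functions from the patch, and the sketch assumes the
segment size is a power of two.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/*
 * Hypothetical helper: byte offset of a WAL position within its segment.
 * Assumes segsize is a nonzero power of two, so masking is equivalent to
 * taking the remainder.
 */
static uint32_t
seg_offset(uint64_t recptr, uint32_t segsize)
{
	assert(segsize != 0 && (segsize & (segsize - 1)) == 0);
	return (uint32_t) (recptr & (uint64_t) (segsize - 1));
}

/*
 * Hypothetical helper: how many of the requested nbytes can be read
 * starting at recptr without crossing into the next segment.
 */
static size_t
bytes_in_segment(uint64_t recptr, size_t nbytes, uint32_t segsize)
{
	size_t		remaining = (size_t) segsize - seg_offset(recptr, segsize);

	return (nbytes > remaining) ? remaining : nbytes;
}
```

A caller would read that many bytes, advance recptr and the buffer pointer by
the number of bytes actually read, and repeat until nbytes reaches zero,
switching to the next segment file whenever recptr leaves the current one.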
