On 17.09.2012 11:12, Andres Freund wrote:
On Monday, September 17, 2012 09:40:17 AM Heikki Linnakangas wrote:
On 15.09.2012 03:39, Andres Freund wrote:
2. We should focus on reading WAL; I don't see the point of mixing WAL
writing into this.
If you write something that filters/analyzes and then forwards WAL, and you want
to do that without a big overhead (i.e. completely reassembling everything, and
then deassembling it again for writeout), it's hard to do that without
integrating both sides.

It seems really complicated to filter/analyze WAL records without reassembling them, anyway. The user of the facility is in charge of reading the physical data, so you can still access the raw data, for forwarding purposes, in addition to the reassembled records.

Or what exactly do you mean by "completely deassembling"? I read that to mean dealing with page boundaries, i.e. if a record is split across pages, copying parts into a contiguous temporary buffer.

Also, I want to read records incrementally/partially just as data comes in,
which again is hard to combine with writing out the data again.

You mean, you want to start reading the first half of a record, before the 2nd half is available? That seems complicated. I'd suggest keeping it simple for now, and optimize later if necessary. Note that before you have the whole WAL record, you cannot CRC check it, so you don't know if it's in fact a valid WAL record.

I came up with the attached. I moved ReadRecord and some supporting
functions from xlog.c to xlogreader.c, and made it operate on
XLogReaderState instead of global variables. As discussed before,
I didn't like the callback-style API; I think the consumer of the API
should rather just call ReadRecord repeatedly to get each record. So
that's what I did.
The problem with that kind of API is that, at least as far as I can
see, it can never operate on incomplete/partial input. You need to buffer
larger amounts of xlog somewhere, and you need to be aware of record boundaries.
Both are things I dislike in a more generic user than xlog.c.

I don't understand that argument. A typical large WAL record is split across 1-2 pages, maybe 3-4 at most, for an index page split record. That doesn't feel like much to me. In extreme cases, a WAL record can be much larger (e.g. a commit record of a transaction with a huge number of subtransactions), but that should be rare in practice.

The user of the facility doesn't need to be aware of record boundaries, that's the responsibility of the facility. I thought that's exactly the point of generalizing this thing, to make it unnecessary for the code that uses it to be aware of such things.

If you don't want the capability to forward/filter the data and read partial
data without regard for record constraints/buffering, your patch seems to be
quite a good start. It's missing xlogreader.h, though...

Ah sorry, patch with xlogreader.h attached.

- Heikki
diff --git a/src/backend/access/transam/Makefile b/src/backend/access/transam/Makefile
index f82f10e..660b5fc 100644
--- a/src/backend/access/transam/Makefile
+++ b/src/backend/access/transam/Makefile
@@ -13,7 +13,7 @@ top_builddir = ../../../..
 include $(top_builddir)/src/Makefile.global
 
 OBJS = clog.o transam.o varsup.o xact.o rmgr.o slru.o subtrans.o multixact.o \
-	twophase.o twophase_rmgr.o xlog.o xlogfuncs.o xlogutils.o
+	twophase.o twophase_rmgr.o xlog.o xlogfuncs.o xlogreader.o xlogutils.o
 
 include $(top_srcdir)/src/backend/common.mk
 
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index ff56c26..769ddea 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -31,6 +31,7 @@
 #include "access/twophase.h"
 #include "access/xact.h"
 #include "access/xlog_internal.h"
+#include "access/xlogreader.h"
 #include "access/xlogutils.h"
 #include "catalog/catversion.h"
 #include "catalog/pg_control.h"
@@ -541,6 +542,8 @@ static uint32 readOff = 0;
 static uint32 readLen = 0;
 static int	readSource = 0;		/* XLOG_FROM_* code */
 
+static bool fetching_ckpt_global;
+
 /*
  * Keeps track of which sources we've tried to read the current WAL
  * record from and failed.
@@ -556,13 +559,6 @@ static int	failedSources = 0;	/* OR of XLOG_FROM_* codes */
 static TimestampTz XLogReceiptTime = 0;
 static int	XLogReceiptSource = 0;		/* XLOG_FROM_* code */
 
-/* Buffer for currently read page (XLOG_BLCKSZ bytes) */
-static char *readBuf = NULL;
-
-/* Buffer for current ReadRecord result (expandable) */
-static char *readRecordBuf = NULL;
-static uint32 readRecordBufSize = 0;
-
 /* State information for XLOG reading */
 static XLogRecPtr ReadRecPtr;	/* start of last record read */
 static XLogRecPtr EndRecPtr;	/* end+1 of last record read */
@@ -632,9 +628,8 @@ static bool InstallXLogFileSegment(XLogSegNo *segno, char *tmppath,
 static int XLogFileRead(XLogSegNo segno, int emode, TimeLineID tli,
 			 int source, bool notexistOk);
 static int XLogFileReadAnyTLI(XLogSegNo segno, int emode, int sources);
-static bool XLogPageRead(XLogRecPtr *RecPtr, int emode, bool fetching_ckpt,
-			 bool randAccess);
-static int	emode_for_corrupt_record(int emode, XLogRecPtr RecPtr);
+static bool XLogPageRead(XLogReaderState *xlogreader, XLogRecPtr RecPtr,
+			 int emode, bool randAccess, char *reaBuf, void *private_data);
 static void XLogFileClose(void);
 static bool RestoreArchivedFile(char *path, const char *xlogfname,
 					const char *recovername, off_t expectedSize);
@@ -646,12 +641,10 @@ static void UpdateLastRemovedPtr(char *filename);
 static void ValidateXLOGDirectoryStructure(void);
 static void CleanupBackupHistory(void);
 static void UpdateMinRecoveryPoint(XLogRecPtr lsn, bool force);
-static XLogRecord *ReadRecord(XLogRecPtr *RecPtr, int emode, bool fetching_ckpt);
-static void CheckRecoveryConsistency(void);
+static XLogRecord *ReadRecord(XLogReaderState *xlogreader, XLogRecPtr RecPtr, int emode, bool fetching_ckpt);
+static void CheckRecoveryConsistency(XLogRecPtr EndRecPtr);
 static bool ValidXLogPageHeader(XLogPageHeader hdr, int emode);
-static bool ValidXLogRecordHeader(XLogRecPtr *RecPtr, XLogRecord *record,
-					  int emode, bool randAccess);
-static XLogRecord *ReadCheckpointRecord(XLogRecPtr RecPtr, int whichChkpt);
+static XLogRecord *ReadCheckpointRecord(XLogReaderState *xlogreader, XLogRecPtr RecPtr, int whichChkpt);
 static List *readTimeLineHistory(TimeLineID targetTLI);
 static bool existsTimeLineHistory(TimeLineID probeTLI);
 static bool rescanLatestTimeLine(void);
@@ -3703,102 +3696,6 @@ RestoreBkpBlocks(XLogRecPtr lsn, XLogRecord *record, bool cleanup)
 }
 
 /*
- * CRC-check an XLOG record.  We do not believe the contents of an XLOG
- * record (other than to the minimal extent of computing the amount of
- * data to read in) until we've checked the CRCs.
- *
- * We assume all of the record (that is, xl_tot_len bytes) has been read
- * into memory at *record.  Also, ValidXLogRecordHeader() has accepted the
- * record's header, which means in particular that xl_tot_len is at least
- * SizeOfXlogRecord, so it is safe to fetch xl_len.
- */
-static bool
-RecordIsValid(XLogRecord *record, XLogRecPtr recptr, int emode)
-{
-	pg_crc32	crc;
-	int			i;
-	uint32		len = record->xl_len;
-	BkpBlock	bkpb;
-	char	   *blk;
-	size_t		remaining = record->xl_tot_len;
-
-	/* First the rmgr data */
-	if (remaining < SizeOfXLogRecord + len)
-	{
-		/* ValidXLogRecordHeader() should've caught this already... */
-		ereport(emode_for_corrupt_record(emode, recptr),
-				(errmsg("invalid record length at %X/%X",
-						(uint32) (recptr >> 32), (uint32) recptr)));
-		return false;
-	}
-	remaining -= SizeOfXLogRecord + len;
-	INIT_CRC32(crc);
-	COMP_CRC32(crc, XLogRecGetData(record), len);
-
-	/* Add in the backup blocks, if any */
-	blk = (char *) XLogRecGetData(record) + len;
-	for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++)
-	{
-		uint32		blen;
-
-		if (!(record->xl_info & XLR_SET_BKP_BLOCK(i)))
-			continue;
-
-		if (remaining < sizeof(BkpBlock))
-		{
-			ereport(emode_for_corrupt_record(emode, recptr),
-					(errmsg("invalid backup block size in record at %X/%X",
-							(uint32) (recptr >> 32), (uint32) recptr)));
-			return false;
-		}
-		memcpy(&bkpb, blk, sizeof(BkpBlock));
-
-		if (bkpb.hole_offset + bkpb.hole_length > BLCKSZ)
-		{
-			ereport(emode_for_corrupt_record(emode, recptr),
-					(errmsg("incorrect hole size in record at %X/%X",
-							(uint32) (recptr >> 32), (uint32) recptr)));
-			return false;
-		}
-		blen = sizeof(BkpBlock) + BLCKSZ - bkpb.hole_length;
-
-		if (remaining < blen)
-		{
-			ereport(emode_for_corrupt_record(emode, recptr),
-					(errmsg("invalid backup block size in record at %X/%X",
-							(uint32) (recptr >> 32), (uint32) recptr)));
-			return false;
-		}
-		remaining -= blen;
-		COMP_CRC32(crc, blk, blen);
-		blk += blen;
-	}
-
-	/* Check that xl_tot_len agrees with our calculation */
-	if (remaining != 0)
-	{
-		ereport(emode_for_corrupt_record(emode, recptr),
-				(errmsg("incorrect total length in record at %X/%X",
-						(uint32) (recptr >> 32), (uint32) recptr)));
-		return false;
-	}
-
-	/* Finally include the record header */
-	COMP_CRC32(crc, (char *) record, offsetof(XLogRecord, xl_crc));
-	FIN_CRC32(crc);
-
-	if (!EQ_CRC32(record->xl_crc, crc))
-	{
-		ereport(emode_for_corrupt_record(emode, recptr),
-		(errmsg("incorrect resource manager data checksum in record at %X/%X",
-				(uint32) (recptr >> 32), (uint32) recptr)));
-		return false;
-	}
-
-	return true;
-}
-
-/*
  * Attempt to read an XLOG record.
  *
  * If RecPtr is not NULL, try to read a record at that position.  Otherwise
@@ -3811,290 +3708,35 @@ RecordIsValid(XLogRecord *record, XLogRecPtr recptr, int emode)
  * the returned record pointer always points there.
  */
 static XLogRecord *
-ReadRecord(XLogRecPtr *RecPtr, int emode, bool fetching_ckpt)
+ReadRecord(XLogReaderState *xlogreader, XLogRecPtr RecPtr, int emode, bool fetching_ckpt)
 {
 	XLogRecord *record;
-	XLogRecPtr	tmpRecPtr = EndRecPtr;
-	bool		randAccess = false;
-	uint32		len,
-				total_len;
-	uint32		targetRecOff;
-	uint32		pageHeaderSize;
-	bool		gotheader;
-
-	if (readBuf == NULL)
-	{
-		/*
-		 * First time through, permanently allocate readBuf.  We do it this
-		 * way, rather than just making a static array, for two reasons: (1)
-		 * no need to waste the storage in most instantiations of the backend;
-		 * (2) a static char array isn't guaranteed to have any particular
-		 * alignment, whereas malloc() will provide MAXALIGN'd storage.
-		 */
-		readBuf = (char *) malloc(XLOG_BLCKSZ);
-		Assert(readBuf != NULL);
-	}
-
-	if (RecPtr == NULL)
-	{
-		RecPtr = &tmpRecPtr;
 
-		/*
-		 * RecPtr is pointing to end+1 of the previous WAL record.  If
-		 * we're at a page boundary, no more records can fit on the current
-		 * page. We must skip over the page header, but we can't do that
-		 * until we've read in the page, since the header size is variable.
-		 */
-	}
-	else
-	{
-		/*
-		 * In this case, the passed-in record pointer should already be
-		 * pointing to a valid record starting position.
-		 */
-		if (!XRecOffIsValid(*RecPtr))
-			ereport(PANIC,
-					(errmsg("invalid record offset at %X/%X",
-							(uint32) (*RecPtr >> 32), (uint32) *RecPtr)));
-
-		/*
-		 * Since we are going to a random position in WAL, forget any prior
-		 * state about what timeline we were in, and allow it to be any
-		 * timeline in expectedTLIs.  We also set a flag to allow curFileTLI
-		 * to go backwards (but we can't reset that variable right here, since
-		 * we might not change files at all).
-		 */
+	if (!XLogRecPtrIsInvalid(RecPtr))
 		lastPageTLI = 0;		/* see comment in ValidXLogPageHeader */
-		randAccess = true;		/* allow curFileTLI to go backwards too */
-	}
+
+	fetching_ckpt_global = fetching_ckpt;
 
 	/* This is the first try to read this page. */
 	failedSources = 0;
-retry:
-	/* Read the page containing the record */
-	if (!XLogPageRead(RecPtr, emode, fetching_ckpt, randAccess))
-		return NULL;
-
-	pageHeaderSize = XLogPageHeaderSize((XLogPageHeader) readBuf);
-	targetRecOff = (*RecPtr) % XLOG_BLCKSZ;
-	if (targetRecOff == 0)
+	do
 	{
-		/*
-		 * At page start, so skip over page header.  The Assert checks that
-		 * we're not scribbling on caller's record pointer; it's OK because we
-		 * can only get here in the continuing-from-prev-record case, since
-		 * XRecOffIsValid rejected the zero-page-offset case otherwise.
-		 */
-		Assert(RecPtr == &tmpRecPtr);
-		(*RecPtr) += pageHeaderSize;
-		targetRecOff = pageHeaderSize;
-	}
-	else if (targetRecOff < pageHeaderSize)
-	{
-		ereport(emode_for_corrupt_record(emode, *RecPtr),
-				(errmsg("invalid record offset at %X/%X",
-						(uint32) ((*RecPtr) >> 32), (uint32) *RecPtr)));
-		goto next_record_is_invalid;
-	}
-	if ((((XLogPageHeader) readBuf)->xlp_info & XLP_FIRST_IS_CONTRECORD) &&
-		targetRecOff == pageHeaderSize)
-	{
-		ereport(emode_for_corrupt_record(emode, *RecPtr),
-				(errmsg("contrecord is requested by %X/%X",
-						(uint32) ((*RecPtr) >> 32), (uint32) *RecPtr)));
-		goto next_record_is_invalid;
-	}
-
-	/*
-	 * Read the record length.
-	 *
-	 * NB: Even though we use an XLogRecord pointer here, the whole record
-	 * header might not fit on this page. xl_tot_len is the first field of
-	 * the struct, so it must be on this page (the records are MAXALIGNed),
-	 * but we cannot access any other fields until we've verified that we
-	 * got the whole header.
-	 */
-	record = (XLogRecord *) (readBuf + (*RecPtr) % XLOG_BLCKSZ);
-	total_len = record->xl_tot_len;
-
-	/*
-	 * If the whole record header is on this page, validate it immediately.
-	 * Otherwise do just a basic sanity check on xl_tot_len, and validate the
-	 * rest of the header after reading it from the next page.  The xl_tot_len
-	 * check is necessary here to ensure that we enter the "Need to reassemble
-	 * record" code path below; otherwise we might fail to apply
-	 * ValidXLogRecordHeader at all.
-	 */
-	if (targetRecOff <= XLOG_BLCKSZ - SizeOfXLogRecord)
-	{
-		if (!ValidXLogRecordHeader(RecPtr, record, emode, randAccess))
-			goto next_record_is_invalid;
-		gotheader = true;
-	}
-	else
-	{
-		if (total_len < SizeOfXLogRecord)
+		record = XLogReadRecord(xlogreader, RecPtr, emode);
+		ReadRecPtr = xlogreader->ReadRecPtr;
+		EndRecPtr = xlogreader->EndRecPtr;
+		if (record == NULL)
 		{
-			ereport(emode_for_corrupt_record(emode, *RecPtr),
-					(errmsg("invalid record length at %X/%X",
-							(uint32) ((*RecPtr) >> 32), (uint32) *RecPtr)));
-			goto next_record_is_invalid;
-		}
-		gotheader = false;
-	}
-
-	/*
-	 * Allocate or enlarge readRecordBuf as needed.  To avoid useless small
-	 * increases, round its size to a multiple of XLOG_BLCKSZ, and make sure
-	 * it's at least 4*Max(BLCKSZ, XLOG_BLCKSZ) to start with.  (That is
-	 * enough for all "normal" records, but very large commit or abort records
-	 * might need more space.)
-	 */
-	if (total_len > readRecordBufSize)
-	{
-		uint32		newSize = total_len;
+			failedSources |= readSource;
 
-		newSize += XLOG_BLCKSZ - (newSize % XLOG_BLCKSZ);
-		newSize = Max(newSize, 4 * Max(BLCKSZ, XLOG_BLCKSZ));
-		if (readRecordBuf)
-			free(readRecordBuf);
-		readRecordBuf = (char *) malloc(newSize);
-		if (!readRecordBuf)
-		{
-			readRecordBufSize = 0;
-			/* We treat this as a "bogus data" condition */
-			ereport(emode_for_corrupt_record(emode, *RecPtr),
-					(errmsg("record length %u at %X/%X too long",
-							total_len, (uint32) ((*RecPtr) >> 32), (uint32) *RecPtr)));
-			goto next_record_is_invalid;
-		}
-		readRecordBufSize = newSize;
-	}
-
-	len = XLOG_BLCKSZ - (*RecPtr) % XLOG_BLCKSZ;
-	if (total_len > len)
-	{
-		/* Need to reassemble record */
-		char	   *contrecord;
-		XLogPageHeader pageHeader;
-		XLogRecPtr	pagelsn;
-		char	   *buffer;
-		uint32		gotlen;
-
-		/* Initialize pagelsn to the beginning of the page this record is on */
-		pagelsn = ((*RecPtr) / XLOG_BLCKSZ) * XLOG_BLCKSZ;
-
-		/* Copy the first fragment of the record from the first page. */
-		memcpy(readRecordBuf, readBuf + (*RecPtr) % XLOG_BLCKSZ, len);
-		buffer = readRecordBuf + len;
-		gotlen = len;
-
-		do
-		{
-			/* Calculate pointer to beginning of next page */
-			XLByteAdvance(pagelsn, XLOG_BLCKSZ);
-			/* Wait for the next page to become available */
-			if (!XLogPageRead(&pagelsn, emode, false, false))
-				return NULL;
-
-			/* Check that the continuation on next page looks valid */
-			pageHeader = (XLogPageHeader) readBuf;
-			if (!(pageHeader->xlp_info & XLP_FIRST_IS_CONTRECORD))
-			{
-				ereport(emode_for_corrupt_record(emode, *RecPtr),
-						(errmsg("there is no contrecord flag in log segment %s, offset %u",
-								XLogFileNameP(curFileTLI, readSegNo),
-								readOff)));
-				goto next_record_is_invalid;
-			}
-			/*
-			 * Cross-check that xlp_rem_len agrees with how much of the record
-			 * we expect there to be left.
-			 */
-			if (pageHeader->xlp_rem_len == 0 ||
-				total_len != (pageHeader->xlp_rem_len + gotlen))
+			if (readFile >= 0)
 			{
-				ereport(emode_for_corrupt_record(emode, *RecPtr),
-						(errmsg("invalid contrecord length %u in log segment %s, offset %u",
-								pageHeader->xlp_rem_len,
-								XLogFileNameP(curFileTLI, readSegNo),
-								readOff)));
-				goto next_record_is_invalid;
+				close(readFile);
+				readFile = -1;
 			}
+		}
+	} while(StandbyMode && record == NULL);
 
-			/* Append the continuation from this page to the buffer */
-			pageHeaderSize = XLogPageHeaderSize(pageHeader);
-			contrecord = (char *) readBuf + pageHeaderSize;
-			len = XLOG_BLCKSZ - pageHeaderSize;
-			if (pageHeader->xlp_rem_len < len)
-				len = pageHeader->xlp_rem_len;
-			memcpy(buffer, (char *) contrecord, len);
-			buffer += len;
-			gotlen += len;
-
-			/* If we just reassembled the record header, validate it. */
-			if (!gotheader)
-			{
-				record = (XLogRecord *) readRecordBuf;
-				if (!ValidXLogRecordHeader(RecPtr, record, emode, randAccess))
-					goto next_record_is_invalid;
-				gotheader = true;
-			}
-		} while (pageHeader->xlp_rem_len > len);
-
-		record = (XLogRecord *) readRecordBuf;
-		if (!RecordIsValid(record, *RecPtr, emode))
-			goto next_record_is_invalid;
-		pageHeaderSize = XLogPageHeaderSize((XLogPageHeader) readBuf);
-		XLogSegNoOffsetToRecPtr(
-			readSegNo,
-			readOff + pageHeaderSize + MAXALIGN(pageHeader->xlp_rem_len),
-			EndRecPtr);
-		ReadRecPtr = *RecPtr;
-	}
-	else
-	{
-		/* Record does not cross a page boundary */
-		if (!RecordIsValid(record, *RecPtr, emode))
-			goto next_record_is_invalid;
-		EndRecPtr = *RecPtr + MAXALIGN(total_len);
-
-		ReadRecPtr = *RecPtr;
-		memcpy(readRecordBuf, record, total_len);
-	}
-
-	/*
-	 * Special processing if it's an XLOG SWITCH record
-	 */
-	if (record->xl_rmid == RM_XLOG_ID && record->xl_info == XLOG_SWITCH)
-	{
-		/* Pretend it extends to end of segment */
-		EndRecPtr += XLogSegSize - 1;
-		EndRecPtr -= EndRecPtr % XLogSegSize;
-
-		/*
-		 * Pretend that readBuf contains the last page of the segment. This is
-		 * just to avoid Assert failure in StartupXLOG if XLOG ends with this
-		 * segment.
-		 */
-		readOff = XLogSegSize - XLOG_BLCKSZ;
-	}
 	return record;
-
-next_record_is_invalid:
-	failedSources |= readSource;
-
-	if (readFile >= 0)
-	{
-		close(readFile);
-		readFile = -1;
-	}
-
-	/* In standby-mode, keep trying */
-	if (StandbyMode)
-		goto retry;
-	else
-		return NULL;
 }
 
 /*
@@ -4223,88 +3865,6 @@ ValidXLogPageHeader(XLogPageHeader hdr, int emode)
 }
 
 /*
- * Validate an XLOG record header.
- *
- * This is just a convenience subroutine to avoid duplicated code in
- * ReadRecord.	It's not intended for use from anywhere else.
- */
-static bool
-ValidXLogRecordHeader(XLogRecPtr *RecPtr, XLogRecord *record, int emode,
-					  bool randAccess)
-{
-	/*
-	 * xl_len == 0 is bad data for everything except XLOG SWITCH, where it is
-	 * required.
-	 */
-	if (record->xl_rmid == RM_XLOG_ID && record->xl_info == XLOG_SWITCH)
-	{
-		if (record->xl_len != 0)
-		{
-			ereport(emode_for_corrupt_record(emode, *RecPtr),
-					(errmsg("invalid xlog switch record at %X/%X",
-							(uint32) ((*RecPtr) >> 32), (uint32) *RecPtr)));
-			return false;
-		}
-	}
-	else if (record->xl_len == 0)
-	{
-		ereport(emode_for_corrupt_record(emode, *RecPtr),
-				(errmsg("record with zero length at %X/%X",
-						(uint32) ((*RecPtr) >> 32), (uint32) *RecPtr)));
-		return false;
-	}
-	if (record->xl_tot_len < SizeOfXLogRecord + record->xl_len ||
-		record->xl_tot_len > SizeOfXLogRecord + record->xl_len +
-		XLR_MAX_BKP_BLOCKS * (sizeof(BkpBlock) + BLCKSZ))
-	{
-		ereport(emode_for_corrupt_record(emode, *RecPtr),
-				(errmsg("invalid record length at %X/%X",
-						(uint32) ((*RecPtr) >> 32), (uint32) *RecPtr)));
-		return false;
-	}
-	if (record->xl_rmid > RM_MAX_ID)
-	{
-		ereport(emode_for_corrupt_record(emode, *RecPtr),
-				(errmsg("invalid resource manager ID %u at %X/%X",
-						record->xl_rmid, (uint32) ((*RecPtr) >> 32), (uint32) *RecPtr)));
-		return false;
-	}
-	if (randAccess)
-	{
-		/*
-		 * We can't exactly verify the prev-link, but surely it should be less
-		 * than the record's own address.
-		 */
-		if (!XLByteLT(record->xl_prev, *RecPtr))
-		{
-			ereport(emode_for_corrupt_record(emode, *RecPtr),
-					(errmsg("record with incorrect prev-link %X/%X at %X/%X",
-							(uint32) (record->xl_prev >> 32), (uint32) record->xl_prev,
-							(uint32) ((*RecPtr) >> 32), (uint32) *RecPtr)));
-			return false;
-		}
-	}
-	else
-	{
-		/*
-		 * Record's prev-link should exactly match our previous location. This
-		 * check guards against torn WAL pages where a stale but valid-looking
-		 * WAL record starts on a sector boundary.
-		 */
-		if (!XLByteEQ(record->xl_prev, ReadRecPtr))
-		{
-			ereport(emode_for_corrupt_record(emode, *RecPtr),
-					(errmsg("record with incorrect prev-link %X/%X at %X/%X",
-							(uint32) (record->xl_prev >> 32), (uint32) record->xl_prev,
-							(uint32) ((*RecPtr) >> 32), (uint32) *RecPtr)));
-			return false;
-		}
-	}
-
-	return true;
-}
-
-/*
  * Try to read a timeline's history file.
  *
  * If successful, return the list of component TLIs (the given TLI followed by
@@ -6089,6 +5649,7 @@ StartupXLOG(void)
 	bool		backupEndRequired = false;
 	bool		backupFromStandby = false;
 	DBState		dbstate_at_startup;
+	XLogReaderState *xlogreader;
 
 	/*
 	 * Read control file and check XLOG status looks valid.
@@ -6222,6 +5783,8 @@ StartupXLOG(void)
 	if (StandbyMode)
 		OwnLatch(&XLogCtl->recoveryWakeupLatch);
 
+	xlogreader = XLogReaderAllocate(InvalidXLogRecPtr, &XLogPageRead, NULL);
+
 	if (read_backup_label(&checkPointLoc, &backupEndRequired,
 						  &backupFromStandby))
 	{
@@ -6229,7 +5792,7 @@ StartupXLOG(void)
 		 * When a backup_label file is present, we want to roll forward from
 		 * the checkpoint it identifies, rather than using pg_control.
 		 */
-		record = ReadCheckpointRecord(checkPointLoc, 0);
+		record = ReadCheckpointRecord(xlogreader, checkPointLoc, 0);
 		if (record != NULL)
 		{
 			memcpy(&checkPoint, XLogRecGetData(record), sizeof(CheckPoint));
@@ -6247,7 +5810,7 @@ StartupXLOG(void)
 			 */
 			if (XLByteLT(checkPoint.redo, checkPointLoc))
 			{
-				if (!ReadRecord(&(checkPoint.redo), LOG, false))
+				if (!ReadRecord(xlogreader, checkPoint.redo, LOG, false))
 					ereport(FATAL,
 							(errmsg("could not find redo location referenced by checkpoint record"),
 							 errhint("If you are not restoring from a backup, try removing the file \"%s/backup_label\".", DataDir)));
@@ -6271,7 +5834,7 @@ StartupXLOG(void)
 		 */
 		checkPointLoc = ControlFile->checkPoint;
 		RedoStartLSN = ControlFile->checkPointCopy.redo;
-		record = ReadCheckpointRecord(checkPointLoc, 1);
+		record = ReadCheckpointRecord(xlogreader, checkPointLoc, 1);
 		if (record != NULL)
 		{
 			ereport(DEBUG1,
@@ -6290,7 +5853,7 @@ StartupXLOG(void)
 		else
 		{
 			checkPointLoc = ControlFile->prevCheckPoint;
-			record = ReadCheckpointRecord(checkPointLoc, 2);
+			record = ReadCheckpointRecord(xlogreader, checkPointLoc, 2);
 			if (record != NULL)
 			{
 				ereport(LOG,
@@ -6591,7 +6154,7 @@ StartupXLOG(void)
 		 * Allow read-only connections immediately if we're consistent
 		 * already.
 		 */
-		CheckRecoveryConsistency();
+		CheckRecoveryConsistency(EndRecPtr);
 
 		/*
 		 * Find the first record that logically follows the checkpoint --- it
@@ -6600,12 +6163,12 @@ StartupXLOG(void)
 		if (XLByteLT(checkPoint.redo, RecPtr))
 		{
 			/* back up to find the record */
-			record = ReadRecord(&(checkPoint.redo), PANIC, false);
+			record = ReadRecord(xlogreader, checkPoint.redo, PANIC, false);
 		}
 		else
 		{
 			/* just have to read next record after CheckPoint */
-			record = ReadRecord(NULL, LOG, false);
+			record = ReadRecord(xlogreader, InvalidXLogRecPtr, LOG, false);
 		}
 
 		if (record != NULL)
@@ -6652,7 +6215,7 @@ StartupXLOG(void)
 				HandleStartupProcInterrupts();
 
 				/* Allow read-only connections if we're consistent now */
-				CheckRecoveryConsistency();
+				CheckRecoveryConsistency(EndRecPtr);
 
 				/*
 				 * Have we reached our recovery target?
@@ -6756,7 +6319,7 @@ StartupXLOG(void)
 
 				LastRec = ReadRecPtr;
 
-				record = ReadRecord(NULL, LOG, false);
+				record = ReadRecord(xlogreader, InvalidXLogRecPtr, LOG, false);
 			} while (record != NULL && recoveryContinue);
 
 			/*
@@ -6806,7 +6369,7 @@ StartupXLOG(void)
 	 * Re-fetch the last valid or last applied record, so we can identify the
 	 * exact endpoint of what we consider the valid portion of WAL.
 	 */
-	record = ReadRecord(&LastRec, PANIC, false);
+	record = ReadRecord(xlogreader, LastRec, PANIC, false);
 	EndOfLog = EndRecPtr;
 	XLByteToPrevSeg(EndOfLog, endLogSegNo);
 
@@ -6905,8 +6468,15 @@ StartupXLOG(void)
 	 * record spans, not the one it starts in.	The last block is indeed the
 	 * one we want to use.
 	 */
-	Assert(readOff == (XLogCtl->xlblocks[0] - XLOG_BLCKSZ) % XLogSegSize);
-	memcpy((char *) Insert->currpage, readBuf, XLOG_BLCKSZ);
+	if (EndOfLog % XLOG_BLCKSZ == 0)
+	{
+		memset(Insert->currpage, 0, XLOG_BLCKSZ);
+	}
+	else
+	{
+		Assert(readOff == (XLogCtl->xlblocks[0] - XLOG_BLCKSZ) % XLogSegSize);
+		memcpy((char *) Insert->currpage, xlogreader->readBuf, XLOG_BLCKSZ);
+	}
 	Insert->currpos = (char *) Insert->currpage +
 		(EndOfLog + XLOG_BLCKSZ - XLogCtl->xlblocks[0]);
 
@@ -7063,17 +6633,7 @@ StartupXLOG(void)
 		close(readFile);
 		readFile = -1;
 	}
-	if (readBuf)
-	{
-		free(readBuf);
-		readBuf = NULL;
-	}
-	if (readRecordBuf)
-	{
-		free(readRecordBuf);
-		readRecordBuf = NULL;
-		readRecordBufSize = 0;
-	}
+	XLogReaderFree(xlogreader);
 
 	/*
 	 * If any of the critical GUCs have changed, log them before we allow
@@ -7104,7 +6664,7 @@ StartupXLOG(void)
  * that it can start accepting read-only connections.
  */
 static void
-CheckRecoveryConsistency(void)
+CheckRecoveryConsistency(XLogRecPtr EndRecPtr)
 {
 	/*
 	 * During crash recovery, we don't reach a consistent state until we've
@@ -7284,7 +6844,7 @@ LocalSetXLogInsertAllowed(void)
  * 1 for "primary", 2 for "secondary", 0 for "other" (backup_label)
  */
 static XLogRecord *
-ReadCheckpointRecord(XLogRecPtr RecPtr, int whichChkpt)
+ReadCheckpointRecord(XLogReaderState *xlogreader, XLogRecPtr RecPtr, int whichChkpt)
 {
 	XLogRecord *record;
 
@@ -7308,7 +6868,7 @@ ReadCheckpointRecord(XLogRecPtr RecPtr, int whichChkpt)
 		return NULL;
 	}
 
-	record = ReadRecord(&RecPtr, LOG, true);
+	record = ReadRecord(xlogreader, RecPtr, LOG, true);
 
 	if (record == NULL)
 	{
@@ -10100,19 +9660,21 @@ CancelBackup(void)
  * sleep and retry.
  */
 static bool
-XLogPageRead(XLogRecPtr *RecPtr, int emode, bool fetching_ckpt,
-			 bool randAccess)
+XLogPageRead(XLogReaderState *xlogreader, XLogRecPtr RecPtr, int emode,
+			 bool randAccess, char *readBuf, void *private_data)
 {
+	/* TODO: these, and fetching_ckpt, would be better in private_data */
 	static XLogRecPtr receivedUpto = 0;
+	static pg_time_t last_fail_time = 0;
+	bool		fetching_ckpt = fetching_ckpt_global;
 	bool		switched_segment = false;
 	uint32		targetPageOff;
 	uint32		targetRecOff;
 	XLogSegNo	targetSegNo;
-	static pg_time_t last_fail_time = 0;
 
-	XLByteToSeg(*RecPtr, targetSegNo);
-	targetPageOff = (((*RecPtr) % XLogSegSize) / XLOG_BLCKSZ) * XLOG_BLCKSZ;
-	targetRecOff = (*RecPtr) % XLOG_BLCKSZ;
+	XLByteToSeg(RecPtr, targetSegNo);
+	targetPageOff = ((RecPtr % XLogSegSize) / XLOG_BLCKSZ) * XLOG_BLCKSZ;
+	targetRecOff = RecPtr % XLOG_BLCKSZ;
 
 	/* Fast exit if we have read the record in the current buffer already */
 	if (failedSources == 0 && targetSegNo == readSegNo &&
@@ -10123,7 +9685,7 @@ XLogPageRead(XLogRecPtr *RecPtr, int emode, bool fetching_ckpt,
 	 * See if we need to switch to a new segment because the requested record
 	 * is not in the currently open one.
 	 */
-	if (readFile >= 0 && !XLByteInSeg(*RecPtr, readSegNo))
+	if (readFile >= 0 && !XLByteInSeg(RecPtr, readSegNo))
 	{
 		/*
 		 * Request a restartpoint if we've replayed too much xlog since the
@@ -10144,12 +9706,12 @@ XLogPageRead(XLogRecPtr *RecPtr, int emode, bool fetching_ckpt,
 		readSource = 0;
 	}
 
-	XLByteToSeg(*RecPtr, readSegNo);
+	XLByteToSeg(RecPtr, readSegNo);
 
 retry:
 	/* See if we need to retrieve more data */
 	if (readFile < 0 ||
-		(readSource == XLOG_FROM_STREAM && !XLByteLT(*RecPtr, receivedUpto)))
+		(readSource == XLOG_FROM_STREAM && !XLByteLT(RecPtr, receivedUpto)))
 	{
 		if (StandbyMode)
 		{
@@ -10192,17 +9754,17 @@ retry:
 					 * XLogReceiptTime will not advance, so the grace time
 					 * alloted to conflicting queries will decrease.
 					 */
-					if (XLByteLT(*RecPtr, receivedUpto))
+					if (XLByteLT(RecPtr, receivedUpto))
 						havedata = true;
 					else
 					{
 						XLogRecPtr	latestChunkStart;
 
 						receivedUpto = GetWalRcvWriteRecPtr(&latestChunkStart);
-						if (XLByteLT(*RecPtr, receivedUpto))
+						if (XLByteLT(RecPtr, receivedUpto))
 						{
 							havedata = true;
-							if (!XLByteLT(*RecPtr, latestChunkStart))
+							if (!XLByteLT(RecPtr, latestChunkStart))
 							{
 								XLogReceiptTime = GetCurrentTimestamp();
 								SetCurrentChunkStartTime(XLogReceiptTime);
@@ -10321,7 +9883,7 @@ retry:
 						if (PrimaryConnInfo)
 						{
 							RequestXLogStreaming(
-									  fetching_ckpt ? RedoStartLSN : *RecPtr,
+									  fetching_ckpt ? RedoStartLSN : RecPtr,
 												 PrimaryConnInfo);
 							continue;
 						}
@@ -10393,7 +9955,7 @@ retry:
 	 */
 	if (readSource == XLOG_FROM_STREAM)
 	{
-		if (((*RecPtr) / XLOG_BLCKSZ) != (receivedUpto / XLOG_BLCKSZ))
+		if (((RecPtr) / XLOG_BLCKSZ) != (receivedUpto / XLOG_BLCKSZ))
 		{
 			readLen = XLOG_BLCKSZ;
 		}
@@ -10417,7 +9979,7 @@ retry:
 		{
 			char fname[MAXFNAMELEN];
 			XLogFileName(fname, curFileTLI, readSegNo);
-			ereport(emode_for_corrupt_record(emode, *RecPtr),
+			ereport(emode_for_corrupt_record(emode, RecPtr),
 					(errcode_for_file_access(),
 					 errmsg("could not read from log segment %s, offset %u: %m",
 							fname, readOff)));
@@ -10433,7 +9995,7 @@ retry:
 	{
 		char fname[MAXFNAMELEN];
 		XLogFileName(fname, curFileTLI, readSegNo);
-		ereport(emode_for_corrupt_record(emode, *RecPtr),
+		ereport(emode_for_corrupt_record(emode, RecPtr),
 				(errcode_for_file_access(),
 		 errmsg("could not seek in log segment %s to offset %u: %m",
 				fname, readOff)));
@@ -10443,7 +10005,7 @@ retry:
 	{
 		char fname[MAXFNAMELEN];
 		XLogFileName(fname, curFileTLI, readSegNo);
-		ereport(emode_for_corrupt_record(emode, *RecPtr),
+		ereport(emode_for_corrupt_record(emode, RecPtr),
 				(errcode_for_file_access(),
 		 errmsg("could not read from log segment %s, offset %u: %m",
 				fname, readOff)));
@@ -10501,7 +10063,7 @@ triggered:
  * you are about to ereport(), or you might cause a later message to be
  * erroneously suppressed.
  */
-static int
+int
 emode_for_corrupt_record(int emode, XLogRecPtr RecPtr)
 {
 	static XLogRecPtr lastComplaint = 0;
diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c
new file mode 100644
index 0000000..8ba05b1
--- /dev/null
+++ b/src/backend/access/transam/xlogreader.c
@@ -0,0 +1,496 @@
+/*-------------------------------------------------------------------------
+ *
+ * xlogreader.c
+ *		Generic xlog reading facility
+ *
+ * Portions Copyright (c) 2012, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ *		src/backend/access/transam/xlogreader.c
+ *
+ * NOTES
+ *		Documentation about how to use this interface can be found in
+ *		xlogreader.h, more specifically in the definition of the
+ *		XLogReaderState struct where all parameters are documented.
+ *
+ * TODO:
+ * * usable without backend code around
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "access/transam.h"
+#include "access/xlog_internal.h"
+#include "access/xlogreader.h"
+#include "catalog/pg_control.h"
+#include "utils/memutils.h"
+
+static bool ValidXLogRecordHeader(XLogRecPtr RecPtr, XLogRecPtr PrevRecPtr,
+					  XLogRecord *record, int emode, bool randAccess);
+static bool RecordIsValid(XLogRecord *record, XLogRecPtr recptr, int emode);
+
+/*
+ * Allocate and initialize a new xlog reader.
+ *
+ * 'startpoint' becomes the initial EndRecPtr, i.e. the position at which a
+ * first XLogReadRecord(state, InvalidXLogRecPtr, ...) call starts reading.
+ * 'pagereadfunc' is the mandatory page-reading callback, and 'private_data'
+ * is stored untouched for the caller's callbacks.
+ *
+ * The reader is palloc'd in the current memory context; release it with
+ * XLogReaderFree().
+ */
+XLogReaderState *
+XLogReaderAllocate(XLogRecPtr startpoint,
+				   XLogPageReadCB pagereadfunc, void *private_data)
+{
+	XLogReaderState *state;
+
+	/*
+	 * palloc0 zeroes every field: ReadRecPtr starts out as 0 (no record read
+	 * yet) and readRecordBuf as NULL, to be allocated lazily by
+	 * XLogReadRecord.
+	 */
+	state = (XLogReaderState *) palloc0(sizeof(XLogReaderState));
+
+	/*
+	 * The page buffer is allocated separately rather than embedded in the
+	 * struct as a char array, because palloc'd storage is MAXALIGN'd while a
+	 * char array is not guaranteed any particular alignment.
+	 */
+	state->readBuf = (char *) palloc(XLOG_BLCKSZ);
+
+	state->read_page = pagereadfunc;
+	state->private_data = private_data;
+	state->EndRecPtr = startpoint;
+
+	return state;
+}
+
+/*
+ * Release an xlog reader together with every buffer it owns.
+ */
+void
+XLogReaderFree(XLogReaderState *state)
+{
+	pfree(state->readBuf);
+
+	/* readRecordBuf is allocated lazily by XLogReadRecord, so may be NULL */
+	if (state->readRecordBuf != NULL)
+		pfree(state->readRecordBuf);
+
+	pfree(state);
+}
+
+/*
+ * Attempt to read an XLOG record.
+ *
+ * If RecPtr is valid (not InvalidXLogRecPtr), try to read a record at that
+ * position.  Otherwise try to read a record just after the last one
+ * previously read, i.e. at state->EndRecPtr.
+ *
+ * If no valid record is available, returns NULL, or fails if emode is PANIC.
+ * (emode must be either PANIC or LOG)
+ *
+ * The record is copied into readRecordBuf, so that on successful return,
+ * the returned record pointer always points there.  That buffer is reused
+ * (and may be reallocated) by the next call, so the result is only valid
+ * until then.  On success ReadRecPtr and EndRecPtr are set to the start and
+ * end+1 of the returned record (for an XLOG SWITCH record, EndRecPtr is
+ * moved to the end of the segment instead).
+ */
+XLogRecord *
+XLogReadRecord(XLogReaderState *state, XLogRecPtr RecPtr, int emode)
+{
+	XLogRecord *record;
+	XLogRecPtr	tmpRecPtr = state->EndRecPtr;
+	bool		randAccess = false;
+	uint32		len,
+				total_len;
+	uint32		targetRecOff;
+	uint32		pageHeaderSize;
+	bool		gotheader;
+
+	if (RecPtr == InvalidXLogRecPtr)
+	{
+		RecPtr = tmpRecPtr;
+
+		/*
+		 * If no record has been read yet, ReadRecPtr is still invalid and
+		 * there is nothing the first record's xl_prev could be compared
+		 * with exactly, so validate it like an explicitly requested record.
+		 */
+		if (state->ReadRecPtr == InvalidXLogRecPtr)
+			randAccess = true;
+
+		/*
+		 * RecPtr is pointing to end+1 of the previous WAL record.  If
+		 * we're at a page boundary, no more records can fit on the current
+		 * page. We must skip over the page header, but we can't do that
+		 * until we've read in the page, since the header size is variable.
+		 */
+	}
+	else
+	{
+		/*
+		 * In this case, the passed-in record pointer should already be
+		 * pointing to a valid record starting position.
+		 */
+		if (!XRecOffIsValid(RecPtr))
+			ereport(PANIC,
+					(errmsg("invalid record offset at %X/%X",
+							(uint32) (RecPtr >> 32), (uint32) RecPtr)));
+		randAccess = true;		/* allow curFileTLI to go backwards too */
+	}
+
+	/* Read the page containing the record */
+	if (!state->read_page(state, RecPtr, emode, randAccess, state->readBuf, state->private_data))
+		return NULL;
+
+	pageHeaderSize = XLogPageHeaderSize((XLogPageHeader) state->readBuf);
+	targetRecOff = RecPtr % XLOG_BLCKSZ;
+	if (targetRecOff == 0)
+	{
+		/*
+		 * At page start, so skip over page header.  The Assert checks that
+		 * we're not scribbling on caller's record pointer; it's OK because we
+		 * can only get here in the continuing-from-prev-record case, since
+		 * XRecOffIsValid rejected the zero-page-offset case otherwise.
+		 * XXX: does this assert make sense now that RecPtr is not a pointer?
+		 */
+		Assert(RecPtr == tmpRecPtr);
+		RecPtr += pageHeaderSize;
+		targetRecOff = pageHeaderSize;
+	}
+	else if (targetRecOff < pageHeaderSize)
+	{
+		ereport(emode_for_corrupt_record(emode, RecPtr),
+				(errmsg("invalid record offset at %X/%X",
+						(uint32) (RecPtr >> 32), (uint32) RecPtr)));
+		goto next_record_is_invalid;
+	}
+	if ((((XLogPageHeader) state->readBuf)->xlp_info & XLP_FIRST_IS_CONTRECORD) &&
+		targetRecOff == pageHeaderSize)
+	{
+		ereport(emode_for_corrupt_record(emode, RecPtr),
+				(errmsg("contrecord is requested by %X/%X",
+						(uint32) (RecPtr >> 32), (uint32) RecPtr)));
+		goto next_record_is_invalid;
+	}
+
+	/*
+	 * Read the record length.
+	 *
+	 * NB: Even though we use an XLogRecord pointer here, the whole record
+	 * header might not fit on this page. xl_tot_len is the first field of
+	 * the struct, so it must be on this page (the records are MAXALIGNed),
+	 * but we cannot access any other fields until we've verified that we
+	 * got the whole header.
+	 */
+	record = (XLogRecord *) (state->readBuf + RecPtr % XLOG_BLCKSZ);
+	total_len = record->xl_tot_len;
+
+	/*
+	 * If the whole record header is on this page, validate it immediately.
+	 * Otherwise do just a basic sanity check on xl_tot_len, and validate the
+	 * rest of the header after reading it from the next page.  The xl_tot_len
+	 * check is necessary here to ensure that we enter the "Need to reassemble
+	 * record" code path below; otherwise we might fail to apply
+	 * ValidXLogRecordHeader at all.
+	 */
+	if (targetRecOff <= XLOG_BLCKSZ - SizeOfXLogRecord)
+	{
+		if (!ValidXLogRecordHeader(RecPtr, state->ReadRecPtr, record, emode, randAccess))
+			goto next_record_is_invalid;
+		gotheader = true;
+	}
+	else
+	{
+		if (total_len < SizeOfXLogRecord)
+		{
+			ereport(emode_for_corrupt_record(emode, RecPtr),
+					(errmsg("invalid record length at %X/%X",
+							(uint32) (RecPtr >> 32), (uint32) RecPtr)));
+			goto next_record_is_invalid;
+		}
+		gotheader = false;
+	}
+
+	/*
+	 * Allocate or enlarge readRecordBuf as needed.  To avoid useless small
+	 * increases, round its size to a multiple of XLOG_BLCKSZ, and make sure
+	 * it's at least 4*Max(BLCKSZ, XLOG_BLCKSZ) to start with.  (That is
+	 * enough for all "normal" records, but very large commit or abort records
+	 * might need more space.)
+	 */
+	if (total_len > state->readRecordBufSize)
+	{
+		uint32		newSize = total_len;
+
+		/*
+		 * total_len comes straight from possibly corrupt data (and is not
+		 * validated at all yet if the header spans pages).  palloc reports
+		 * oversized requests with an ERROR rather than returning NULL, so
+		 * reject them here as bogus data.  Keeping XLOG_BLCKSZ of headroom
+		 * also guarantees the uint32 round-up below cannot overflow.
+		 */
+		if (total_len > MaxAllocSize - XLOG_BLCKSZ)
+		{
+			ereport(emode_for_corrupt_record(emode, RecPtr),
+					(errmsg("record length %u at %X/%X too long",
+							total_len, (uint32) (RecPtr >> 32), (uint32) RecPtr)));
+			goto next_record_is_invalid;
+		}
+
+		newSize += XLOG_BLCKSZ - (newSize % XLOG_BLCKSZ);
+		newSize = Max(newSize, 4 * Max(BLCKSZ, XLOG_BLCKSZ));
+
+		/*
+		 * Forget the old buffer before allocating the new one, so that an
+		 * out-of-memory ERROR inside palloc cannot leave a dangling pointer
+		 * behind for XLogReaderFree to pfree a second time.
+		 */
+		if (state->readRecordBuf)
+			pfree(state->readRecordBuf);
+		state->readRecordBuf = NULL;
+		state->readRecordBufSize = 0;
+
+		state->readRecordBuf = (char *) palloc(newSize);
+		state->readRecordBufSize = newSize;
+	}
+
+	len = XLOG_BLCKSZ - RecPtr % XLOG_BLCKSZ;
+	if (total_len > len)
+	{
+		/* Need to reassemble record */
+		char	   *contrecord;
+		XLogPageHeader pageHeader;
+		XLogRecPtr	pagelsn;
+		char	   *buffer;
+		uint32		gotlen;
+
+		/* Initialize pagelsn to the beginning of the page this record is on */
+		pagelsn = (RecPtr / XLOG_BLCKSZ) * XLOG_BLCKSZ;
+
+		/* Copy the first fragment of the record from the first page. */
+		memcpy(state->readRecordBuf, state->readBuf + RecPtr % XLOG_BLCKSZ, len);
+		buffer = state->readRecordBuf + len;
+		gotlen = len;
+
+		do
+		{
+			/* Calculate pointer to beginning of next page */
+			XLByteAdvance(pagelsn, XLOG_BLCKSZ);
+			/*
+			 * Wait for the next page to become available.  The callback gets
+			 * the caller's private_data here too, exactly as for the first
+			 * page.
+			 */
+			if (!state->read_page(state, pagelsn, emode, false, state->readBuf, state->private_data))
+				return NULL;
+
+			/* Check that the continuation on next page looks valid */
+			pageHeader = (XLogPageHeader) state->readBuf;
+			if (!(pageHeader->xlp_info & XLP_FIRST_IS_CONTRECORD))
+			{
+				ereport(emode_for_corrupt_record(emode, RecPtr),
+						(errmsg("there is no contrecord flag at %X/%X",
+								(uint32) (RecPtr >> 32), (uint32) RecPtr)));
+				goto next_record_is_invalid;
+			}
+			/*
+			 * Cross-check that xlp_rem_len agrees with how much of the record
+			 * we expect there to be left.
+			 */
+			if (pageHeader->xlp_rem_len == 0 ||
+				total_len != (pageHeader->xlp_rem_len + gotlen))
+			{
+				ereport(emode_for_corrupt_record(emode, RecPtr),
+						(errmsg("invalid contrecord length %u at %X/%X",
+								pageHeader->xlp_rem_len,
+								(uint32) (RecPtr >> 32), (uint32) RecPtr)));
+				goto next_record_is_invalid;
+			}
+
+			/* Append the continuation from this page to the buffer */
+			pageHeaderSize = XLogPageHeaderSize(pageHeader);
+			contrecord = (char *) state->readBuf + pageHeaderSize;
+			len = XLOG_BLCKSZ - pageHeaderSize;
+			if (pageHeader->xlp_rem_len < len)
+				len = pageHeader->xlp_rem_len;
+			memcpy(buffer, (char *) contrecord, len);
+			buffer += len;
+			gotlen += len;
+
+			/* If we just reassembled the record header, validate it. */
+			if (!gotheader)
+			{
+				record = (XLogRecord *) state->readRecordBuf;
+				if (!ValidXLogRecordHeader(RecPtr, state->ReadRecPtr, record, emode, randAccess))
+					goto next_record_is_invalid;
+				gotheader = true;
+			}
+		} while (pageHeader->xlp_rem_len > len);
+
+		record = (XLogRecord *) state->readRecordBuf;
+		if (!RecordIsValid(record, RecPtr, emode))
+			goto next_record_is_invalid;
+		pageHeaderSize = XLogPageHeaderSize((XLogPageHeader) state->readBuf);
+		state->ReadRecPtr = RecPtr;
+		state->EndRecPtr = pagelsn + pageHeaderSize + MAXALIGN(pageHeader->xlp_rem_len);
+	}
+	else
+	{
+		/* Record does not cross a page boundary */
+		if (!RecordIsValid(record, RecPtr, emode))
+			goto next_record_is_invalid;
+		state->EndRecPtr = RecPtr + MAXALIGN(total_len);
+
+		state->ReadRecPtr = RecPtr;
+		memcpy(state->readRecordBuf, record, total_len);
+
+		/*
+		 * Hand out the copy, as in the multi-page case: 'record' still
+		 * points into readBuf, which the next read_page call overwrites.
+		 */
+		record = (XLogRecord *) state->readRecordBuf;
+	}
+
+	/*
+	 * Special processing if it's an XLOG SWITCH record
+	 */
+	if (record->xl_rmid == RM_XLOG_ID && record->xl_info == XLOG_SWITCH)
+	{
+		/* Pretend it extends to end of segment */
+		state->EndRecPtr += XLogSegSize - 1;
+		state->EndRecPtr -= state->EndRecPtr % XLogSegSize;
+	}
+	return record;
+
+next_record_is_invalid:
+	return NULL;
+}
+
+/*
+ * Validate an XLOG record header.
+ *
+ * This is just a convenience subroutine to avoid duplicated code in
+ * XLogReadRecord.  It's not intended for use from anywhere else.
+ *
+ * RecPtr is the position the record was read from and PrevRecPtr the start
+ * of the previously read record.  When randAccess is false, xl_prev must
+ * equal PrevRecPtr exactly; otherwise it only has to be below RecPtr.
+ * Problems are reported at emode_for_corrupt_record(emode, RecPtr) and
+ * cause false to be returned.
+ */
+static bool
+ValidXLogRecordHeader(XLogRecPtr RecPtr, XLogRecPtr PrevRecPtr, XLogRecord *record, int emode,
+					  bool randAccess)
+{
+	/*
+	 * xl_len == 0 is bad data for everything except XLOG SWITCH, where it is
+	 * required.
+	 */
+	if (record->xl_rmid == RM_XLOG_ID && record->xl_info == XLOG_SWITCH)
+	{
+		if (record->xl_len != 0)
+		{
+			ereport(emode_for_corrupt_record(emode, RecPtr),
+					(errmsg("invalid xlog switch record at %X/%X",
+							(uint32) (RecPtr >> 32), (uint32) RecPtr)));
+			return false;
+		}
+	}
+	else if (record->xl_len == 0)
+	{
+		ereport(emode_for_corrupt_record(emode, RecPtr),
+				(errmsg("record with zero length at %X/%X",
+						(uint32) (RecPtr >> 32), (uint32) RecPtr)));
+		return false;
+	}
+
+	/*
+	 * xl_tot_len must cover the header plus xl_len bytes of rmgr data, and
+	 * at most XLR_MAX_BKP_BLOCKS full-page images on top of that.  The tests
+	 * are written as guarded subtractions rather than sums, so that a huge,
+	 * corrupt xl_len cannot wrap SizeOfXLogRecord + xl_len around where that
+	 * arithmetic is only 32 bits wide.
+	 */
+	if (record->xl_tot_len < SizeOfXLogRecord ||
+		record->xl_len > record->xl_tot_len - SizeOfXLogRecord ||
+		record->xl_tot_len - SizeOfXLogRecord - record->xl_len >
+		XLR_MAX_BKP_BLOCKS * (sizeof(BkpBlock) + BLCKSZ))
+	{
+		ereport(emode_for_corrupt_record(emode, RecPtr),
+				(errmsg("invalid record length at %X/%X",
+						(uint32) (RecPtr >> 32), (uint32) RecPtr)));
+		return false;
+	}
+	if (record->xl_rmid > RM_MAX_ID)
+	{
+		ereport(emode_for_corrupt_record(emode, RecPtr),
+				(errmsg("invalid resource manager ID %u at %X/%X",
+						record->xl_rmid, (uint32) (RecPtr >> 32), (uint32) RecPtr)));
+		return false;
+	}
+	if (randAccess)
+	{
+		/*
+		 * We can't exactly verify the prev-link, but surely it should be less
+		 * than the record's own address.
+		 */
+		if (!XLByteLT(record->xl_prev, RecPtr))
+		{
+			ereport(emode_for_corrupt_record(emode, RecPtr),
+					(errmsg("record with incorrect prev-link %X/%X at %X/%X",
+							(uint32) (record->xl_prev >> 32), (uint32) record->xl_prev,
+							(uint32) (RecPtr >> 32), (uint32) RecPtr)));
+			return false;
+		}
+	}
+	else
+	{
+		/*
+		 * Record's prev-link should exactly match our previous location. This
+		 * check guards against torn WAL pages where a stale but valid-looking
+		 * WAL record starts on a sector boundary.
+		 */
+		if (!XLByteEQ(record->xl_prev, PrevRecPtr))
+		{
+			ereport(emode_for_corrupt_record(emode, RecPtr),
+					(errmsg("record with incorrect prev-link %X/%X at %X/%X",
+							(uint32) (record->xl_prev >> 32), (uint32) record->xl_prev,
+							(uint32) (RecPtr >> 32), (uint32) RecPtr)));
+			return false;
+		}
+	}
+
+	return true;
+}
+
+
+/*
+ * CRC-check an XLOG record.  We do not believe the contents of an XLOG
+ * record (other than to the minimal extent of computing the amount of
+ * data to read in) until we've checked the CRCs.
+ *
+ * We assume all of the record (that is, xl_tot_len bytes) has been read
+ * into memory at *record.  Also, ValidXLogRecordHeader() has accepted the
+ * record's header, which means in particular that xl_tot_len is at least
+ * SizeOfXLogRecord, so it is safe to fetch xl_len.
+ *
+ * Besides the CRC, the rmgr data plus the backup blocks flagged in xl_info
+ * must account for exactly xl_tot_len bytes.  recptr is only used for error
+ * messages; problems are reported at emode_for_corrupt_record(emode, recptr)
+ * and cause false to be returned.
+ */
+static bool
+RecordIsValid(XLogRecord *record, XLogRecPtr recptr, int emode)
+{
+	pg_crc32	crc;
+	int			i;
+	uint32		len = record->xl_len;
+	BkpBlock	bkpb;
+	char	   *blk;
+	size_t		remaining = record->xl_tot_len;
+
+	/*
+	 * First the rmgr data.  The length test is a guarded subtraction so that
+	 * a huge, corrupt xl_len cannot wrap SizeOfXLogRecord + len around where
+	 * that arithmetic is only 32 bits wide.
+	 */
+	if (remaining < SizeOfXLogRecord || len > remaining - SizeOfXLogRecord)
+	{
+		/* ValidXLogRecordHeader() should've caught this already... */
+		ereport(emode_for_corrupt_record(emode, recptr),
+				(errmsg("invalid record length at %X/%X",
+						(uint32) (recptr >> 32), (uint32) recptr)));
+		return false;
+	}
+	remaining -= SizeOfXLogRecord + len;
+	INIT_CRC32(crc);
+	COMP_CRC32(crc, XLogRecGetData(record), len);
+
+	/* Add in the backup blocks, if any */
+	blk = (char *) XLogRecGetData(record) + len;
+	for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++)
+	{
+		uint32		blen;
+
+		if (!(record->xl_info & XLR_SET_BKP_BLOCK(i)))
+			continue;
+
+		if (remaining < sizeof(BkpBlock))
+		{
+			ereport(emode_for_corrupt_record(emode, recptr),
+					(errmsg("invalid backup block size in record at %X/%X",
+							(uint32) (recptr >> 32), (uint32) recptr)));
+			return false;
+		}
+		/* copy out the header; blk need not be suitably aligned for BkpBlock */
+		memcpy(&bkpb, blk, sizeof(BkpBlock));
+
+		if (bkpb.hole_offset + bkpb.hole_length > BLCKSZ)
+		{
+			ereport(emode_for_corrupt_record(emode, recptr),
+					(errmsg("incorrect hole size in record at %X/%X",
+							(uint32) (recptr >> 32), (uint32) recptr)));
+			return false;
+		}
+		blen = sizeof(BkpBlock) + BLCKSZ - bkpb.hole_length;
+
+		if (remaining < blen)
+		{
+			ereport(emode_for_corrupt_record(emode, recptr),
+					(errmsg("invalid backup block size in record at %X/%X",
+							(uint32) (recptr >> 32), (uint32) recptr)));
+			return false;
+		}
+		remaining -= blen;
+		COMP_CRC32(crc, blk, blen);
+		blk += blen;
+	}
+
+	/* Check that xl_tot_len agrees with our calculation */
+	if (remaining != 0)
+	{
+		ereport(emode_for_corrupt_record(emode, recptr),
+				(errmsg("incorrect total length in record at %X/%X",
+						(uint32) (recptr >> 32), (uint32) recptr)));
+		return false;
+	}
+
+	/* Finally include the record header */
+	COMP_CRC32(crc, (char *) record, offsetof(XLogRecord, xl_crc));
+	FIN_CRC32(crc);
+
+	if (!EQ_CRC32(record->xl_crc, crc))
+	{
+		ereport(emode_for_corrupt_record(emode, recptr),
+		(errmsg("incorrect resource manager data checksum in record at %X/%X",
+				(uint32) (recptr >> 32), (uint32) recptr)));
+		return false;
+	}
+
+	return true;
+}
diff --git a/src/include/access/xlog_internal.h b/src/include/access/xlog_internal.h
index b5bfb7b..1ada664 100644
--- a/src/include/access/xlog_internal.h
+++ b/src/include/access/xlog_internal.h
@@ -229,6 +229,14 @@ extern const RmgrData RmgrTable[];
 extern pg_time_t GetLastSegSwitchTime(void);
 extern XLogRecPtr RequestXLogSwitch(void);
 
+
+/*
+ * Exported so that xlogreader.c can call this. TODO: Should be refactored
+ * into a callback, or just have xlogreader return the error string and have
+ * the caller of XLogReadRecord() do the ereport() call.
+ */
+extern int	emode_for_corrupt_record(int emode, XLogRecPtr RecPtr);
+
 /*
  * These aren't in xlog.h because I'd rather not include fmgr.h there.
  */
diff --git a/src/include/access/xlogreader.h b/src/include/access/xlogreader.h
new file mode 100644
index 0000000..d475a9b
--- /dev/null
+++ b/src/include/access/xlogreader.h
@@ -0,0 +1,101 @@
+/*-------------------------------------------------------------------------
+ *
+ * xlogreader.h
+ *
+ *		Generic xlog reading facility.
+ *
+ * Portions Copyright (c) 2012, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ *		src/include/access/xlogreader.h
+ *
+ * NOTES
+ *		Check the definition of the XLogReaderState struct for instructions on
+ *		how to use the XLogReader infrastructure.
+ *
+ *		The basic idea is to allocate an XLogReaderState via
+ *		XLogReaderAllocate, and call XLogReadRecord() until it returns NULL.
+ *-------------------------------------------------------------------------
+ */
+#ifndef XLOGREADER_H
+#define XLOGREADER_H
+
+#include "access/xlog_internal.h"
+
+struct XLogReaderState;
+
+/*
+ * Signature of the page-reading callback.  Its contract is documented at the
+ * read_page field of XLogReaderState below.
+ */
+typedef bool (*XLogPageReadCB)(struct XLogReaderState *state,
+							   XLogRecPtr RecPtr, int emode,
+							   bool randAccess,
+							   char *readBuf,
+							   void *private_data);
+
+typedef struct XLogReaderState
+{
+	/* ----------------------------------------
+	 * Public parameters
+	 * ----------------------------------------
+	 */
+
+	/* callbacks */
+
+	/*
+	 * Data input function.
+	 *
+	 * This callback *has* to be implemented.
+	 *
+	 * Has to read the whole XLOG_BLCKSZ-byte page that contains 'RecPtr'
+	 * (the page starting at RecPtr - RecPtr % XLOG_BLCKSZ) into the memory
+	 * pointed at by the 'readBuf' parameter; 'RecPtr' need not be page
+	 * aligned, and the reader locates the record inside the page using
+	 * RecPtr % XLOG_BLCKSZ.  Returns true on success, false if the page
+	 * could not be read, in which case XLogReadRecord() returns NULL.
+	 *
+	 * 'emode' is the error level the caller passed to XLogReadRecord().
+	 * 'randAccess' is false when fetching the continuation pages of a record
+	 * that spans pages; it may be true when fetching the page a record
+	 * starts on.  The callback also receives the reader itself, so it can
+	 * always reach state->private_data.
+	 */
+	XLogPageReadCB read_page;
+
+	/*
+	 * This can be used by the caller to pass state to the callbacks without
+	 * using global variables or such ugliness.  The xlogreader code only
+	 * stores the value given to XLogReaderAllocate() and hands it to the
+	 * callback; it never interprets it.
+	 */
+	void *private_data;
+
+	/* from where to where are we reading */
+
+	XLogRecPtr ReadRecPtr;	/* start of last record read */
+	XLogRecPtr EndRecPtr;	/* end+1 of last record read; initially the
+							 * start point given to XLogReaderAllocate() */
+
+	/* ----------------------------------------
+	 * private/internal state
+	 * ----------------------------------------
+	 */
+
+	/* Buffer for currently read page (XLOG_BLCKSZ bytes) */
+	char	   *readBuf;
+
+	/* Buffer for current XLogReadRecord() result (expandable) */
+	char	   *readRecordBuf;
+	uint32		readRecordBufSize;
+} XLogReaderState;
+
+/*
+ * Get a new XLogReader
+ *
+ * The read_page callback and the start point are passed in here; there is
+ * no end pointer, reading stops when XLogReadRecord() returns NULL.
+ */
+extern XLogReaderState *XLogReaderAllocate(XLogRecPtr startpoint,
+				   XLogPageReadCB pagereadfunc, void *private_data);
+
+/*
+ * Free an XLogReader
+ */
+extern void XLogReaderFree(XLogReaderState *state);
+
+/*
+ * Read the record at 'ptr', or the next one if 'ptr' is InvalidXLogRecPtr. Returns NULL on end-of-WAL or on failure.
+ */
+extern XLogRecord *XLogReadRecord(XLogReaderState *state, XLogRecPtr ptr, int emode);
+
+#endif /* XLOGREADER_H */
-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to