Features:
- streaming reading/writing
- filtering
- reassembly of records

Reusing the ReadRecord infrastructure from code that is not tightly integrated
into xlog.c is rather hard: it would require changes to rather integral parts
of the recovery code, which doesn't seem to be a good idea. Hence this
separate, generic reader.

Missing:
- "compressing" the stream when removing uninteresting records
- writing out correct CRCs
- separating reader/writer
---
 src/backend/access/transam/Makefile     |    2 +-
 src/backend/access/transam/xlogreader.c | 1032 +++++++++++++++++++++++++++++++
 src/include/access/xlogreader.h         |  264 ++++++++
 3 files changed, 1297 insertions(+), 1 deletion(-)
 create mode 100644 src/backend/access/transam/xlogreader.c
 create mode 100644 src/include/access/xlogreader.h

diff --git a/src/backend/access/transam/Makefile b/src/backend/access/transam/Makefile
index f82f10e..660b5fc 100644
--- a/src/backend/access/transam/Makefile
+++ b/src/backend/access/transam/Makefile
@@ -13,7 +13,7 @@ top_builddir = ../../../..
 include $(top_builddir)/src/Makefile.global
 
 OBJS = clog.o transam.o varsup.o xact.o rmgr.o slru.o subtrans.o multixact.o \
-	twophase.o twophase_rmgr.o xlog.o xlogfuncs.o xlogutils.o
+	twophase.o twophase_rmgr.o xlog.o xlogfuncs.o xlogreader.o xlogutils.o
 
 include $(top_srcdir)/src/backend/common.mk
 
diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c
new file mode 100644
index 0000000..4392b29
--- /dev/null
+++ b/src/backend/access/transam/xlogreader.c
@@ -0,0 +1,1032 @@
+/*-------------------------------------------------------------------------
+ *
+ * xlogreader.c
+ *		Generic xlog reading facility
+ *
+ * Portions Copyright (c) 2012, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ *		src/backend/access/transam/xlogreader.c
+ *
+ * NOTES
+ *		Documentation about how to use this interface can be found in
+ *		xlogreader.h, more specifically in the definition of the
+ *		XLogReaderState struct where all parameters are documented.
+ *
+ * TODO:
+ * * more extensive validation of read records
+ * * separation of reader/writer
+ * * customizable error response
+ * * usable without backend code around
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "access/xlog_internal.h"
+#include "access/transam.h"
+#include "catalog/pg_control.h"
+#include "access/xlogreader.h"
+
+/* If (very) verbose debugging is needed:
+ * #define VERBOSE_DEBUG
+ */
+
+/* Allocate a reader with all fields zeroed; raises ERROR if out of memory. */
+XLogReaderState*
+XLogReaderAllocate(void)
+{
+	XLogReaderState* state = (XLogReaderState*)malloc(sizeof(XLogReaderState));
+	int i;
+
+	if (!state)
+		goto oom;
+
+	/*
+	 * malloc() returns garbage: zero everything so that unset callbacks (e.g.
+	 * read_page) are NULL and the error path can free the buffer pointers.
+	 */
+	memset(state, 0, sizeof(XLogReaderState));
+
+	state->buf.record_data_size = XLOG_BLCKSZ*8;
+	state->buf.record_data = malloc(state->buf.record_data_size);
+	if (!state->buf.record_data)
+		goto oom;
+
+	memset(state->buf.record_data, 0, state->buf.record_data_size);
+	state->buf.origptr = InvalidXLogRecPtr;
+
+	for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++)
+	{
+		state->buf.bkp_block_data[i] = malloc(BLCKSZ);
+		if (!state->buf.bkp_block_data[i])
+			goto oom;
+	}
+
+	XLogReaderReset(state);
+	return state;
+
+oom:
+	/* ereport(ERROR) does not return here, so release what we got first */
+	if (state)
+		XLogReaderFree(state);
+
+	ereport(ERROR,
+	        (errcode(ERRCODE_OUT_OF_MEMORY),
+	         errmsg("out of memory"),
+	         errdetail("failed while allocating an XLogReader")));
+	return NULL;
+}
+
+/* Release a reader from XLogReaderAllocate() together with all its buffers. */
+void
+XLogReaderFree(XLogReaderState* state)
+{
+	int blockno = 0;
+
+	/* one data buffer per possible backup block slot */
+	while (blockno < XLR_MAX_BKP_BLOCKS)
+		free(state->buf.bkp_block_data[blockno++]);
+
+	/* the record data buffer, then the state itself */
+	free(state->buf.record_data);
+	free(state);
+}
+
+/* Reset the reading state machine; buffers and callbacks are left alone. */
+void
+XLogReaderReset(XLogReaderState* state)
+{
+	/* not inside any record, header, backup block or skipped region */
+	state->in_record = state->in_record_header = false;
+	state->in_bkp_block_header = state->in_skip = false;
+	state->do_reassemble_record = false;
+	state->in_bkp_blocks = 0;
+	/* nothing pending to be read, nothing written so far */
+	state->remaining_size = 0;
+	state->already_written_size = 0;
+	state->incomplete = state->initialized = false;
+	state->needs_input = state->needs_output = false;
+	state->stop_at_record_boundary = false;
+}
+
+/* true iff curptr advanced by size bytes is still strictly before endptr */
+static inline bool
+XLogReaderHasInput(XLogReaderState* state, Size size)
+{
+	XLogRecPtr wanted_end = state->curptr;
+
+	XLByteAdvance(wanted_end, size);
+	return !XLByteLE(state->endptr, wanted_end);
+}
+
+/* Can size more bytes be written without exceeding output_buffer_size? */
+static inline bool
+XLogReaderHasOutput(XLogReaderState* state, Size size)
+{
+	/* no writeout callback or a limit of 0 means output is unconstrained */
+	bool unlimited = state->writeout_data == NULL ||
+		state->output_buffer_size == 0;
+
+	return unlimited ||
+		state->already_written_size + size <= state->output_buffer_size;
+}
+
+/*
+ * true iff size more bytes can be both read (XLogReaderHasInput) and
+ * written (XLogReaderHasOutput); the input side is checked first.
+ */
+static inline bool
+XLogReaderHasSpace(XLogReaderState* state, Size size)
+{
+	bool can_read = XLogReaderHasInput(state, size);
+
+	return can_read && XLogReaderHasOutput(state, size);
+}
+
+/* ----------------------------------------------------------------------------
+ * Hand data to the writeout_data callback iff
+ * 1. a writeout_data callback is set
+ * 2. curptr is not before startptr
+ *
+ * The 2nd condition requires that we will never start a write before startptr
+ * and finish after it. The code needs to guarantee this.
+ * ----------------------------------------------------------------------------
+ */
+static void
+XLogReaderInternalWrite(XLogReaderState* state, char* data, Size size)
+{
+	/*
+	 * forward only if somebody wants the data and we reached startptr
+	 */
+	bool wanted = state->writeout_data != NULL &&
+		!XLByteLT(state->curptr, state->startptr);
+
+	if (wanted)
+		state->writeout_data(state, data, size);
+}
+
+/*
+ * Advance to the next backup block present in the current record, if any.
+ *
+ * in_bkp_blocks counts the slots that are still to be looked at. Slots whose
+ * bit is not set in xl_info are skipped. Returns true with the state set up
+ * to read a BkpBlock header, or false once no slot is left so the caller
+ * can consider the record finished.
+ */
+static bool
+XLogReaderInternalNextBkpBlock(XLogReaderState* state)
+{
+	Assert(state->in_record);
+	Assert(state->remaining_size == 0);
+
+	for (; state->in_bkp_blocks; state->in_bkp_blocks--)
+	{
+		int slot = XLR_MAX_BKP_BLOCKS - state->in_bkp_blocks;
+
+		/* this slot carries no backup block, look at the next one */
+		if (!(state->buf.record.xl_info & XLR_SET_BKP_BLOCK(slot)))
+			continue;
+
+#ifdef VERBOSE_DEBUG
+		elog(LOG, "reading bkp block %u", XLR_MAX_BKP_BLOCKS - state->in_bkp_blocks);
+#endif
+
+		/* bkp blocks are stored without regard for alignment */
+		state->in_bkp_block_header = true;
+		state->remaining_size = sizeof(BkpBlock);
+
+		return true;
+	}
+
+	/* only continue with in_record=true if we have bkp block */
+	return false;
+}
+
+/*
+ * Advance the reader's state machine over the xlog between curptr and endptr.
+ *
+ * Pages are fetched through the read_page callback, everything at or after
+ * startptr is passed to writeout_data (if set) and fully reassembled records
+ * are handed to finished_record (if set). Records rejected by
+ * is_record_interesting are written out as XLOG_NOOP spacers instead.
+ *
+ * On return needs_input/needs_output tell whether reading stopped for lack of
+ * input or output space; incomplete is set if we stopped inside a record.
+ */
+void
+XLogReaderRead(XLogReaderState* state)
+{
+	state->needs_input = false;
+	state->needs_output = false;
+
+	/*
+	 * Do some basic sanity checking and setup if we're starting anew.
+	 */
+	if (!state->initialized)
+	{
+		if (!state->read_page)
+			elog(ERROR, "The read_page callback needs to be set");
+
+		state->initialized = true;
+		/*
+		 * we need to start reading at the beginning of the page to understand
+		 * what we are currently reading. We will skip over that because we
+		 * check curptr < startptr later.
+		 */
+		state->curptr = state->startptr;
+		state->curptr -= state->startptr % XLOG_BLCKSZ;
+
+		Assert(state->curptr % XLOG_BLCKSZ == 0);
+
+		elog(LOG, "start reading from %X/%X, scrolled back to %X/%X",
+		     (uint32) (state->startptr >> 32), (uint32) state->startptr,
+		     (uint32) (state->curptr >> 32), (uint32) state->curptr);
+	}
+	else
+	{
+		/*
+		 * We didn't finish reading the last time round. Since then new data
+		 * could have been appended to the current page. So we need to update
+		 * our copy of that.
+		 *
+		 * XXX: We could tie that to state->needs_input but that doesn't seem
+		 * worth the complication atm.
+		 */
+		XLogRecPtr rereadptr = state->curptr;
+		rereadptr -= rereadptr % XLOG_BLCKSZ;
+
+		XLByteAdvance(rereadptr, SizeOfXLogShortPHD);
+
+		if(!XLByteLE(rereadptr, state->endptr))
+			goto not_enough_input;
+
+		rereadptr -= rereadptr % XLOG_BLCKSZ;
+
+		state->read_page(state, state->cur_page, rereadptr);
+
+		/*
+		 * we will only rely on this data being valid if we are allowed to read
+		 * that far, so it's safe to just always read the header. read_page
+		 * always returns a complete page even though its contents may be
+		 * invalid.
+		 */
+		state->page_header = (XLogPageHeader)state->cur_page;
+		state->page_header_size = XLogPageHeaderSize(state->page_header);
+	}
+
+#ifdef VERBOSE_DEBUG
+	elog(LOG, "starting reading for %X/%X from %X/%X",
+	     (uint32)(state->startptr >> 32), (uint32) state->startptr,
+	     (uint32)(state->curptr >> 32), (uint32) state->curptr);
+#endif
+	/*
+	 * Iterate over the data and reassemble it until we reached the end of the
+	 * data. As we advance curptr inside the loop we need to recheck whether we
+	 * have space inside as well.
+	 */
+	while (XLByteLT(state->curptr, state->endptr))
+	{
+		/* how much space is left in the current block */
+		uint32 len_in_block;
+
+		/*
+		 * did we read a partial xlog record due to input/output constraints?
+		 * If yes, we need to signal that to the caller so it can be handled
+		 * sensibly there. E.g. by waiting on a latch till more xlog is
+		 * available.
+		 */
+		bool partial_read = false;
+		bool partial_write = false;
+
+#ifdef VERBOSE_DEBUG
+		elog(LOG, "one loop start: record: %u header %u, skip: %u bkb_block: %d in_bkp_header: %u curptr: %X/%X remaining: %u, off: %u",
+		     state->in_record, state->in_record_header, state->in_skip,
+		     state->in_bkp_blocks, state->in_bkp_block_header,
+		     (uint32)(state->curptr >> 32), (uint32)state->curptr,
+		     state->remaining_size,
+		     (uint32)(state->curptr % XLOG_BLCKSZ));
+#endif
+
+		/*
+		 * at a page boundary, read the header
+		 */
+		if (state->curptr % XLOG_BLCKSZ == 0)
+		{
+#ifdef VERBOSE_DEBUG
+			elog(LOG, "reading page header, at %X/%X",
+			     (uint32)(state->curptr >> 32), (uint32)state->curptr);
+#endif
+			/*
+			 * check whether we can read enough to see the short header, we
+			 * need to read the short header's xlp_info to know whether this is
+			 * a short or a long header.
+			 */
+			if (!XLogReaderHasInput(state, SizeOfXLogShortPHD))
+				goto not_enough_input;
+
+			state->read_page(state, state->cur_page, state->curptr);
+			state->page_header = (XLogPageHeader)state->cur_page;
+			state->page_header_size = XLogPageHeaderSize(state->page_header);
+
+			/* check that we have enough space to read/write the full header */
+			if (!XLogReaderHasInput(state, state->page_header_size))
+				goto not_enough_input;
+
+			if (!XLogReaderHasOutput(state, state->page_header_size))
+				goto not_enough_output;
+
+			XLogReaderInternalWrite(state, state->cur_page, state->page_header_size);
+
+			XLByteAdvance(state->curptr, state->page_header_size);
+
+			if (state->page_header->xlp_info & XLP_FIRST_IS_CONTRECORD)
+			{
+				if (!state->in_record)
+				{
+					/*
+					 * we need to support this case for initializing a cluster
+					 * because we need to read/writeout a full page but there
+					 * may be none without records being split across.
+					 *
+					 * If we are before startptr there is nothing special about
+					 * this case. Most pages start with a contrecord.
+					 */
+					if(!XLByteLT(state->curptr, state->startptr))
+					{
+						elog(WARNING, "contrecord although we are not in a record at %X/%X, starting at %X/%X",
+						     (uint32)(state->curptr >> 32), (uint32)state->curptr,
+						     (uint32)(state->startptr >> 32), (uint32)state->startptr);
+					}
+					state->in_record = true;
+					state->check_crc = false;
+					state->do_reassemble_record = false;
+					state->remaining_size = state->page_header->xlp_rem_len;
+					continue;
+				}
+				else
+				{
+					if (state->page_header->xlp_rem_len < state->remaining_size)
+						elog(PANIC, "remaining length is smaller than to be read data. xlp_rem_len: %u needed: %u",
+						     state->page_header->xlp_rem_len, state->remaining_size
+							);
+				}
+			}
+			else if (state->in_record)
+			{
+				elog(PANIC, "no contrecord although were in a record that continued onto the next page. info %hhu at page %X/%X",
+				     state->page_header->xlp_info,
+				     (uint32)(state->page_header->xlp_pageaddr >> 32),
+				     (uint32)state->page_header->xlp_pageaddr);
+			}
+		}
+
+		/*
+		 * If a record will start next, skip over alignment padding.
+		 */
+		if (!state->in_record)
+		{
+			/*
+			 * a record must be stored aligned. So skip as far we need to
+			 * comply with that.
+			 */
+			Size skiplen;
+			skiplen = MAXALIGN(state->curptr) - state->curptr;
+
+			if (skiplen)
+			{
+				if (!XLogReaderHasSpace(state, skiplen))
+				{
+#ifdef VERBOSE_DEBUG
+					elog(LOG, "not aligning bc of space");
+#endif
+					/*
+					 * We don't have enough space to read/write the alignment
+					 * bytes, so fake up a skip-state
+					 */
+					state->in_record = true;
+					state->check_crc = false;
+					state->in_skip = true;
+					state->remaining_size = skiplen;
+
+					if (!XLogReaderHasInput(state, skiplen))
+						goto not_enough_input;
+					goto not_enough_output;
+				}
+#ifdef VERBOSE_DEBUG
+				elog(LOG, "aligning from %X/%X to %X/%X, skips %lu",
+				     (uint32)(state->curptr >> 32), (uint32)state->curptr,
+				     (uint32)((state->curptr + skiplen) >> 32),
+				     (uint32)(state->curptr + skiplen),
+				     skiplen
+					);
+#endif
+				XLogReaderInternalWrite(state, NULL, skiplen);
+
+				XLByteAdvance(state->curptr, skiplen);
+
+				/*
+				 * full pages are not treated as continuations, so restart on
+				 * the beginning of the new page.
+				 */
+				if ((state->curptr % XLOG_BLCKSZ) == 0)
+					continue;
+			}
+		}
+
+		/*
+		 * --------------------------------------------------------------------
+		 * Start to read a record
+		 * --------------------------------------------------------------------
+		 */
+		if (!state->in_record)
+		{
+			state->in_record = true;
+			state->in_record_header = true;
+			state->check_crc = true;
+
+			/*
+			 * If the record starts before startptr we're not interested in its
+			 * contents. There is also no point in reassembling if we're not
+			 * analyzing the contents.
+			 *
+			 * If every record needs to be processed by finish_record restarts
+			 * need to be started after the end of the last record.
+			 *
+			 * See state->restart_ptr for that point.
+			 */
+			if ((state->finished_record == NULL &&
+			     !state->stop_at_record_boundary) ||
+				XLByteLT(state->curptr, state->startptr)){
+				state->do_reassemble_record = false;
+			}
+			else
+				state->do_reassemble_record = true;
+
+			state->remaining_size = SizeOfXLogRecord;
+
+			/*
+			 * we quickly lose the original address of a record as we can skip
+			 * records and such, so keep the original addresses.
+			 */
+			state->buf.origptr = state->curptr;
+
+			INIT_CRC32(state->next_crc);
+		}
+
+		Assert(state->in_record);
+
+		/*
+		 * Compute how much space on the current page is left and how much of
+		 * that we actually are interested in.
+		 */
+
+		/* amount of space on page */
+		if (state->curptr % XLOG_BLCKSZ == 0)
+			len_in_block = 0;
+		else
+			len_in_block = XLOG_BLCKSZ - (state->curptr % XLOG_BLCKSZ);
+
+		/* we have more data available than we need, so read only as much as needed */
+		if (len_in_block > state->remaining_size)
+			len_in_block = state->remaining_size;
+
+		/*
+		 * Handle constraints set by startptr, endptr and the size of the
+		 * output buffer.
+		 *
+		 * Normally we use XLogReaderHasSpace for that, but that's not
+		 * convenient here because we want to read data in parts. It also
+		 * doesn't handle splitting around startptr. So, open-code the logic
+		 * for that.
+		 */
+
+		/* to make sure we always writeout in the same chunks, split at startptr */
+		if (XLByteLT(state->curptr, state->startptr) &&
+		    (state->curptr + len_in_block) > state->startptr )
+		{
+#ifdef VERBOSE_DEBUG
+			Size cur_len = len_in_block;
+#endif
+			len_in_block = state->startptr - state->curptr;
+#ifdef VERBOSE_DEBUG
+			elog(LOG, "truncating len_in_block due to startptr from %lu to %u",
+			     cur_len, len_in_block);
+#endif
+		}
+
+		/* do we have enough valid data to read the current block? */
+		if (state->curptr + len_in_block > state->endptr)
+		{
+#ifdef VERBOSE_DEBUG
+			Size cur_len = len_in_block;
+#endif
+			len_in_block = state->endptr - state->curptr;
+			partial_read = true;
+#ifdef VERBOSE_DEBUG
+			elog(LOG, "truncating len_in_block due to endptr %X/%X %lu to %i at %X/%X",
+			     (uint32)(state->startptr >> 32), (uint32)state->startptr,
+			     cur_len, len_in_block,
+			     (uint32)(state->curptr >> 32), (uint32)state->curptr);
+#endif
+		}
+
+		/* can we write what we read? */
+		if (state->writeout_data != NULL && state->output_buffer_size != 0
+				&& len_in_block > (state->output_buffer_size - state->already_written_size))
+		{
+#ifdef VERBOSE_DEBUG
+			Size cur_len = len_in_block;
+#endif
+			len_in_block = state->output_buffer_size - state->already_written_size;
+			partial_write = true;
+#ifdef VERBOSE_DEBUG
+			elog(LOG, "truncating len_in_block due to output_buffer_size %lu to %i",
+			     cur_len, len_in_block);
+#endif
+		}
+
+		/* --------------------------------------------------------------------
+		 * copy data of the size determined above to whatever we are currently
+		 * reading.
+		 * --------------------------------------------------------------------
+		 */
+
+		/* nothing to do if we're skipping */
+		if (state->in_skip)
+		{
+			/* writeout zero data, original content is boring */
+			XLogReaderInternalWrite(state, NULL, len_in_block);
+
+			/*
+			 * we may not need this here because we're skipping over something
+			 * really uninteresting but keeping track of that would be
+			 * unnecessarily complicated.
+			 */
+			COMP_CRC32(state->next_crc,
+			           state->cur_page + (state->curptr % XLOG_BLCKSZ),
+			           len_in_block);
+		}
+		/* reassemble the XLogRecord struct, quite likely in one-go */
+		else if (state->in_record_header)
+		{
+			/*
+			 * Need to clamp to sizeof(XLogRecord), we don't have the padding
+			 * in buf.record...
+			 */
+			Size already_written = SizeOfXLogRecord - state->remaining_size;
+			Size padding_size = SizeOfXLogRecord - sizeof(XLogRecord);
+			Size copysize = len_in_block;
+
+			if (state->remaining_size - len_in_block < padding_size)
+				copysize = Max(0, state->remaining_size - (int)padding_size);
+
+			memcpy((char*)&state->buf.record + already_written,
+			       state->cur_page + (state->curptr % XLOG_BLCKSZ),
+			       copysize);
+
+			XLogReaderInternalWrite(state,
+			                        state->cur_page + (state->curptr % XLOG_BLCKSZ),
+			                        len_in_block);
+#ifdef VERBOSE_DEBUG
+			elog(LOG, "copied part of the record. len_in_block %u, remaining: %u",
+			     len_in_block, state->remaining_size);
+#endif
+		}
+		/*
+		 * copy data into the current backup block header so we have enough
+		 * knowledge to read the actual backup block afterwards
+		 */
+		else if (state->in_bkp_block_header)
+		{
+			int blockno = XLR_MAX_BKP_BLOCKS - state->in_bkp_blocks;
+			BkpBlock* bkpb = &state->buf.bkp_block[blockno];
+
+			Assert(state->in_bkp_blocks);
+
+			memcpy((char*)bkpb + sizeof(BkpBlock) - state->remaining_size,
+			       state->cur_page + (state->curptr % XLOG_BLCKSZ),
+			       len_in_block);
+
+			XLogReaderInternalWrite(state,
+			                        state->cur_page + ((uint32)state->curptr % XLOG_BLCKSZ),
+			                        len_in_block);
+
+			COMP_CRC32(state->next_crc,
+			           state->cur_page + (state->curptr % XLOG_BLCKSZ),
+			           len_in_block);
+
+#ifdef VERBOSE_DEBUG
+			elog(LOG, "copying bkp header for block %d, %u bytes, complete %lu at %X/%X rem %u",
+			     blockno, len_in_block, sizeof(BkpBlock),
+			     (uint32)(state->curptr >> 32), (uint32)state->curptr,
+			     state->remaining_size);
+
+			if (state->remaining_size == len_in_block)
+			{
+				elog(LOG, "block off %u len %u", bkpb->hole_offset, bkpb->hole_length);
+			}
+#endif
+		}
+		/*
+		 * Reassemble the current backup block, those usually are the biggest
+		 * parts of individual XLogRecords so this might take several rounds.
+		 */
+		else if (state->in_bkp_blocks)
+		{
+			int blockno = XLR_MAX_BKP_BLOCKS - state->in_bkp_blocks;
+			BkpBlock* bkpb = &state->buf.bkp_block[blockno];
+			char* data = state->buf.bkp_block_data[blockno];
+
+			if (state->do_reassemble_record)
+			{
+				memcpy(data + BLCKSZ - bkpb->hole_length - state->remaining_size,
+				       state->cur_page + (state->curptr % XLOG_BLCKSZ),
+				       len_in_block);
+			}
+
+			XLogReaderInternalWrite(state,
+			                        state->cur_page + (state->curptr % XLOG_BLCKSZ),
+			                        len_in_block);
+
+			COMP_CRC32(state->next_crc,
+			           state->cur_page + (state->curptr % XLOG_BLCKSZ),
+			           len_in_block);
+
+#ifdef VERBOSE_DEBUG
+			elog(LOG, "copying %u bytes of data for bkp block %d, complete %u",
+			     len_in_block, blockno, state->remaining_size);
+#endif
+		}
+		/*
+		 * read the (rest) of the XLogRecord's data. Note that this is not the
+		 * XLogRecord struct itself!
+		 */
+		else if (state->in_record)
+		{
+			if (state->do_reassemble_record)
+			{
+				if(state->buf.record_data_size < state->buf.record.xl_len){
+					char* newdata = realloc(state->buf.record_data,
+					                        state->buf.record.xl_len);
+					if(!newdata)
+						elog(ERROR, "could not allocate memory for contents of an xlog record");
+					state->buf.record_data = newdata;
+					state->buf.record_data_size = state->buf.record.xl_len;
+				}
+
+				memcpy(state->buf.record_data
+				       + state->buf.record.xl_len
+				       - state->remaining_size,
+				       state->cur_page + (state->curptr % XLOG_BLCKSZ),
+				       len_in_block);
+			}
+			XLogReaderInternalWrite(state,
+			                        state->cur_page + (state->curptr % XLOG_BLCKSZ),
+			                        len_in_block);
+
+
+			COMP_CRC32(state->next_crc,
+			           state->cur_page + (state->curptr % XLOG_BLCKSZ),
+			           len_in_block);
+
+#ifdef VERBOSE_DEBUG
+			elog(LOG, "copying %u bytes into a record at off %u",
+			     len_in_block, (uint32)(state->curptr % XLOG_BLCKSZ));
+#endif
+		}
+
+		/* should handle wrapping around to next page */
+		XLByteAdvance(state->curptr, len_in_block);
+
+		/* do the math of how much we need to read next round */
+		state->remaining_size -= len_in_block;
+
+		/*
+		 * --------------------------------------------------------------------
+		 * we completed whatever we were reading. So, handle going to the next
+		 * state.
+		 * --------------------------------------------------------------------
+		 */
+		if (state->remaining_size == 0)
+		{
+			/* completed reading - and potentially reassembling - the record */
+			if (state->in_record_header)
+			{
+				state->in_record_header = false;
+
+				/* ------------------------------------------------------------
+				 * normally we don't look at the content of xlog records here,
+				 * XLOG_SWITCH is a special case though, as everything left in
+				 * that segment won't be sensible content.
+				 * So skip to the next segment.
+				 * ------------------------------------------------------------
+				 */
+				if (state->buf.record.xl_rmid == RM_XLOG_ID
+				    && (state->buf.record.xl_info & ~XLR_INFO_MASK) == XLOG_SWITCH)
+				{
+					/*
+					 * Pretend the current data extends to end of segment
+					 */
+					elog(LOG, "XLOG_SWITCH");
+					state->curptr += XLogSegSize - 1;
+					state->curptr -= state->curptr % XLogSegSize;
+
+					state->in_record = false;
+					Assert(!state->in_bkp_blocks);
+					Assert(!state->in_skip);
+					continue;
+				}
+				else if (state->is_record_interesting == NULL ||
+				         state->is_record_interesting(state, &state->buf.record))
+				{
+					state->remaining_size = state->buf.record.xl_len;
+					Assert(state->in_bkp_blocks == 0);
+					Assert(!state->in_bkp_block_header);
+					Assert(!state->in_skip);
+#ifdef VERBOSE_DEBUG
+					elog(LOG, "found interesting record at %X/%X, prev: %X/%X, rmid %hhu, tx %u, len %u tot %u",
+					     (uint32)(state->buf.origptr >> 32), (uint32)state->buf.origptr,
+					     (uint32)(state->buf.record.xl_prev >> 32), (uint32)(state->buf.record.xl_prev),
+					     state->buf.record.xl_rmid, state->buf.record.xl_xid,
+					     state->buf.record.xl_len, state->buf.record.xl_tot_len);
+#endif
+
+				}
+				/* ------------------------------------------------------------
+				 * ok, everybody agrees, the content of the current record are
+				 * just plain boring. So fake-up a record that replaces it with
+				 * a NOOP record.
+				 *
+				 * FIXME: we should allow "compressing" the output here. That
+				 * is write something that shows how long the record should be
+				 * if everything is decompressed again. This can radically
+				 * reduce space-usage over the wire.
+				 * It could also be very useful for traditional SR by removing
+				 * unneeded BKP blocks from being transferred.  For that we
+				 * would need to recompute CRCs though, which we currently
+				 * don't support.
+				 * ------------------------------------------------------------
+				 */
+				else
+				{
+					/*
+					 * we need to fix up a fake record with correct length that can be
+					 * written out; it is zeroed so no stack garbage ends up in xl_crc.
+					 */
+					XLogRecord spacer;
+
+					elog(LOG, "found boring record at %X/%X, rmid %hhu, tx %u, len %u tot %u",
+					     (uint32)(state->buf.origptr >> 32), (uint32)state->buf.origptr,
+					     state->buf.record.xl_rmid, state->buf.record.xl_xid,
+					     state->buf.record.xl_len, state->buf.record.xl_tot_len);
+
+					/*
+					 * xl_tot_len contains the size of the XLogRecord itself,
+					 * we read that already though.
+					 */
+					state->remaining_size = state->buf.record.xl_tot_len
+						- SizeOfXLogRecord;
+
+					state->in_record = true;
+					state->check_crc = true;
+					state->in_bkp_blocks = 0;
+					state->in_skip = true;
+					memset(&spacer, 0, sizeof(XLogRecord));
+					spacer.xl_prev = state->buf.origptr;
+					spacer.xl_xid = InvalidTransactionId;
+					spacer.xl_tot_len = state->buf.record.xl_tot_len;
+					spacer.xl_len = state->buf.record.xl_tot_len - SizeOfXLogRecord;
+					spacer.xl_rmid = RM_XLOG_ID;
+					spacer.xl_info = XLOG_NOOP;
+
+					XLogReaderInternalWrite(state, (char*)&spacer,
+					                        sizeof(XLogRecord));
+
+					/*
+					 * write out the padding in a separate write, otherwise we
+					 * would overrun the stack
+					 */
+					XLogReaderInternalWrite(state, NULL,
+					                        SizeOfXLogRecord - sizeof(XLogRecord));
+
+				}
+			}
+			/*
+			 * in the in_skip case we already read backup blocks because we
+			 * likely read record->xl_tot_len, so everything is finished.
+			 */
+			else if (state->in_skip)
+			{
+				state->in_record = false;
+				state->in_bkp_blocks = 0;
+				state->in_skip = false;
+				/* alignment is handled when starting to read a record */
+			}
+			/*
+			 * We read the header of the current block. Start reading the
+			 * content of that now.
+			 */
+			else if (state->in_bkp_block_header)
+			{
+				BkpBlock* bkpb;
+				int blockno = XLR_MAX_BKP_BLOCKS - state->in_bkp_blocks;
+
+				Assert(state->in_bkp_blocks);
+
+				bkpb = &state->buf.bkp_block[blockno];
+
+				if(bkpb->hole_length >= BLCKSZ)
+					elog(ERROR, "hole_length of block %u is %u but maximum is %u",
+					     blockno, bkpb->hole_length, BLCKSZ);
+
+				if(bkpb->hole_offset >= BLCKSZ)
+					elog(ERROR, "hole_offset of block %u is %u but maximum is %u",
+					     blockno, bkpb->hole_offset, BLCKSZ);
+
+				/* the hole has to fit into the block, else restoring it overruns */
+				if(bkpb->hole_offset + bkpb->hole_length > BLCKSZ)
+					elog(ERROR, "incorrect hole size in block %u", blockno);
+
+				state->remaining_size = BLCKSZ - bkpb->hole_length;
+				state->in_bkp_block_header = false;
+
+#ifdef VERBOSE_DEBUG
+				elog(LOG, "completed reading of header for %d, reading data now %u hole %u, off %u",
+				     blockno, state->remaining_size, bkpb->hole_length,
+				     bkpb->hole_offset);
+#endif
+			}
+			/*
+			 * The current backup block is finished, more could be following
+			 */
+			else if (state->in_bkp_blocks)
+			{
+				int blockno = XLR_MAX_BKP_BLOCKS - state->in_bkp_blocks;
+				BkpBlock* bkpb;
+				char* bkpb_data;
+
+				Assert(!state->in_bkp_block_header);
+
+				bkpb = &state->buf.bkp_block[blockno];
+				bkpb_data = state->buf.bkp_block_data[blockno];
+
+				/*
+				 * reassemble block to its entirety by removing the bkp_hole
+				 * "compression": the data was read contiguously, so move the
+				 * part behind the hole up to its real place, then zero the hole
+				 */
+				if(bkpb->hole_length){
+					memmove(bkpb_data + bkpb->hole_offset + bkpb->hole_length,
+					        bkpb_data + bkpb->hole_offset,
+					        BLCKSZ - (bkpb->hole_offset + bkpb->hole_length));
+					memset(bkpb_data + bkpb->hole_offset,
+					       0,
+					       bkpb->hole_length);
+				}
+				state->in_bkp_blocks--;
+
+				state->in_skip = false;
+
+				if(!XLogReaderInternalNextBkpBlock(state))
+					goto all_bkp_finished;
+
+			}
+			/*
+			 * read a non-skipped record, start reading bkp blocks afterwards
+			 */
+			else if (state->in_record)
+			{
+				Assert(!state->in_skip);
+
+				state->in_bkp_blocks = XLR_MAX_BKP_BLOCKS;
+
+				if(!XLogReaderInternalNextBkpBlock(state))
+					goto all_bkp_finished;
+			}
+		}
+		/*
+		 * Something could only be partially read inside a single block because
+		 * of input or output space constraints..
+		 */
+		else if (partial_read)
+		{
+			partial_read = false;
+			goto not_enough_input;
+		}
+		else if (partial_write)
+		{
+			partial_write = false;
+			goto not_enough_output;
+		}
+		/*
+		 * Data continues into the next block.
+		 */
+		else
+		{
+		}
+
+#ifdef VERBOSE_DEBUG
+		elog(LOG, "one loop end: record: %u header: %u, skip: %u bkb_block: %d in_bkp_header: %u curpos: %X/%X remaining: %u, off: %u",
+		     state->in_record, state->in_record_header, state->in_skip,
+		     state->in_bkp_blocks, state->in_bkp_block_header,
+		     (uint32)(state->curptr >> 32), (uint32)state->curptr,
+		     state->remaining_size,
+		     (uint32)(state->curptr % XLOG_BLCKSZ));
+#endif
+		continue;
+
+		/*
+		 * we fully read a record. Process its contents if needed and start
+		 * reading the next record afterwards
+		 */
+	all_bkp_finished:
+		{
+			Assert(state->in_record);
+			Assert(!state->in_skip);
+			Assert(!state->in_bkp_block_header);
+			Assert(!state->in_bkp_blocks);
+
+			state->in_record = false;
+
+			/* compute and verify crc */
+			COMP_CRC32(state->next_crc,
+			           &state->buf.record,
+			           offsetof(XLogRecord, xl_crc));
+
+			FIN_CRC32(state->next_crc);
+
+			if (state->check_crc &&
+			    state->next_crc != state->buf.record.xl_crc) {
+				elog(ERROR, "crc mismatch: newly computed : %x, existing is %x",
+				     state->next_crc, state->buf.record.xl_crc);
+			}
+
+			/*
+			 * if we haven't reassembled the record there is no point in
+			 * calling the finished callback because we do not have any
+			 * interesting data. do_reassemble_record is false if we don't have
+			 * a finished_record callback.
+			 */
+			if (state->do_reassemble_record)
+			{
+				/* in stop_at_record_boundary that's a valid case */
+				if (state->finished_record)
+				{
+					state->finished_record(state, &state->buf);
+				}
+
+				if (state->stop_at_record_boundary)
+					goto out;
+			}
+
+			/* alignment is handled when starting to read a record */
+#ifdef VERBOSE_DEBUG
+			elog(LOG, "finished record at %X/%X to %X/%X, already_written_size: %lu, reas = %d",
+			     (uint32)(state->curptr >> 32), (uint32)state->curptr,
+			     (uint32)(state->endptr >> 32), (uint32)state->endptr,
+			     state->already_written_size, state->do_reassemble_record);
+#endif
+
+		}
+	}
+out:
+	/*
+	 * we are finished, check whether we finished everything, this may be
+	 * useful for the caller.
+	 */
+	state->incomplete = state->in_skip || state->in_record;
+	return;
+
+not_enough_input:
+	/* signal we need more xlog and finish */
+	state->needs_input = true;
+	goto out;
+
+not_enough_output:
+	/* signal we need more space to write output to */
+	state->needs_output = true;
+	goto out;
+}
+
+/*
+ * Read a single record: run XLogReaderRead() with stop_at_record_boundary
+ * forced on and return the record buffer, or NULL if no new record was read.
+ */
+XLogRecordBuffer*
+XLogReaderReadOne(XLogReaderState* state)
+{
+	const bool saved_stop = state->stop_at_record_boundary;
+	const XLogRecPtr prev_origptr = state->buf.origptr;
+
+	/* force stopping after one record, then put the caller's setting back */
+	state->stop_at_record_boundary = true;
+	XLogReaderRead(state);
+	state->stop_at_record_boundary = saved_stop;
+
+	/* only a complete record that differs from the previous one counts */
+	if (!state->incomplete && !XLByteEQ(prev_origptr, state->buf.origptr))
+		return &state->buf;
+
+	return NULL;
+}
diff --git a/src/include/access/xlogreader.h b/src/include/access/xlogreader.h
new file mode 100644
index 0000000..f45c90b
--- /dev/null
+++ b/src/include/access/xlogreader.h
@@ -0,0 +1,264 @@
+/*-------------------------------------------------------------------------
+ *
+ * xlogreader.h
+ *
+ *		Generic xlog reading facility.
+ *
+ * Portions Copyright (c) 2012, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ *		src/include/access/xlogreader.h
+ *
+ * NOTES
+ *		Check the definition of the XLogReaderState struct for instructions on
+ *		how to use the XLogReader infrastructure.
+ *
+ *		The basic idea is to allocate an XLogReaderState via
+ *		XLogReaderAllocate, fill out the wanted callbacks, set startptr/endptr
+ *		and call XLogReaderRead(state). That will iterate over the records as
+ *		long as it has enough input to reassemble a record, calling
+ *		is_record_interesting/finished_record for every record found.
+ *-------------------------------------------------------------------------
+ */
+#ifndef READXLOG_H
+#define READXLOG_H
+
+#include "access/xlog_internal.h"
+
+/*
+ * Used to store a reassembled record.
+ *
+ * A pointer to this struct is what the finished_record callback receives and
+ * what XLogReaderReadOne returns.
+ */
+typedef struct XLogRecordBuffer
+{
+	/* the record itself (the record header) */
+	XLogRecord record;
+
+	/* the LSN at which the record was found */
+	XLogRecPtr origptr;
+
+	/*
+	 * the data for the xlog record; record_data_size is presumably the length
+	 * of record_data in bytes - TODO confirm against xlogreader.c
+	 */
+	char* record_data;
+	uint32 record_data_size;
+
+	/*
+	 * backup blocks attached to the record: one header slot and one data
+	 * pointer per possible backup block (at most XLR_MAX_BKP_BLOCKS)
+	 */
+	BkpBlock bkp_block[XLR_MAX_BKP_BLOCKS];
+	char* bkp_block_data[XLR_MAX_BKP_BLOCKS];
+} XLogRecordBuffer;
+
+
+/* forward declaration so the callback typedefs below can refer to the state */
+struct XLogReaderState;
+
+/*
+ * Callback signatures. Every callback gets the reader state as its first
+ * argument. The callbacks are explained in more detail inside the
+ * XLogReaderState struct.
+ *
+ * XLogReaderStateInterestingCB
+ *		decide, based on the record header 'r', whether the record should be
+ *		reassembled (true) or skipped (false)
+ * XLogReaderStateWriteoutCB
+ *		write out 'length' bytes of 'data'; a NULL 'data' means zeroes have
+ *		to be written
+ * XLogReaderStateFinishedRecordCB
+ *		called with the fully reassembled record in 'buf'
+ * XLogReaderStateReadPageCB
+ *		read the page located at 'at' into 'cur_page'
+ */
+typedef bool (*XLogReaderStateInterestingCB)(struct XLogReaderState* state,
+                                             XLogRecord* r);
+typedef void (*XLogReaderStateWriteoutCB)(struct XLogReaderState* state,
+                                          char* data, Size length);
+typedef void (*XLogReaderStateFinishedRecordCB)(struct XLogReaderState* state,
+                                                XLogRecordBuffer* buf);
+typedef void (*XLogReaderStateReadPageCB)(struct XLogReaderState* state,
+                                          char* cur_page, XLogRecPtr at);
+
+/*
+ * Complete state of one xlog reader.
+ *
+ * The struct is split into parameters the caller sets up (callbacks and the
+ * range to read), output parameters the reader reports back, and internal
+ * state that callers should not touch.
+ */
+typedef struct XLogReaderState
+{
+	/* ----------------------------------------
+	 * Public parameters
+	 * ----------------------------------------
+	 */
+
+	/* callbacks */
+
+	/*
+	 * Called to decide whether a xlog record is interesting and should be
+	 * assembled, analyzed (finished_record) and written out or skipped.
+	 *
+	 * Gets passed the current state as the first parameter and the record
+	 * *header* to decide over as the second.
+	 *
+	 * Return false to skip the record - and output a NOOP record instead - and
+	 * true to reassemble it fully.
+	 *
+	 * If set to NULL every record is considered to be interesting.
+	 */
+	XLogReaderStateInterestingCB is_record_interesting;
+
+	/*
+	 * Write out xlog data.
+	 *
+	 * The 'state' parameter is passed as the first parameter and a pointer to
+	 * the 'data' and its 'length' as second and third parameters. If the
+	 * 'data' is NULL zeroes need to be written out.
+	 */
+	XLogReaderStateWriteoutCB writeout_data;
+
+	/*
+	 * If set to anything but NULL this callback gets called after a record,
+	 * including the backup blocks, has been fully reassembled.
+	 *
+	 * The first parameter is the current 'state'. 'buf', an XLogRecordBuffer,
+	 * gets passed as the second parameter and contains the record header, its
+	 * data, original position/lsn and backup blocks.
+	 */
+	XLogReaderStateFinishedRecordCB finished_record;
+
+	/*
+	 * Data input function.
+	 *
+	 * This callback *has* to be implemented.
+	 *
+	 * Has to read XLOG_BLCKSZ bytes that are at the location 'at' into the
+	 * memory pointed at by cur_page, although everything behind endptr does
+	 * not have to be valid.
+	 */
+	XLogReaderStateReadPageCB read_page;
+
+	/*
+	 * This can be used by the caller to pass state to the callbacks without
+	 * using global variables or such ugliness. It will neither be read nor set
+	 * by anything but your code.
+	 */
+	void* private_data;
+
+
+	/* from where to where are we reading */
+
+	/*
+	 * so we know where interesting data starts after scrolling back to the
+	 * beginning of a page
+	 */
+	XLogRecPtr startptr;
+
+	/* continue up to here in this run */
+	XLogRecPtr endptr;
+
+	/*
+	 * size of the output buffer, if set to zero (default), there is no limit
+	 * in the output buffer size.
+	 */
+	Size output_buffer_size;
+
+	/*
+	 * Stop reading and return after every completed record.
+	 */
+	bool stop_at_record_boundary;
+
+	/* ----------------------------------------
+	 * output parameters
+	 * ----------------------------------------
+	 */
+
+	/* we need new input data - a later endptr - to continue reading */
+	bool needs_input;
+
+	/* we need new output space to continue reading */
+	bool needs_output;
+
+	/* track our progress */
+	XLogRecPtr curptr;
+
+	/*
+	 * are we in the middle of something? This is useful for the outside to
+	 * know whether to start reading anew. XLogReaderRead sets this to true
+	 * when it returns while in_skip or in_record is still set, and to false
+	 * otherwise.
+	 */
+	bool incomplete;
+
+	/* ----------------------------------------
+	 * private/internal state
+	 * ----------------------------------------
+	 */
+
+	/* buffer of XLOG_BLCKSZ bytes for the page currently being worked on */
+	char cur_page[XLOG_BLCKSZ];
+	/* presumably the header of cur_page and its size - TODO confirm */
+	XLogPageHeader page_header;
+	uint32 page_header_size;
+	/*
+	 * the record being reassembled; handed to finished_record and returned by
+	 * XLogReaderReadOne
+	 */
+	XLogRecordBuffer buf;
+	/* NOTE(review): looks like the CRC computed while reading - confirm */
+	pg_crc32 next_crc;
+
+	/* ----------------------------------------
+	 * state machine variables
+	 * ----------------------------------------
+	 */
+
+	/* NOTE(review): presumably set once the reader has been set up - confirm */
+	bool initialized;
+
+	/* are we currently reading a record? */
+	bool in_record;
+
+	/* are we currently reading a record header? */
+	bool in_record_header;
+
+	/* do we want to reassemble the record or just read/write it? */
+	bool do_reassemble_record;
+
+	/* how many bkp blocks remain to be read? */
+	int in_bkp_blocks;
+
+	/*
+	 * the header of a bkp block can be split across pages, so we need to
+	 * support reading that incrementally
+	 */
+	bool in_bkp_block_header;
+
+	/*
+	 * We are not interested in the contents of the `remaining_size` next
+	 * blocks. Don't read their contents and write out zeroes instead.
+	 */
+	bool in_skip;
+
+	/*
+	 * Should we check the crc of the currently read record? In some situations
+	 * - e.g. if we just skip till the start of a record - this doesn't make
+	 * sense.
+	 *
+	 * This needs to be separate from in_skip because we want to be able to
+	 * not write out records but still verify them, e.g. records that are
+	 * "not interesting".
+	 */
+	bool check_crc;
+
+	/* how much more to read in the current state */
+	uint32 remaining_size;
+
+	/* size of already written data */
+	Size already_written_size;
+
+} XLogReaderState;
+
+/*
+ * Get a new XLogReader
+ *
+ * At least the read_page callback, startptr and endptr have to be set before
+ * the reader can be used.
+ */
+extern XLogReaderState* XLogReaderAllocate(void);
+
+/*
+ * Free an XLogReader
+ */
+extern void XLogReaderFree(XLogReaderState* state);
+
+/*
+ * Reset internal state so it can be used without continuing from the last
+ * state.
+ *
+ * The callbacks and private_data won't be reset
+ */
+extern void XLogReaderReset(XLogReaderState* state);
+
+/*
+ * Read the xlog and call the appropriate callbacks as far as possible within
+ * the constraints of input data (startptr, endptr) and output space.
+ *
+ * On return state->incomplete tells whether reading stopped in the middle of
+ * a record or a skip; state->needs_input / state->needs_output are set when
+ * more xlog or more output space is required to continue.
+ */
+extern void XLogReaderRead(XLogReaderState* state);
+
+/*
+ * Read the next xlog record if enough input/output is available.
+ *
+ * This is a bit less efficient than XLogReaderRead.
+ *
+ * Returns a pointer to the record buffer inside 'state', or NULL if the next
+ * record couldn't be read completely or no new record was reached (the
+ * buffer still refers to the same LSN as before the call). Check
+ * state->incomplete, ->needs_input, ->needs_output in that case.
+ *
+ * Be careful to check that there is anything further to read when using
+ * ->endptr, otherwise it's easy to get into an endless loop.
+ */
+extern XLogRecordBuffer* XLogReaderReadOne(XLogReaderState* state);
+
+#endif /* READXLOG_H */
-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to