Petr Jelinek wrote:
> On 04/03/16 17:08, Craig Ringer wrote:
> >I'd really appreciate some review of the logic there by people who know
> >timelines well and preferably know the xlogreader. It's really just one
> >function and 2/3 comments; the code is simple but the reasoning leading
> >to it is not.
> 
> I think this will have to be work for committer at this point. I can't find
> any flaws in the logic myself so I unless somebody protests I am going to
> mark this as ready for committer.

Great, thanks.  I've studied this to the point where I'm confident that
it makes sense, so I'm about to push it.  I didn't change any logic,
only updated comments here and there, both in the patch and in some
preexisting code.  I also changed the "List *timelineHistory" to be
#ifndef FRONTEND, which seems cleaner than having it and insist that it
couldn't be used.

Also, hopefully you don't have any future callers for the functions I
marked static (I checked the failover slots series and couldn't see
anything).  I will push this early tomorrow.

One thing I'm not quite sure about is why this is said to apply to
"logical slots" and not just to replication slots in general.  In what
sense does it not apply to physical replication slots?

(I noticed that we have this new line,
-               randAccess = true;
+               randAccess = true;      /* allow readPageTLI to go backwards */ 
in which now the comment is an identical copy of an identical line
elsewhere; however, randAccess doesn't actually affect readPageTLI in
any way, so AFAICS both comments are now wrong.)

> Well for testing purposes it's quite fine I think. The TAP framework
> enhancements needed for this are now in and it works correctly against
> current master.

Certainly the new src/test/recovery/t/006whatever.pl file is going to be
part of the commit.  What I'm not so sure about is
src/test/modules/decoding_failover: is there any reason we don't just
put the new functions in contrib/test_decoding?

-- 
Álvaro Herrera                http://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services
diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c
index fcb0872..7b60b8f 100644
--- a/src/backend/access/transam/xlogreader.c
+++ b/src/backend/access/transam/xlogreader.c
@@ -10,9 +10,11 @@
  *
  * NOTES
  *		See xlogreader.h for more notes on this facility.
+ *
+ *		This file is compiled as both front-end and backend code, so it
+ *		may not use ereport, server-defined static variables, etc.
  *-------------------------------------------------------------------------
  */
-
 #include "postgres.h"
 
 #include "access/transam.h"
@@ -116,6 +118,11 @@ XLogReaderAllocate(XLogPageReadCB pagereadfunc, void *private_data)
 		return NULL;
 	}
 
+#ifndef FRONTEND
+	/* Will be loaded on first read */
+	state->timelineHistory = NIL;
+#endif
+
 	return state;
 }
 
@@ -135,6 +142,10 @@ XLogReaderFree(XLogReaderState *state)
 	pfree(state->errormsg_buf);
 	if (state->readRecordBuf)
 		pfree(state->readRecordBuf);
+#ifndef FRONTEND
+	if (state->timelineHistory)
+		list_free_deep(state->timelineHistory);
+#endif
 	pfree(state->readBuf);
 	pfree(state);
 }
@@ -208,10 +219,11 @@ XLogReadRecord(XLogReaderState *state, XLogRecPtr RecPtr, char **errormsg)
 
 	if (RecPtr == InvalidXLogRecPtr)
 	{
+		/* No explicit start point; read the record after the one we just read */
 		RecPtr = state->EndRecPtr;
 
 		if (state->ReadRecPtr == InvalidXLogRecPtr)
-			randAccess = true;
+			randAccess = true;	/* allow readPageTLI to go backwards */
 
 		/*
 		 * RecPtr is pointing to end+1 of the previous WAL record.  If we're
@@ -223,6 +235,8 @@ XLogReadRecord(XLogReaderState *state, XLogRecPtr RecPtr, char **errormsg)
 	else
 	{
 		/*
+		 * Caller supplied a position to start at.
+		 *
 		 * In this case, the passed-in record pointer should already be
 		 * pointing to a valid record starting position.
 		 */
@@ -309,8 +323,10 @@ XLogReadRecord(XLogReaderState *state, XLogRecPtr RecPtr, char **errormsg)
 		/* XXX: more validation should be done here */
 		if (total_len < SizeOfXLogRecord)
 		{
-			report_invalid_record(state, "invalid record length at %X/%X",
-								  (uint32) (RecPtr >> 32), (uint32) RecPtr);
+			report_invalid_record(state,
+						"invalid record length at %X/%X: wanted %lu, got %u",
+								  (uint32) (RecPtr >> 32), (uint32) RecPtr,
+								  SizeOfXLogRecord, total_len);
 			goto err;
 		}
 		gotheader = false;
@@ -466,9 +482,7 @@ err:
 	 * Invalidate the xlog page we've cached. We might read from a different
 	 * source after failure.
 	 */
-	state->readSegNo = 0;
-	state->readOff = 0;
-	state->readLen = 0;
+	XLogReaderInvalCache(state);
 
 	if (state->errormsg_buf[0] != '\0')
 		*errormsg = state->errormsg_buf;
@@ -580,10 +594,19 @@ ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr, int reqLen)
 	return readLen;
 
 err:
+	XLogReaderInvalCache(state);
+	return -1;
+}
+
+/*
+ * Invalidate the xlogreader's cached page to force a re-read.
+ */
+void
+XLogReaderInvalCache(XLogReaderState *state)
+{
 	state->readSegNo = 0;
 	state->readOff = 0;
 	state->readLen = 0;
-	return -1;
 }
 
 /*
@@ -600,8 +623,9 @@ ValidXLogRecordHeader(XLogReaderState *state, XLogRecPtr RecPtr,
 	if (record->xl_tot_len < SizeOfXLogRecord)
 	{
 		report_invalid_record(state,
-							  "invalid record length at %X/%X",
-							  (uint32) (RecPtr >> 32), (uint32) RecPtr);
+						"invalid record length at %X/%X: wanted %lu, got %u",
+							  (uint32) (RecPtr >> 32), (uint32) RecPtr,
+							  SizeOfXLogRecord, record->xl_tot_len);
 		return false;
 	}
 	if (record->xl_rmid > RM_MAX_ID)
@@ -907,11 +931,9 @@ XLogFindNextRecord(XLogReaderState *state, XLogRecPtr RecPtr)
 err:
 out:
 	/* Reset state to what we had before finding the record */
-	state->readSegNo = 0;
-	state->readOff = 0;
-	state->readLen = 0;
 	state->ReadRecPtr = saved_state.ReadRecPtr;
 	state->EndRecPtr = saved_state.EndRecPtr;
+	XLogReaderInvalCache(state);
 
 	return found;
 }
diff --git a/src/backend/access/transam/xlogutils.c b/src/backend/access/transam/xlogutils.c
index 444e218..67318d4 100644
--- a/src/backend/access/transam/xlogutils.c
+++ b/src/backend/access/transam/xlogutils.c
@@ -19,12 +19,12 @@
 
 #include <unistd.h>
 
-#include "miscadmin.h"
-
+#include "access/timeline.h"
 #include "access/xlog.h"
 #include "access/xlog_internal.h"
 #include "access/xlogutils.h"
 #include "catalog/catalog.h"
+#include "miscadmin.h"
 #include "storage/smgr.h"
 #include "utils/guc.h"
 #include "utils/hsearch.h"
@@ -638,8 +638,17 @@ XLogTruncateRelation(RelFileNode rnode, ForkNumber forkNum,
 }
 
 /*
- * TODO: This is duplicate code with pg_xlogdump, similar to walsender.c, but
- * we currently don't have the infrastructure (elog!) to share it.
+ * Read 'count' bytes from WAL into 'buf', starting at location 'startptr'
+ * in timeline 'tli'.
+ *
+ * Will open, and keep open, one WAL segment stored in the static file
+ * descriptor 'sendFile'. This means if XLogRead is used once, there will
+ * always be one descriptor left open until the process ends, but never
+ * more than one.
+ *
+ * XXX This is very similar to pg_xlogdump's XLogDumpXLogRead and to XLogRead
+ * in walsender.c but for small differences (such as lack of elog() in
+ * frontend).  Probably these should be merged at some point.
  */
 static void
 XLogRead(char *buf, TimeLineID tli, XLogRecPtr startptr, Size count)
@@ -648,8 +657,12 @@ XLogRead(char *buf, TimeLineID tli, XLogRecPtr startptr, Size count)
 	XLogRecPtr	recptr;
 	Size		nbytes;
 
+	/*
+	 * Cached state across calls.
+	 */
 	static int	sendFile = -1;
 	static XLogSegNo sendSegNo = 0;
+	static TimeLineID sendTLI = 0;
 	static uint32 sendOff = 0;
 
 	p = buf;
@@ -664,11 +677,12 @@ XLogRead(char *buf, TimeLineID tli, XLogRecPtr startptr, Size count)
 
 		startoff = recptr % XLogSegSize;
 
-		if (sendFile < 0 || !XLByteInSeg(recptr, sendSegNo))
+		/* Do we need to open a new xlog segment? */
+		if (sendFile < 0 || !XLByteInSeg(recptr, sendSegNo) ||
+			sendTLI != tli)
 		{
 			char		path[MAXPGPATH];
 
-			/* Switch to another logfile segment */
 			if (sendFile >= 0)
 				close(sendFile);
 
@@ -692,22 +706,17 @@ XLogRead(char *buf, TimeLineID tli, XLogRecPtr startptr, Size count)
 									path)));
 			}
 			sendOff = 0;
+			sendTLI = tli;
 		}
 
 		/* Need to seek in the file? */
 		if (sendOff != startoff)
 		{
 			if (lseek(sendFile, (off_t) startoff, SEEK_SET) < 0)
-			{
-				char		path[MAXPGPATH];
-
-				XLogFilePath(path, tli, sendSegNo);
-
 				ereport(ERROR,
 						(errcode_for_file_access(),
 				  errmsg("could not seek in log segment %s to offset %u: %m",
-						 path, startoff)));
-			}
+						 XLogFileNameP(tli, sendSegNo), startoff)));
 			sendOff = startoff;
 		}
 
@@ -719,16 +728,11 @@ XLogRead(char *buf, TimeLineID tli, XLogRecPtr startptr, Size count)
 
 		readbytes = read(sendFile, p, segbytes);
 		if (readbytes <= 0)
-		{
-			char		path[MAXPGPATH];
-
-			XLogFilePath(path, tli, sendSegNo);
-
 			ereport(ERROR,
 					(errcode_for_file_access(),
 					 errmsg("could not read from log segment %s, offset %u, length %lu: %m",
-							path, sendOff, (unsigned long) segbytes)));
-		}
+							XLogFileNameP(tli, sendSegNo), sendOff,
+							(unsigned long) segbytes)));
 
 		/* Update state for read */
 		recptr += readbytes;
@@ -740,12 +744,151 @@ XLogRead(char *buf, TimeLineID tli, XLogRecPtr startptr, Size count)
 }
 
 /*
- * read_page callback for reading local xlog files
+ * Determine XLogReaderState->currTLI and ->currTLIValidUntil;
+ * XLogReadState->EndRecPtr, ->currRecPtr and ThisTimeLineID affect the
+ * decision.  This may later be used to determine which xlog segment file to
+ * open, etc.
+ *
+ * We switch to an xlog segment from the new timeline eagerly when on a
+ * historical timeline, as soon as we reach the start of the xlog segment
+ * containing the timeline switch.  The server copied the segment to the new
+ * timeline so all the data up to the switch point is the same, but there's no
+ * guarantee the old segment will still exist. It may have been deleted or
+ * renamed with a .partial suffix so we can't necessarily keep reading from
+ * the old TLI even though tliSwitchPoint says it's OK.
+ *
+ * Because of this, callers MAY NOT assume that currTLI is the timeline that
+ * will be in a page's xlp_tli; the page may begin on an older timeline or we
+ * might be reading from historical timeline data on a segment that's been
+ * copied to a new timeline.
+ */
+static void
+XLogReadDetermineTimeline(XLogReaderState *state)
+{
+	/* Read the history on first time through */
+	if (state->timelineHistory == NIL)
+		state->timelineHistory = readTimeLineHistory(ThisTimeLineID);
+
+	/*
+	 * Are we reading the record immediately following the one we read last
+	 * time?  If not, then don't use the cached timeline info.
+	 */
+	if (state->currRecPtr != state->EndRecPtr)
+	{
+		state->currTLI = 0;
+		state->currTLIValidUntil = InvalidXLogRecPtr;
+	}
+
+	/*
+	 * Are we reading a timeline that used to be the latest one, but became
+	 * historical?	This can happen in a replica that gets promoted, and in a
+	 * cascading replica whose upstream gets promoted.  In either case,
+	 * re-read the timeline history data.  We cannot read past the timeline
+	 * switch point, because either the records in the old timeline might be
+	 * invalid, or worse, they may valid but *different* from the ones we
+	 * should be reading.
+	 */
+	if (state->currTLIValidUntil == InvalidXLogRecPtr &&
+		state->currTLI != ThisTimeLineID &&
+		state->currTLI != 0)
+	{
+		/* re-read timeline history */
+		list_free_deep(state->timelineHistory);
+		state->timelineHistory = readTimeLineHistory(ThisTimeLineID);
+
+		elog(DEBUG2, "timeline %u became historical during decoding",
+			 state->currTLI);
+
+		/* then invalidate the cached timeline info */
+		state->currTLI = 0;
+		state->currTLIValidUntil = InvalidXLogRecPtr;
+	}
+
+	/*
+	 * Are we reading a record immediately following a timeline switch?  If
+	 * so, we must follow the switch too.
+	 */
+	if (state->currRecPtr == state->EndRecPtr &&
+		state->currTLI != 0 &&
+		state->currTLIValidUntil != InvalidXLogRecPtr &&
+		state->currRecPtr >= state->currTLIValidUntil)
+	{
+		elog(DEBUG2,
+			 "requested record %X/%X is on segment containing end of TLI %u valid until %X/%X, switching to next timeline",
+			 (uint32) (state->currRecPtr >> 32),
+			 (uint32) state->currRecPtr,
+			 state->currTLI,
+			 (uint32) (state->currTLIValidUntil >> 32),
+			 (uint32) (state->currTLIValidUntil));
+
+		/* invalidate TLI info so we look up the next TLI */
+		state->currTLI = 0;
+		state->currTLIValidUntil = InvalidXLogRecPtr;
+	}
+
+	if (state->currTLI == 0)
+	{
+		/*
+		 * Something changed; work out what timeline this record is on. We
+		 * might read it from the segment on this TLI or, if the segment is
+		 * also contained by newer timelines, the copy from a newer TLI.
+		 */
+		state->currTLI = tliOfPointInHistory(state->currRecPtr,
+											 state->timelineHistory);
+
+		/*
+		 * Look for the most recent timeline that's on the same xlog segment
+		 * as this record, since that's the only one we can assume is still
+		 * readable.
+		 */
+		while (state->currTLI != ThisTimeLineID &&
+			   state->currTLIValidUntil == InvalidXLogRecPtr)
+		{
+			XLogRecPtr	tliSwitch;
+			TimeLineID	nextTLI;
+
+			tliSwitch = tliSwitchPoint(state->currTLI, state->timelineHistory,
+									   &nextTLI);
+
+			/* round ValidUntil down to start of seg containing the switch */
+			state->currTLIValidUntil =
+				((tliSwitch / XLogSegSize) * XLogSegSize);
+
+			if (state->currRecPtr >= state->currTLIValidUntil)
+			{
+				/*
+				 * The new currTLI ends on this WAL segment so check the next
+				 * TLI to see if it's the last one on the segment.
+				 *
+				 * If that's the current TLI we'll stop searching.
+				 */
+				state->currTLI = nextTLI;
+				state->currTLIValidUntil = InvalidXLogRecPtr;
+			}
+		}
+
+		/*
+		 * We're now either reading from the first xlog segment in the current
+		 * server's timeline or the most recent historical timeline that
+		 * exists on the target segment.
+		 */
+		elog(DEBUG2, "XLog read ptr %X/%X is on segment with TLI %u valid until %X/%X, server current TLI is %u",
+			 (uint32) (state->currRecPtr >> 32),
+			 (uint32) state->currRecPtr,
+			 state->currTLI,
+			 (uint32) (state->currTLIValidUntil >> 32),
+			 (uint32) (state->currTLIValidUntil),
+			 ThisTimeLineID);
+	}
+}
+
+/*
+ * XLogPageReadCB callback for reading local xlog files
  *
  * Public because it would likely be very helpful for someone writing another
  * output method outside walsender, e.g. in a bgworker.
  *
- * TODO: The walsender has it's own version of this, but it relies on the
+ * TODO: The walsender has its own version of this, but it relies on the
  * walsender's latch being set whenever WAL is flushed. No such infrastructure
  * exists for normal backends, so we have to do a check/sleep/repeat style of
  * loop for now.
@@ -754,46 +897,88 @@ int
 read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr,
 	int reqLen, XLogRecPtr targetRecPtr, char *cur_page, TimeLineID *pageTLI)
 {
-	XLogRecPtr	flushptr,
+	XLogRecPtr	read_upto,
 				loc;
 	int			count;
 
 	loc = targetPagePtr + reqLen;
+
+	/* Make sure enough xlog is available... */
 	while (1)
 	{
 		/*
-		 * TODO: we're going to have to do something more intelligent about
-		 * timelines on standbys. Use readTimeLineHistory() and
-		 * tliOfPointInHistory() to get the proper LSN? For now we'll catch
-		 * that case earlier, but the code and TODO is left in here for when
-		 * that changes.
+		 * Check which timeline to get the record from.
+		 *
+		 * We have to do it each time through the loop because if we're in
+		 * recovery as a cascading standby, the current timeline might've
+		 * become historical.
 		 */
-		if (!RecoveryInProgress())
+		XLogReadDetermineTimeline(state);
+
+		if (state->currTLI == ThisTimeLineID)
 		{
-			*pageTLI = ThisTimeLineID;
-			flushptr = GetFlushRecPtr();
+			/*
+			 * We're reading from the current timeline so we might have to
+			 * wait for the desired record to be generated (or, for a standby,
+			 * received & replayed)
+			 */
+			if (!RecoveryInProgress())
+			{
+				*pageTLI = ThisTimeLineID;
+				read_upto = GetFlushRecPtr();
+			}
+			else
+				read_upto = GetXLogReplayRecPtr(pageTLI);
+
+			if (loc <= read_upto)
+				break;
+
+			CHECK_FOR_INTERRUPTS();
+			pg_usleep(1000L);
 		}
 		else
-			flushptr = GetXLogReplayRecPtr(pageTLI);
+		{
+			/*
+			 * We're on a historical timeline, so limit reading to the switch
+			 * point where we moved to the next timeline.
+			 */
+			read_upto = state->currTLIValidUntil;
 
-		if (loc <= flushptr)
+			/*
+			 * Setting pageTLI to our wanted record's TLI is slightly wrong;
+			 * the page might begin on an older timeline if it contains a
+			 * timeline switch, since its xlog segment will have been copied
+			 * from the prior timeline. This is pretty harmless though, as
+			 * nothing cares so long as the timeline doesn't go backwards.  We
+			 * should read the page header instead; FIXME someday.
+			 */
+			*pageTLI = state->currTLI;
+
+			/* No need to wait on a historical timeline */
 			break;
-
-		CHECK_FOR_INTERRUPTS();
-		pg_usleep(1000L);
+		}
 	}
 
-	/* more than one block available */
-	if (targetPagePtr + XLOG_BLCKSZ <= flushptr)
+	if (targetPagePtr + XLOG_BLCKSZ <= read_upto)
+	{
+		/*
+		 * more than one block available; read only that block, have caller
+		 * come back if they need more.
+		 */
 		count = XLOG_BLCKSZ;
-	/* not enough data there */
-	else if (targetPagePtr + reqLen > flushptr)
+	}
+	else if (targetPagePtr + reqLen > read_upto)
+	{
+		/* not enough data there */
 		return -1;
-	/* part of the page available */
+	}
 	else
-		count = flushptr - targetPagePtr;
+	{
+		/* enough bytes available to satisfy the request */
+		count = read_upto - targetPagePtr;
+	}
 
-	XLogRead(cur_page, *pageTLI, targetPagePtr, XLOG_BLCKSZ);
+	XLogRead(cur_page, *pageTLI, targetPagePtr, count);
 
 	return count;
 }
diff --git a/src/backend/replication/logical/logicalfuncs.c b/src/backend/replication/logical/logicalfuncs.c
index f789fc1..dde130a 100644
--- a/src/backend/replication/logical/logicalfuncs.c
+++ b/src/backend/replication/logical/logicalfuncs.c
@@ -115,7 +115,7 @@ logical_read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr,
 	int reqLen, XLogRecPtr targetRecPtr, char *cur_page, TimeLineID *pageTLI)
 {
 	return read_local_xlog_page(state, targetPagePtr, reqLen,
-						 targetRecPtr, cur_page, pageTLI);
+								targetRecPtr, cur_page, pageTLI);
 }
 
 /*
@@ -231,12 +231,6 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin
 	rsinfo->setResult = p->tupstore;
 	rsinfo->setDesc = p->tupdesc;
 
-	/* compute the current end-of-wal */
-	if (!RecoveryInProgress())
-		end_of_wal = GetFlushRecPtr();
-	else
-		end_of_wal = GetXLogReplayRecPtr(NULL);
-
 	ReplicationSlotAcquire(NameStr(*name));
 
 	PG_TRY();
@@ -263,6 +257,15 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin
 
 		ctx->output_writer_private = p;
 
+		/*
+		 * We start reading xlog from the restart lsn, even though in
+		 * CreateDecodingContext we set the snapshot builder up using the
+		 * slot's candidate_restart_lsn. This means we might read xlog we
+		 * don't actually decode rows from, but the snapshot builder might
+		 * need it to get to a consistent point. The point we start returning
+		 * data to *users* at is the candidate restart lsn from the decoding
+		 * context.
+		 */
 		startptr = MyReplicationSlot->data.restart_lsn;
 
 		CurrentResourceOwner = ResourceOwnerCreate(CurrentResourceOwner, "logical decoding");
@@ -270,8 +273,14 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin
 		/* invalidate non-timetravel entries */
 		InvalidateSystemCaches();
 
+		if (!RecoveryInProgress())
+			end_of_wal = GetFlushRecPtr();
+		else
+			end_of_wal = GetXLogReplayRecPtr(NULL);
+
+		/* Decode until we run out of records */
 		while ((startptr != InvalidXLogRecPtr && startptr < end_of_wal) ||
-			 (ctx->reader->EndRecPtr && ctx->reader->EndRecPtr < end_of_wal))
+			   (ctx->reader->EndRecPtr != InvalidXLogRecPtr && ctx->reader->EndRecPtr < end_of_wal))
 		{
 			XLogRecord *record;
 			char	   *errm = NULL;
@@ -280,6 +289,10 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin
 			if (errm)
 				elog(ERROR, "%s", errm);
 
+			/*
+			 * Now that we've set up the xlog reader state subsequent calls
+			 * pass InvalidXLogRecPtr to say "continue from last record"
+			 */
 			startptr = InvalidXLogRecPtr;
 
 			/*
@@ -299,6 +312,18 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin
 			CHECK_FOR_INTERRUPTS();
 		}
 
+		/* Make sure timeline lookups use the start of the next record */
+		startptr = ctx->reader->EndRecPtr;
+
+		/*
+		 * The XLogReader will read a page past the valid end of WAL because
+		 * it doesn't know about timelines. When we switch timelines and ask
+		 * it for the first page on the new timeline it will think it has it
+		 * cached, but it'll have the old partial page and say it can't find
+		 * the next record. So flush the cache.
+		 */
+		XLogReaderInvalCache(ctx->reader);
+
 		tuplestore_donestoring(tupstore);
 
 		CurrentResourceOwner = old_resowner;
diff --git a/src/include/access/xlogreader.h b/src/include/access/xlogreader.h
index 7553cc4..31cafd3 100644
--- a/src/include/access/xlogreader.h
+++ b/src/include/access/xlogreader.h
@@ -27,6 +27,10 @@
 
 #include "access/xlogrecord.h"
 
+#ifndef FRONTEND
+#include "nodes/pg_list.h"
+#endif
+
 typedef struct XLogReaderState XLogReaderState;
 
 /* Function type definition for the read_page callback */
@@ -139,26 +143,50 @@ struct XLogReaderState
 	 * ----------------------------------------
 	 */
 
-	/* Buffer for currently read page (XLOG_BLCKSZ bytes) */
+	/*
+	 * Buffer for currently read page (XLOG_BLCKSZ bytes, valid up to at least
+	 * readLen bytes)
+	 */
 	char	   *readBuf;
 
-	/* last read segment, segment offset, read length, TLI */
+	/*
+	 * last read segment, segment offset, read length, TLI for data currently
+	 * in readBuf.
+	 */
 	XLogSegNo	readSegNo;
 	uint32		readOff;
 	uint32		readLen;
 	TimeLineID	readPageTLI;
 
-	/* beginning of last page read, and its TLI  */
+	/*
+	 * beginning of prior page read, and its TLI. Doesn't necessarily
+	 * correspond to what's in readBuf, used for timeline sanity checks.
+	 */
 	XLogRecPtr	latestPagePtr;
 	TimeLineID	latestPageTLI;
 
 	/* beginning of the WAL record being read. */
 	XLogRecPtr	currRecPtr;
+	/* timeline to read it from, 0 if a lookup is required */
+	TimeLineID	currTLI;
+
+	/*
+	 * Pointer to the end of the last whole segment on the timeline in currTLI
+	 * if it's historical or InvalidXLogRecPtr if currTLI is the current
+	 * timeline. This is *not* the tliSwitchPoint but it's guaranteed safe to
+	 * read up to this point from currTLI.
+	 */
+	XLogRecPtr	currTLIValidUntil;
 
 	/* Buffer for current ReadRecord result (expandable) */
 	char	   *readRecordBuf;
 	uint32		readRecordBufSize;
 
+#ifndef FRONTEND
+	/* cached timeline history, only available in backend */
+	List	   *timelineHistory;
+#endif
+
 	/* Buffer to hold error message */
 	char	   *errormsg_buf;
 };
@@ -174,6 +202,9 @@ extern void XLogReaderFree(XLogReaderState *state);
 extern struct XLogRecord *XLogReadRecord(XLogReaderState *state,
 			   XLogRecPtr recptr, char **errormsg);
 
+/* Flush any cached page */
+extern void XLogReaderInvalCache(XLogReaderState *state);
+
 #ifdef FRONTEND
 extern XLogRecPtr XLogFindNextRecord(XLogReaderState *state, XLogRecPtr RecPtr);
 #endif   /* FRONTEND */
diff --git a/src/include/access/xlogutils.h b/src/include/access/xlogutils.h
index 1b9abce..c9df35e 100644
--- a/src/include/access/xlogutils.h
+++ b/src/include/access/xlogutils.h
@@ -48,6 +48,6 @@ extern Relation CreateFakeRelcacheEntry(RelFileNode rnode);
 extern void FreeFakeRelcacheEntry(Relation fakerel);
 
 extern int read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr,
-	int reqLen, XLogRecPtr targetRecPtr, char *cur_page, TimeLineID *pageTLI);
+   int reqLen, XLogRecPtr targetRecPtr, char *cur_page, TimeLineID *pageTLI);
 
 #endif
diff --git a/src/test/modules/Makefile b/src/test/modules/Makefile
index 6167ec1..bbaf94f 100644
--- a/src/test/modules/Makefile
+++ b/src/test/modules/Makefile
@@ -7,6 +7,7 @@ include $(top_builddir)/src/Makefile.global
 SUBDIRS = \
 		  brin \
 		  commit_ts \
+		  decoding_failover \
 		  dummy_seclabel \
 		  test_ddl_deparse \
 		  test_extensions \
diff --git a/src/test/modules/decoding_failover/.gitignore b/src/test/modules/decoding_failover/.gitignore
new file mode 100644
index 0000000..543c50d
--- /dev/null
+++ b/src/test/modules/decoding_failover/.gitignore
@@ -0,0 +1,3 @@
+results/
+tmp_check/
+log/
diff --git a/src/test/modules/decoding_failover/Makefile b/src/test/modules/decoding_failover/Makefile
new file mode 100644
index 0000000..6d1c034
--- /dev/null
+++ b/src/test/modules/decoding_failover/Makefile
@@ -0,0 +1,22 @@
+# src/test/modules/decoding_failover/Makefile
+
+MODULES = decoding_failover
+PGFILEDESC = "decoding_failover - test utility for logical decoding"
+
+EXTENSION = decoding_failover
+DATA = decoding_failover--1.0.sql
+
+EXTRA_INSTALL=contrib/test_decoding
+REGRESS=load_extension
+REGRESS_OPTS = --temp-config=$(top_srcdir)/src/test/modules/decoding_failover/decoding_failover.conf
+
+ifdef USE_PGXS
+PG_CONFIG = pg_config
+PGXS := $(shell $(PG_CONFIG) --pgxs)
+include $(PGXS)
+else
+subdir = src/test/modules/decoding_failover
+top_builddir = ../../../..
+include $(top_builddir)/src/Makefile.global
+include $(top_srcdir)/contrib/contrib-global.mk
+endif
diff --git a/src/test/modules/decoding_failover/README b/src/test/modules/decoding_failover/README
new file mode 100644
index 0000000..585f02f
--- /dev/null
+++ b/src/test/modules/decoding_failover/README
@@ -0,0 +1,19 @@
+A test module for logical decoding failover and timeline following.
+
+This module provides a minimal way to maintain logical slots on replicas
+that mirror the state on the master. It doesn't make decoding possible,
+just tracking slot state so that a decoding client that's using the master
+can follow a physical failover to the standby. The master doesn't know
+about the slots on the standby, they're synced by a client that connects
+to both.
+
+This is intentionally not part of the test_decoding module because that's meant
+to serve as example code, where this module exercises internal server features
+by unsafely exposing internal state to SQL. It's not the right way to do
+failover, it's just a simple way to test it from the perl TAP framework to
+prove the feature works.
+
+In a practical implementation of this approach a bgworker on the master would
+monitor slot positions and relay them to a bgworker on the standby that applies
+the position updates without exposing slot internals to SQL. That's too complex
+for this test framework though.
diff --git a/src/test/modules/decoding_failover/decoding_failover--1.0.sql b/src/test/modules/decoding_failover/decoding_failover--1.0.sql
new file mode 100644
index 0000000..078b65e
--- /dev/null
+++ b/src/test/modules/decoding_failover/decoding_failover--1.0.sql
@@ -0,0 +1,16 @@
+-- complain if script is sourced in psql, rather than via CREATE EXTENSION
+\echo Use "CREATE EXTENSION decoding_failover" to load this file. \quit
+
+CREATE OR REPLACE FUNCTION decoding_failover_create_logical_slot(slot_name text, plugin text)
+RETURNS void
+LANGUAGE c AS 'MODULE_PATHNAME';
+
+COMMENT ON FUNCTION decoding_failover_create_logical_slot(text, text)
+IS 'Create a logical slot at a particular lsn and xid. Do not use in production servers, it is not safe. The slot is created with an invalid xmin and lsn.';
+
+CREATE OR REPLACE FUNCTION decoding_failover_advance_logical_slot(slot_name text, new_xmin bigint, new_catalog_xmin bigint, new_restart_lsn pg_lsn, new_confirmed_lsn pg_lsn)
+RETURNS void
+LANGUAGE c AS 'MODULE_PATHNAME';
+
+COMMENT ON FUNCTION decoding_failover_advance_logical_slot(text, bigint, bigint, pg_lsn, pg_lsn)
+IS 'Advance a logical slot directly. Do not use this in production servers, it is not safe.';
diff --git a/src/test/modules/decoding_failover/decoding_failover.c b/src/test/modules/decoding_failover/decoding_failover.c
new file mode 100644
index 0000000..669e6c4
--- /dev/null
+++ b/src/test/modules/decoding_failover/decoding_failover.c
@@ -0,0 +1,122 @@
+/*
+ * decoding_failover.c
+ *		Testing helper for replication slots
+ */
+#include "postgres.h"
+
+#include "access/transam.h"
+#include "fmgr.h"
+#include "miscadmin.h"
+#include "replication/slot.h"
+#include "utils/builtins.h"
+#include "utils/pg_lsn.h"
+
+PG_MODULE_MAGIC;
+
+PG_FUNCTION_INFO_V1(decoding_failover_create_logical_slot);
+PG_FUNCTION_INFO_V1(decoding_failover_advance_logical_slot);
+
+static void clear_slot_transient_state(void);
+
+/*
+ * Create a new logical slot, with invalid LSN and xid, directly. This does not
+ * use the snapshot builder or logical decoding machinery. It's only intended
+ * for creating a slot on a replica that mirrors the state of a slot on an
+ * upstream master.
+ *
+ * You should immediately decoding_failover_advance_logical_slot(...) it
+ * after creation.
+ */
+Datum
+decoding_failover_create_logical_slot(PG_FUNCTION_ARGS)
+{
+	char	   *slotname = text_to_cstring(PG_GETARG_TEXT_P(0));
+	char	   *plugin = text_to_cstring(PG_GETARG_TEXT_P(1));
+
+	CheckSlotRequirements();
+
+	ReplicationSlotCreate(slotname, true, RS_PERSISTENT);
+
+	/* register the plugin name with the slot */
+	StrNCpy(NameStr(MyReplicationSlot->data.plugin), plugin, NAMEDATALEN);
+
+	/*
+	 * Initialize persistent state to placeholders to be set by
+	 * decoding_failover_advance_logical_slot .
+	 */
+	MyReplicationSlot->data.xmin = InvalidTransactionId;
+	MyReplicationSlot->data.catalog_xmin = InvalidTransactionId;
+	MyReplicationSlot->data.restart_lsn = InvalidXLogRecPtr;
+	MyReplicationSlot->data.confirmed_flush = InvalidXLogRecPtr;
+
+	clear_slot_transient_state();
+
+	ReplicationSlotRelease();
+
+	PG_RETURN_VOID();
+}
+
+/*
+ * Set the state of a slot.
+ *
+ * This doesn't maintain the non-persistent state at all,
+ * but since the slot isn't in use that's OK.
+ *
+ * There's intentionally no check to prevent slots going backwards
+ * because they can actually go backwards if the master crashes when
+ * it hasn't yet flushed slot state to disk then we copy the older
+ * slot state after recovery.
+ *
+ * There's no checking done for xmin or catalog xmin either, since
+ * we can't really do anything useful that accounts for xid wrap-around.
+ */
+Datum
+decoding_failover_advance_logical_slot(PG_FUNCTION_ARGS)
+{
+	char	   *slotname = text_to_cstring(PG_GETARG_TEXT_P(0));
+	TransactionId new_xmin = (TransactionId) PG_GETARG_INT64(1);
+	TransactionId new_catalog_xmin = (TransactionId) PG_GETARG_INT64(2);
+	XLogRecPtr	restart_lsn = PG_GETARG_LSN(3);
+	XLogRecPtr	confirmed_lsn = PG_GETARG_LSN(4);
+
+	CheckSlotRequirements();
+
+	ReplicationSlotAcquire(slotname);
+
+	if (MyReplicationSlot->data.database != MyDatabaseId)
+		elog(ERROR, "Trying to update a slot on a different database");
+
+	MyReplicationSlot->data.xmin = new_xmin;
+	MyReplicationSlot->data.catalog_xmin = new_catalog_xmin;
+	MyReplicationSlot->data.restart_lsn = restart_lsn;
+	MyReplicationSlot->data.confirmed_flush = confirmed_lsn;
+
+	clear_slot_transient_state();
+
+	ReplicationSlotMarkDirty();
+	ReplicationSlotSave();
+	ReplicationSlotRelease();
+
+	ReplicationSlotsComputeRequiredXmin(false);
+	ReplicationSlotsComputeRequiredLSN();
+
+	PG_RETURN_VOID();
+}
+
+static void
+clear_slot_transient_state(void)
+{
+	Assert(MyReplicationSlot != NULL);
+
+	/*
+	 * Make sure the slot state is the same as if it were newly loaded from
+	 * disk on recovery.
+	 */
+	MyReplicationSlot->effective_xmin = MyReplicationSlot->data.xmin;
+	MyReplicationSlot->effective_catalog_xmin = MyReplicationSlot->data.catalog_xmin;
+
+	MyReplicationSlot->candidate_catalog_xmin = InvalidTransactionId;
+	MyReplicationSlot->candidate_xmin_lsn = InvalidXLogRecPtr;
+	MyReplicationSlot->candidate_restart_lsn = InvalidXLogRecPtr;
+	MyReplicationSlot->candidate_restart_valid = InvalidXLogRecPtr;
+}
diff --git a/src/test/modules/decoding_failover/decoding_failover.conf b/src/test/modules/decoding_failover/decoding_failover.conf
new file mode 100644
index 0000000..56b46d7
--- /dev/null
+++ b/src/test/modules/decoding_failover/decoding_failover.conf
@@ -0,0 +1,2 @@
+max_replication_slots=2
+wal_level=logical
diff --git a/src/test/modules/decoding_failover/decoding_failover.control b/src/test/modules/decoding_failover/decoding_failover.control
new file mode 100644
index 0000000..ef05ea2
--- /dev/null
+++ b/src/test/modules/decoding_failover/decoding_failover.control
@@ -0,0 +1,5 @@
+# decoding_failover extension
+comment = 'Logical decoding failover tests'
+default_version = '1.0'
+module_pathname = '$libdir/decoding_failover'
+relocatable = true
diff --git a/src/test/modules/decoding_failover/expected/load_extension.out b/src/test/modules/decoding_failover/expected/load_extension.out
new file mode 100644
index 0000000..fe3c54d
--- /dev/null
+++ b/src/test/modules/decoding_failover/expected/load_extension.out
@@ -0,0 +1,19 @@
+CREATE EXTENSION decoding_failover;
+SELECT decoding_failover_create_logical_slot('test_slot', 'test_decoding');
+ decoding_failover_create_logical_slot 
+---------------------------------------
+ 
+(1 row)
+
+SELECT decoding_failover_advance_logical_slot('test_slot', txid_current(), txid_current(), pg_current_xlog_location(), pg_current_xlog_location());
+ decoding_failover_advance_logical_slot 
+----------------------------------------
+ 
+(1 row)
+
+SELECT pg_drop_replication_slot('test_slot');
+ pg_drop_replication_slot 
+--------------------------
+ 
+(1 row)
+
diff --git a/src/test/modules/decoding_failover/sql/load_extension.sql b/src/test/modules/decoding_failover/sql/load_extension.sql
new file mode 100644
index 0000000..4ea9f77
--- /dev/null
+++ b/src/test/modules/decoding_failover/sql/load_extension.sql
@@ -0,0 +1,7 @@
+CREATE EXTENSION decoding_failover;
+
+SELECT decoding_failover_create_logical_slot('test_slot', 'test_decoding');
+
+SELECT decoding_failover_advance_logical_slot('test_slot', txid_current(), txid_current(), pg_current_xlog_location(), pg_current_xlog_location());
+
+SELECT pg_drop_replication_slot('test_slot');
diff --git a/src/test/recovery/Makefile b/src/test/recovery/Makefile
index 9290719..9710370 100644
--- a/src/test/recovery/Makefile
+++ b/src/test/recovery/Makefile
@@ -9,6 +9,8 @@
 #
 #-------------------------------------------------------------------------
 
+EXTRA_INSTALL=contrib/test_decoding src/test/modules/decoding_failover
+
 subdir = src/test/recovery
 top_builddir = ../../..
 include $(top_builddir)/src/Makefile.global
diff --git a/src/test/recovery/t/006_logical_decoding_timelines.pl b/src/test/recovery/t/006_logical_decoding_timelines.pl
new file mode 100644
index 0000000..c212bc0
--- /dev/null
+++ b/src/test/recovery/t/006_logical_decoding_timelines.pl
@@ -0,0 +1,303 @@
+# Demonstrate that logical can follow timeline switches.
+#
+# Logical replication slots can follow timeline switches but it's
+# normally not possible to have a logical slot on a replica where
+# promotion and a timeline switch can occur. The only ways
+# we can create that circumstance are:
+#
+# * By doing a filesystem-level copy of the DB, since pg_basebackup
+#   excludes pg_replslot but we can copy it directly; or
+#
+# * by creating a slot directly at the C level on the replica and
+#   advancing it as we go using the low level APIs. It can't be done
+#   from SQL since logical decoding isn't allowed on replicas.
+#
+# This module uses the first approach to show that timeline following
+# on a logical slot works.
+#
+use strict;
+use warnings;
+
+use PostgresNode;
+use TestLib;
+use Test::More tests => 20;
+use RecursiveCopy;
+use File::Copy;
+
+my ($stdout, $stderr, $ret);
+
+# Initialize master node
+my $node_master = get_new_node('master');
+$node_master->init(allows_streaming => 1, has_archiving => 1);
+$node_master->append_conf('postgresql.conf', "wal_level = 'logical'\n");
+$node_master->append_conf('postgresql.conf', "max_replication_slots = 2\n");
+$node_master->append_conf('postgresql.conf', "max_wal_senders = 2\n");
+$node_master->append_conf('postgresql.conf', "log_min_messages = 'debug2'\n");
+$node_master->dump_info;
+$node_master->start;
+
+diag "Testing logical timeline following with a filesystem-level copy";
+
+$node_master->safe_psql('postgres',
+"SELECT pg_create_logical_replication_slot('before_basebackup', 'test_decoding');"
+);
+$node_master->safe_psql('postgres', "CREATE TABLE decoding(blah text);");
+$node_master->safe_psql('postgres',
+	"INSERT INTO decoding(blah) VALUES ('beforebb');");
+$node_master->safe_psql('postgres', 'CHECKPOINT;');
+
+my $backup_name = 'b1';
+$node_master->backup_fs_hot($backup_name);
+
+my $node_replica = get_new_node('replica');
+$node_replica->init_from_backup(
+	$node_master, $backup_name,
+	has_streaming => 1,
+	has_restoring => 1);
+$node_replica->start;
+
+$node_master->safe_psql('postgres',
+"SELECT pg_create_logical_replication_slot('after_basebackup', 'test_decoding');"
+);
+$node_master->safe_psql('postgres',
+	"INSERT INTO decoding(blah) VALUES ('afterbb');");
+$node_master->safe_psql('postgres', 'CHECKPOINT;');
+
+# Verify that only the before base_backup slot is on the replica
+$stdout = $node_replica->safe_psql('postgres',
+	'SELECT slot_name FROM pg_replication_slots ORDER BY slot_name');
+is($stdout, 'before_basebackup',
+	'Expected to find only slot before_basebackup on replica');
+
+# Boom, crash
+$node_master->stop('immediate');
+
+$node_replica->promote;
+$node_replica->poll_query_until('postgres',
+	"SELECT NOT pg_is_in_recovery();");
+
+$node_replica->safe_psql('postgres',
+	"INSERT INTO decoding(blah) VALUES ('after failover');");
+
+# Shouldn't be able to read from slot created after base backup
+($ret, $stdout, $stderr) = $node_replica->psql('postgres',
+"SELECT data FROM pg_logical_slot_peek_changes('after_basebackup', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');"
+);
+is($ret, 3, 'replaying from after_basebackup slot fails');
+like(
+	$stderr,
+	qr/replication slot "after_basebackup" does not exist/,
+	'after_basebackup slot missing');
+
+# Should be able to read from slot created before base backup
+($ret, $stdout, $stderr) = $node_replica->psql(
+	'postgres',
+"SELECT data FROM pg_logical_slot_peek_changes('before_basebackup', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');",
+	timeout => 30);
+is($ret, 0, 'replay from slot before_basebackup succeeds');
+is( $stdout, q(BEGIN
+table public.decoding: INSERT: blah[text]:'beforebb'
+COMMIT
+BEGIN
+table public.decoding: INSERT: blah[text]:'afterbb'
+COMMIT
+BEGIN
+table public.decoding: INSERT: blah[text]:'after failover'
+COMMIT), 'decoded expected data from slot before_basebackup');
+is($stderr, '', 'replay from slot before_basebackup produces no stderr');
+
+# We don't need the standby anymore
+$node_replica->teardown_node();
+
+
+# OK, time to try the same thing again, but this time we'll be using slot
+# mirroring on the standby and a pg_basebackup of the master.
+
+diag "Testing logical timeline following with decoding_failover module";
+
+$node_master->start();
+
+# Clean up after the last test
+$node_master->safe_psql('postgres', 'DELETE FROM decoding;');
+is( $node_master->psql(
+		'postgres',
+'SELECT pg_drop_replication_slot(slot_name) FROM pg_replication_slots;'),
+	0,
+	'dropping slots succeeds via pg_drop_replication_slot');
+
+# Same as before, we'll make one slot before basebackup, one after. This time
+# the basebackup will be with pg_basebackup so it'll omit both slots, then
+# we'll use SQL functions provided by the decoding_failover test module to sync
+# them to the replica, do some work, sync them and fail over then test again.
+# This time we should have both the before- and after-basebackup slots working.
+
+is( $node_master->psql(
+		'postgres',
+"SELECT pg_create_logical_replication_slot('before_basebackup', 'test_decoding');"
+	),
+	0,
+	'creating slot before_basebackup succeeds');
+
+$node_master->safe_psql('postgres',
+	"INSERT INTO decoding(blah) VALUES ('beforebb');");
+
+$backup_name = 'b2';
+$node_master->backup($backup_name);
+
+is( $node_master->psql(
+		'postgres',
+"SELECT pg_create_logical_replication_slot('after_basebackup', 'test_decoding');"
+	),
+	0,
+	'creating slot after_basebackup succeeds');
+
+$node_master->safe_psql('postgres',
+	"INSERT INTO decoding(blah) VALUES ('afterbb');");
+
+$node_replica = get_new_node('replica2');
+$node_replica->init_from_backup(
+	$node_master, $backup_name,
+	has_streaming => 1,
+	has_restoring => 1);
+
+$node_replica->start;
+
+# Verify the slots are both absent on the replica
+$stdout = $node_replica->safe_psql('postgres',
+	'SELECT slot_name FROM pg_replication_slots ORDER BY slot_name');
+is($stdout, '', 'No slots exist on the replica');
+
+# Now do our magic to sync the slot states across. Normally
+# this would be being done continuously by a bgworker but
+# we're just doing it by hand for this test. This is exposing
+# postgres innards to SQL so it's unsafe except for testing.
+$node_master->safe_psql('postgres', 'CREATE EXTENSION decoding_failover;');
+my $slotinfo = $node_master->safe_psql('postgres',
+'SELECT slot_name, plugin, xmin, catalog_xmin, restart_lsn, confirmed_flush_lsn FROM pg_replication_slots ORDER BY slot_name'
+);
+diag "Copying slots to replica";
+open my $fh, '<', \$slotinfo or die $!;
+while (<$fh>)
+{
+	print $_;
+	chomp $_;
+	my ($slot_name, $plugin, $xmin, $catalog_xmin, $restart_lsn,
+		$confirmed_flush_lsn)
+	  = map {
+		if   ($_ ne '') { "'$_'" }
+		else            { 'NULL' }
+	  } split qr/\|/, $_;
+
+	print
+"# Copying slot $slot_name,$plugin,$xmin,$catalog_xmin,$restart_lsn,$confirmed_flush_lsn\n";
+	$node_replica->safe_psql('postgres',
+		"SELECT decoding_failover_create_logical_slot($slot_name, $plugin);");
+	$node_replica->safe_psql('postgres',
+"SELECT decoding_failover_advance_logical_slot($slot_name, $xmin, $catalog_xmin, $restart_lsn, $confirmed_flush_lsn);"
+	);
+}
+close $fh or die $!;
+
+# Now both slots are present on the replica and exactly match the master
+$stdout = $node_replica->safe_psql('postgres',
+	'SELECT slot_name FROM pg_replication_slots ORDER BY slot_name');
+is( $stdout,
+	"after_basebackup\nbefore_basebackup",
+	'both slots now exist on replica');
+
+$stdout = $node_replica->safe_psql(
+	'postgres',
+	qq{SELECT slot_name, plugin, xmin, catalog_xmin,
+			  restart_lsn, confirmed_flush_lsn
+		 FROM pg_replication_slots
+	 ORDER BY slot_name});
+is($stdout, $slotinfo,
+	"slot data read back from replica matches slot data on master");
+
+# We now have to copy some extra WAL to satisfy the requirements of the oldest
+# replication slot. pg_basebackup doesn't know to copy the extra WAL for slots
+# so we have to help out. We know the WAL is still retained on the master
+# because we haven't advanced the slots there.
+#
+# Figure out what the oldest segment we need is by looking at the restart_lsn
+# of the oldest slot.
+#
+# It only makes sense to do this once the slots are created on the replica,
+# otherwise it might just delete the segments again.
+
+my $oldest_needed_segment = $node_master->safe_psql(
+	'postgres',
+	qq{SELECT pg_xlogfile_name((
+      SELECT restart_lsn
+      FROM pg_replication_slots
+      ORDER BY restart_lsn ASC
+      LIMIT 1
+     ));}
+);
+
+diag "oldest needed xlog seg is $oldest_needed_segment ";
+
+# WAL segment names sort lexically so we can just grab everything > than this
+# segment.
+opendir(my $pg_xlog, $node_master->data_dir . "/pg_xlog") or die $!;
+while (my $seg = readdir $pg_xlog)
+{
+	next unless $seg >= $oldest_needed_segment && $seg =~ /^[0-9]{24}/;
+	diag "copying xlog seg $seg";
+	copy(
+		$node_master->data_dir . "/pg_xlog/" . $seg,
+		$node_replica->data_dir . "/pg_xlog/" . $seg
+	) or die "copy of xlog seg $seg failed: $!";
+}
+closedir $pg_xlog;
+
+# Boom, crash the master
+$node_master->stop('immediate');
+
+$node_replica->promote;
+$node_replica->poll_query_until('postgres',
+	"SELECT NOT pg_is_in_recovery();");
+
+$node_replica->safe_psql('postgres',
+	"INSERT INTO decoding(blah) VALUES ('after failover');");
+
+# This time we can read from both slots
+($ret, $stdout, $stderr) = $node_replica->psql(
+	'postgres',
+"SELECT data FROM pg_logical_slot_peek_changes('after_basebackup', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');",
+	timeout => 30);
+is($ret, 0, 'replay from slot after_basebackup succeeds');
+is( $stdout, q(BEGIN
+table public.decoding: INSERT: blah[text]:'afterbb'
+COMMIT
+BEGIN
+table public.decoding: INSERT: blah[text]:'after failover'
+COMMIT), 'decoded expected data from slot after_basebackup');
+is($stderr, '', 'replay from slot after_basebackup produces no stderr');
+
+# Should be able to read from slot created before base backup
+#
+# This would fail with an error about missing WAL segments if we hadn't
+# copied extra WAL earlier.
+($ret, $stdout, $stderr) = $node_replica->psql(
+	'postgres',
+"SELECT data FROM pg_logical_slot_peek_changes('before_basebackup', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');",
+	timeout => 30);
+is($ret, 0, 'replay from slot before_basebackup succeeds');
+is( $stdout, q(BEGIN
+table public.decoding: INSERT: blah[text]:'beforebb'
+COMMIT
+BEGIN
+table public.decoding: INSERT: blah[text]:'afterbb'
+COMMIT
+BEGIN
+table public.decoding: INSERT: blah[text]:'after failover'
+COMMIT), 'decoded expected data from slot before_basebackup');
+is($stderr, '', 'replay from slot before_basebackup produces no stderr');
+
+($ret, $stdout, $stderr) = $node_replica->psql('postgres',
+	'SELECT pg_drop_replication_slot(slot_name) FROM pg_replication_slots;');
+is($ret,    0,  'dropping slots succeeds via pg_drop_replication_slot');
+is($stderr, '', 'dropping slots produces no stderr output');
+
+1;
-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to