On 1 March 2016 at 21:00, Craig Ringer <cr...@2ndquadrant.com> wrote:

> Hi all
>
> Per discussion on the failover slots thread (
> https://commitfest.postgresql.org/9/488/) I'm splitting timeline
> following for logical slots into its own separate patch.
>
>
I've updated the logical decoding timeline following patch to fix a bug
found as a result of test development related to how Pg renames the last
WAL seg on the old timeline to suffix it with .partial on promotion. The
xlogreader must switch to reading from the newest-timeline version of a
given segment eagerly, for the first page of the segment, since that's the
only one guaranteed to actually exist.

I'd really appreciate some review of the logic there by people who know
timelines well and preferably know the xlogreader. It's really just one
function and 2/3 comments; the code is simple but the reasoning leading to
it is not.


I've also attached an updated version of the tests posted a few days ago.
The tests depend on the remaining patches from the TAP enhancements tree so
it's easiest to just get the whole tree from
https://github.com/2ndQuadrant/postgres/tree/dev/logical-decoding-timeline-following
(subject to regular rebases and force pushes, do not use as a base).

The tests now include a test module that exposes some slots guts to SQL to
allow the client to sync slot state from master to replica(s) without
needing failover slots and the use of extra WAL as transport. It's very
much for-testing-only.

The new test module is used by a second round of tests to demonstrate the
practicality of failover of a logical replication client to a physical
replica using a base backup taken by pg_basebackup and without the presence
of failover slots. I won't pretend it's pretty.

This proves that the approach works barring unforseen showstoppers. It also
proves it's pretty ugly - failover slots provide a much, MUCH simpler and
safer way for clients to achieve this with way less custom code needed by
each client to sync slot state.

I've got a bit of cleanup to do in the test suite and a few more tests to
write for cases where the slot on the replica is allowed to fall behind the
slot on the master but this is mostly waiting on the remaining two TAP test
patches before it can be evaluated for possible push.

-- 
 Craig Ringer                   http://www.2ndQuadrant.com/
 PostgreSQL Development, 24x7 Support, Training & Services
From 37bd2e654345af65749ccff6ca73d3afebf67072 Mon Sep 17 00:00:00 2001
From: Craig Ringer <cr...@2ndquadrant.com>
Date: Thu, 11 Feb 2016 10:44:14 +0800
Subject: [PATCH 1/2] Allow logical slots to follow timeline switches

Make logical replication slots timeline-aware, so replay can
continue from a historical timeline onto the server's current
timeline.

This is required to make failover slots possible and may also
be used by extensions that CreateReplicationSlot on a standby
and replay from that slot once the replica is promoted.

This does NOT add support for replaying from a logical slot on
a standby or for syncing slots to replicas.
---
 src/backend/access/transam/xlogreader.c        |  43 ++++-
 src/backend/access/transam/xlogutils.c         | 240 +++++++++++++++++++++++--
 src/backend/replication/logical/logicalfuncs.c |  38 +++-
 src/include/access/xlogreader.h                |  35 +++-
 src/include/access/xlogutils.h                 |   2 +
 5 files changed, 323 insertions(+), 35 deletions(-)

diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c
index fcb0872..5899f44 100644
--- a/src/backend/access/transam/xlogreader.c
+++ b/src/backend/access/transam/xlogreader.c
@@ -10,6 +10,9 @@
  *
  * NOTES
  *		See xlogreader.h for more notes on this facility.
+ *
+ * 		The xlogreader is compiled as both front-end and backend code so
+ * 		it may not use elog, server-defined static variables, etc.
  *-------------------------------------------------------------------------
  */
 
@@ -116,6 +119,9 @@ XLogReaderAllocate(XLogPageReadCB pagereadfunc, void *private_data)
 		return NULL;
 	}
 
+	/* Will be loaded on first read */
+	state->timelineHistory = NULL;
+
 	return state;
 }
 
@@ -135,6 +141,13 @@ XLogReaderFree(XLogReaderState *state)
 	pfree(state->errormsg_buf);
 	if (state->readRecordBuf)
 		pfree(state->readRecordBuf);
+#ifdef FRONTEND
+	/* FE code doesn't use this and we can't list_free_deep on FE */
+	Assert(state->timelineHistory == NULL);
+#else
+	if (state->timelineHistory)
+		list_free_deep(state->timelineHistory);
+#endif
 	pfree(state->readBuf);
 	pfree(state);
 }
@@ -208,9 +221,11 @@ XLogReadRecord(XLogReaderState *state, XLogRecPtr RecPtr, char **errormsg)
 
 	if (RecPtr == InvalidXLogRecPtr)
 	{
+		/* No explicit start point, read the record after the one we just read */
 		RecPtr = state->EndRecPtr;
 
 		if (state->ReadRecPtr == InvalidXLogRecPtr)
+			/* allow readPageTLI to go backward */
 			randAccess = true;
 
 		/*
@@ -223,6 +238,8 @@ XLogReadRecord(XLogReaderState *state, XLogRecPtr RecPtr, char **errormsg)
 	else
 	{
 		/*
+		 * Caller supplied a position to start at.
+		 *
 		 * In this case, the passed-in record pointer should already be
 		 * pointing to a valid record starting position.
 		 */
@@ -309,8 +326,9 @@ XLogReadRecord(XLogReaderState *state, XLogRecPtr RecPtr, char **errormsg)
 		/* XXX: more validation should be done here */
 		if (total_len < SizeOfXLogRecord)
 		{
-			report_invalid_record(state, "invalid record length at %X/%X",
-								  (uint32) (RecPtr >> 32), (uint32) RecPtr);
+			report_invalid_record(state, "invalid record length at %X/%X: wanted %lu, got %u",
+								  (uint32) (RecPtr >> 32), (uint32) RecPtr,
+								  SizeOfXLogRecord, total_len);
 			goto err;
 		}
 		gotheader = false;
@@ -466,9 +484,7 @@ err:
 	 * Invalidate the xlog page we've cached. We might read from a different
 	 * source after failure.
 	 */
-	state->readSegNo = 0;
-	state->readOff = 0;
-	state->readLen = 0;
+	XLogReaderInvalCache(state);
 
 	if (state->errormsg_buf[0] != '\0')
 		*errormsg = state->errormsg_buf;
@@ -599,9 +615,9 @@ ValidXLogRecordHeader(XLogReaderState *state, XLogRecPtr RecPtr,
 {
 	if (record->xl_tot_len < SizeOfXLogRecord)
 	{
-		report_invalid_record(state,
-							  "invalid record length at %X/%X",
-							  (uint32) (RecPtr >> 32), (uint32) RecPtr);
+		report_invalid_record(state, "invalid record length at %X/%X: wanted %lu, got %u",
+							  (uint32) (RecPtr >> 32), (uint32) RecPtr,
+							  SizeOfXLogRecord, record->xl_tot_len);
 		return false;
 	}
 	if (record->xl_rmid > RM_MAX_ID)
@@ -1337,3 +1353,14 @@ RestoreBlockImage(XLogReaderState *record, uint8 block_id, char *page)
 
 	return true;
 }
+
+/*
+ * Invalidate the xlog reader's cached page to force a re-read
+ */
+void
+XLogReaderInvalCache(XLogReaderState *state)
+{
+	state->readSegNo = 0;
+	state->readOff = 0;
+	state->readLen = 0;
+}
diff --git a/src/backend/access/transam/xlogutils.c b/src/backend/access/transam/xlogutils.c
index 444e218..21f2030 100644
--- a/src/backend/access/transam/xlogutils.c
+++ b/src/backend/access/transam/xlogutils.c
@@ -7,6 +7,9 @@
  * This file contains support routines that are used by XLOG replay functions.
  * None of this code is used during normal system operation.
  *
+ * Unlike xlogreader.c this is only compiled for the backend so it may use
+ * elog, etc.
+ *
  *
  * Portions Copyright (c) 1996-2016, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
@@ -21,6 +24,7 @@
 
 #include "miscadmin.h"
 
+#include "access/timeline.h"
 #include "access/xlog.h"
 #include "access/xlog_internal.h"
 #include "access/xlogutils.h"
@@ -651,6 +655,8 @@ XLogRead(char *buf, TimeLineID tli, XLogRecPtr startptr, Size count)
 	static int	sendFile = -1;
 	static XLogSegNo sendSegNo = 0;
 	static uint32 sendOff = 0;
+	/* So we notice if asked for the same seg on a new tli: */
+	static TimeLineID lastTLI = 0;
 
 	p = buf;
 	recptr = startptr;
@@ -664,11 +670,11 @@ XLogRead(char *buf, TimeLineID tli, XLogRecPtr startptr, Size count)
 
 		startoff = recptr % XLogSegSize;
 
-		if (sendFile < 0 || !XLByteInSeg(recptr, sendSegNo))
+		/* Do we need to switch to a new xlog segment? */
+		if (sendFile < 0 || !XLByteInSeg(recptr, sendSegNo) || lastTLI != tli)
 		{
 			char		path[MAXPGPATH];
 
-			/* Switch to another logfile segment */
 			if (sendFile >= 0)
 				close(sendFile);
 
@@ -692,6 +698,7 @@ XLogRead(char *buf, TimeLineID tli, XLogRecPtr startptr, Size count)
 									path)));
 			}
 			sendOff = 0;
+			lastTLI = tli;
 		}
 
 		/* Need to seek in the file? */
@@ -759,28 +766,62 @@ read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr,
 	int			count;
 
 	loc = targetPagePtr + reqLen;
+
+	/* Make sure enough xlog is available... */
 	while (1)
 	{
 		/*
-		 * TODO: we're going to have to do something more intelligent about
-		 * timelines on standbys. Use readTimeLineHistory() and
-		 * tliOfPointInHistory() to get the proper LSN? For now we'll catch
-		 * that case earlier, but the code and TODO is left in here for when
-		 * that changes.
+		 * Check which timeline to get the record from.
+		 *
+		 * We have to do it after each loop because if we're in
+		 * recovery as a cascading standby the current timeline
+		 * might've become historical.
 		 */
-		if (!RecoveryInProgress())
+		XLogReadDetermineTimeline(state);
+
+		if (state->currTLI == ThisTimeLineID)
 		{
-			*pageTLI = ThisTimeLineID;
-			flushptr = GetFlushRecPtr();
+			/*
+			 * We're reading from the current timeline so we might
+			 * have to wait for the desired record to be generated
+			 * (or, for a standby, received & replayed)
+			 */
+			if (!RecoveryInProgress())
+			{
+				*pageTLI = ThisTimeLineID;
+				flushptr = GetFlushRecPtr();
+			}
+			else
+				flushptr = GetXLogReplayRecPtr(pageTLI);
+
+			if (loc <= flushptr)
+				break;
+
+			CHECK_FOR_INTERRUPTS();
+			pg_usleep(1000L);
 		}
 		else
-			flushptr = GetXLogReplayRecPtr(pageTLI);
-
-		if (loc <= flushptr)
+		{
+			/*
+			 * We're on a historical timeline, limit reading to the
+			 * switch point where we moved to the next timeline.
+			 */
+			flushptr = state->currTLIValidUntil;
+
+			/*
+			 * FIXME: Setting pageTLI to the TLI the *record* we
+			 * want is on can be slightly wrong; the page might
+			 * begin on an older timeline if it contains a timeline
+			 * switch, since its xlog segment will've been copied
+			 * from the prior timeline. We should really read the
+			 * page header. It's pretty harmless though as nothing
+			 * cares so long as the timeline doesn't go backwards.
+			 */
+			*pageTLI = state->currTLI;
+
+			/* No need to wait on a historical timeline */
 			break;
-
-		CHECK_FOR_INTERRUPTS();
-		pg_usleep(1000L);
+		}
 	}
 
 	/* more than one block available */
@@ -793,7 +834,172 @@ read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr,
 	else
 		count = flushptr - targetPagePtr;
 
-	XLogRead(cur_page, *pageTLI, targetPagePtr, XLOG_BLCKSZ);
+	XLogRead(cur_page, *pageTLI, targetPagePtr, count);
 
 	return count;
 }
+
+/*
+ * Figure out what timeline to look on for the record the xlogreader
+ * is being asked asked to read, in currRecPtr. This may be used
+ * to determine which xlog segment file to open, etc.
+ *
+ * It depends on:
+ *
+ * - Whether we're reading a record immediately following one we read
+ *   before or doing a random read. We can only use the cached
+ *   timeline info if we're reading sequentially.
+ *
+ * - Whether the timeline of the prior record read was historical or
+ *   the current timeline and, if historical, on where it's valid up
+ *   to. On a historical timeline we need to avoid reading past the
+ *   timeline switch point. The records after it are probably invalid,
+ *   but worse, they might be valid but *different*.
+ *
+ * - If the current timeline became historical since the last record
+ *   we read. We need to make sure we don't read past the switch
+ *   point.
+ *
+ * None of this has any effect unless callbacks use currTLI to
+ * determine which timeline to read from and optionally use the
+ * validity limit to avoid reading past the valid end of a page.
+ *
+ * We need to switch to an xlog segment from the new timeline
+ * eagerly when on a historical timeline, as soon as we reach the
+ * start of the xlog segment containing the timeline switch.  The
+ * server copied the segment to the new timeline so all the data up
+ * to the switch point is the same but there's no guarantee the old
+ * segment will still exist. It may have been deleted or renamed
+ * with a .partial suffix so we can't necessarily keep reading from
+ * the old TLI even though tliSwitchPoint says it's OK.
+ *
+ * An xlog segment may contain data from an older timeline
+ * if it was copied during a timeline switch. Callers may NOT assume
+ * that currTLI is the timeline that will be in a given page's
+ * xlp_tli; the page may begin on older timeline or we might be
+ * reading from historical timeline data on a segment that's
+ * been copied to a new timeline.
+ */
+void
+XLogReadDetermineTimeline(XLogReaderState *state)
+{
+	if (state->timelineHistory == NULL)
+		state->timelineHistory = readTimeLineHistory(ThisTimeLineID);
+
+	if (state->currRecPtr != state->EndRecPtr)
+	{
+		/*
+		 * Not reading the immediately following record so
+		 * invalidate cached timeline info.
+		 */
+		state->currTLI = 0;
+		state->currTLIValidUntil = InvalidXLogRecPtr;
+	}
+
+	if (state->currTLIValidUntil == InvalidXLogRecPtr &&
+		state->currTLI != ThisTimeLineID &&
+		state->currTLI != 0)
+	{
+		/*
+		 * We were reading what was the current timeline but it became
+		 * historical. Either we were replaying as a replica and got
+		 * promoted or we're replaying as a cascading replica from a
+		 * parent that got promoted.
+		 *
+		 * Force a re-read of the timeline history.
+		 */
+		list_free_deep(state->timelineHistory);
+		state->timelineHistory = readTimeLineHistory(ThisTimeLineID);
+
+		elog(DEBUG2, "timeline %u became historical during decoding",
+				state->currTLI);
+
+		/* then invalidate the timeline info so we read again */
+		state->currTLI = 0;
+		state->currTLIValidUntil = InvalidXLogRecPtr;
+	}
+
+	if (state->currRecPtr == state->EndRecPtr &&
+		state->currTLI != 0 &&
+		state->currTLIValidUntil != InvalidXLogRecPtr &&
+		state->currRecPtr >= state->currTLIValidUntil)
+	{
+		/*
+		 * We're reading the immedately following record but we're
+		 * at a timeline boundary (or on a segment containing one)
+		 * and must read the next record from the new TLI.
+		 */
+		elog(DEBUG2, "Requested record %X/%X is on segment containing end of TLI %u "
+				"valid until %X/%X, switching to next timeline",
+				(uint32)(state->currRecPtr >> 32),
+				(uint32)state->currRecPtr,
+				state->currTLI,
+				(uint32)(state->currTLIValidUntil >> 32),
+				(uint32)(state->currTLIValidUntil));
+
+		/* Invalidate TLI info so we look up the next TLI */
+		state->currTLI = 0;
+		state->currTLIValidUntil = InvalidXLogRecPtr;
+	}
+
+	if (state->currTLI == 0)
+	{
+		/*
+		 * Something changed. We're not reading the record immediately
+		 * after the one we just read, the previous record was at
+		 * timeline boundary or we didn't yet determine the timeline
+		 * to read from.
+		 *
+		 * Work out what timeline this record is on. We might read
+		 * it from the segment on this TLI or, if the segment
+		 * contains newer timelines, the copy from a newer TLI.
+		 */
+		state->currTLI = tliOfPointInHistory(state->currRecPtr,
+				state->timelineHistory);
+
+		/*
+		 * Look for the most recent timeline that's on the same xlog
+		 * segment as this record, since that's the only one we can
+		 * assume is still readable.
+		 */
+		while (state->currTLI != ThisTimeLineID &&
+			   state->currTLIValidUntil == InvalidXLogRecPtr)
+		{
+			XLogRecPtr	tliSwitch;
+			TimeLineID	nextTLI;
+
+			tliSwitch = tliSwitchPoint(state->currTLI, state->timelineHistory,
+					&nextTLI);
+
+			state->currTLIValidUntil = ((tliSwitch / XLogSegSize) * XLogSegSize);
+
+			if (state->currRecPtr >= state->currTLIValidUntil)
+			{
+				/*
+				 * The new currTLI ends on this WAL segment so
+				 * check the next TLI to see if it's the last
+				 * one on the segment.
+				 *
+				 * If that's the current TLI we'll stop
+				 * searching.
+				 */
+				state->currTLI = nextTLI;
+				state->currTLIValidUntil = InvalidXLogRecPtr;
+			}
+		}
+
+		/*
+		 * We're now either reading from the first xlog seg in the
+		 * current server's timeline or the most recent historical
+		 * timeline that exists on the target segment.
+		 */
+		elog(DEBUG2, "XLog read ptr %X/%X is on seg with tli %u valid until %X/%X, server current tli is %u",
+				(uint32)(state->currRecPtr >> 32),
+				(uint32)state->currRecPtr,
+				state->currTLI,
+				(uint32)(state->currTLIValidUntil >> 32),
+				(uint32)(state->currTLIValidUntil),
+				ThisTimeLineID);
+	}
+}
+
diff --git a/src/backend/replication/logical/logicalfuncs.c b/src/backend/replication/logical/logicalfuncs.c
index f789fc1..f29fca3 100644
--- a/src/backend/replication/logical/logicalfuncs.c
+++ b/src/backend/replication/logical/logicalfuncs.c
@@ -231,12 +231,6 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin
 	rsinfo->setResult = p->tupstore;
 	rsinfo->setDesc = p->tupdesc;
 
-	/* compute the current end-of-wal */
-	if (!RecoveryInProgress())
-		end_of_wal = GetFlushRecPtr();
-	else
-		end_of_wal = GetXLogReplayRecPtr(NULL);
-
 	ReplicationSlotAcquire(NameStr(*name));
 
 	PG_TRY();
@@ -263,6 +257,14 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin
 
 		ctx->output_writer_private = p;
 
+		/*
+		 * We start reading xlog from the restart lsn, even though in
+		 * CreateDecodingContext we set the snapshot builder up using the
+		 * slot's candidate_restart_lsn. This means we might read xlog we don't
+		 * actually decode rows from, but the snapshot builder might need it to
+		 * get to a consistent point. The point we start returning data to
+		 * *users* at is the candidate restart lsn from the decoding context.
+		 */
 		startptr = MyReplicationSlot->data.restart_lsn;
 
 		CurrentResourceOwner = ResourceOwnerCreate(CurrentResourceOwner, "logical decoding");
@@ -270,8 +272,14 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin
 		/* invalidate non-timetravel entries */
 		InvalidateSystemCaches();
 
+		if (!RecoveryInProgress())
+			end_of_wal = GetFlushRecPtr();
+		else
+			end_of_wal = GetXLogReplayRecPtr(NULL);
+
+		/* Decode until we run out of records */
 		while ((startptr != InvalidXLogRecPtr && startptr < end_of_wal) ||
-			 (ctx->reader->EndRecPtr && ctx->reader->EndRecPtr < end_of_wal))
+			 (ctx->reader->EndRecPtr != InvalidXLogRecPtr && ctx->reader->EndRecPtr < end_of_wal))
 		{
 			XLogRecord *record;
 			char	   *errm = NULL;
@@ -280,6 +288,10 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin
 			if (errm)
 				elog(ERROR, "%s", errm);
 
+			/*
+			 * Now that we've set up the xlog reader state subsequent calls
+			 * pass InvalidXLogRecPtr to say "continue from last record"
+			 */
 			startptr = InvalidXLogRecPtr;
 
 			/*
@@ -299,6 +311,18 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin
 			CHECK_FOR_INTERRUPTS();
 		}
 
+		/* Make sure timeline lookups use the start of the next record */
+		startptr = ctx->reader->EndRecPtr;
+
+		/*
+		 * The XLogReader will read a page past the valid end of WAL
+		 * because it doesn't know about timelines. When we switch
+		 * timelines and ask it for the first page on the new timeline it
+		 * will think it has it cached, but it'll have the old partial
+		 * page and say it can't find the next record. So flush the cache.
+		 */
+		XLogReaderInvalCache(ctx->reader);
+
 		tuplestore_donestoring(tupstore);
 
 		CurrentResourceOwner = old_resowner;
diff --git a/src/include/access/xlogreader.h b/src/include/access/xlogreader.h
index 7553cc4..20e4bca 100644
--- a/src/include/access/xlogreader.h
+++ b/src/include/access/xlogreader.h
@@ -20,12 +20,16 @@
  *		with the XLogRec* macros and functions. You can also decode a
  *		record that's already constructed in memory, without reading from
  *		disk, by calling the DecodeXLogRecord() function.
+ *
+ * 		The xlogreader is compiled as both front-end and backend code so
+ * 		it may not use elog, server-defined static variables, etc.
  *-------------------------------------------------------------------------
  */
 #ifndef XLOGREADER_H
 #define XLOGREADER_H
 
 #include "access/xlogrecord.h"
+#include "nodes/pg_list.h"
 
 typedef struct XLogReaderState XLogReaderState;
 
@@ -139,26 +143,48 @@ struct XLogReaderState
 	 * ----------------------------------------
 	 */
 
-	/* Buffer for currently read page (XLOG_BLCKSZ bytes) */
+	/*
+	 * Buffer for currently read page (XLOG_BLCKSZ bytes, valid up to
+	 * at least readLen bytes)
+	 */
 	char	   *readBuf;
 
-	/* last read segment, segment offset, read length, TLI */
+	/*
+	 * last read segment, segment offset, read length, TLI for
+	 * data currently in readBuf.
+	 */
 	XLogSegNo	readSegNo;
 	uint32		readOff;
 	uint32		readLen;
 	TimeLineID	readPageTLI;
 
-	/* beginning of last page read, and its TLI  */
+	/*
+	 * beginning of prior page read, and its TLI. Doesn't
+	 * necessarily correspond to what's in readBuf, used for
+	 * timeline sanity checks.
+	 */
 	XLogRecPtr	latestPagePtr;
 	TimeLineID	latestPageTLI;
 
 	/* beginning of the WAL record being read. */
 	XLogRecPtr	currRecPtr;
+	/* timeline to read it from, 0 if a lookup is required */
+	TimeLineID  currTLI;
+	/*
+	 * Pointer to the end of the last whole segment on the timeline in currTLI
+	 * if it's historical or InvalidXLogRecPtr if currTLI is the current
+	 * timeline. This is *not* the tliSwitchPoint but it's guaranteed safe
+	 * to read up to this point from currTLI.
+	 */
+	XLogRecPtr	currTLIValidUntil;
 
 	/* Buffer for current ReadRecord result (expandable) */
 	char	   *readRecordBuf;
 	uint32		readRecordBufSize;
 
+	/* cached timeline history */
+	List	   *timelineHistory;
+
 	/* Buffer to hold error message */
 	char	   *errormsg_buf;
 };
@@ -174,6 +200,9 @@ extern void XLogReaderFree(XLogReaderState *state);
 extern struct XLogRecord *XLogReadRecord(XLogReaderState *state,
 			   XLogRecPtr recptr, char **errormsg);
 
+/* Flush any cached page */
+extern void XLogReaderInvalCache(XLogReaderState *state);
+
 #ifdef FRONTEND
 extern XLogRecPtr XLogFindNextRecord(XLogReaderState *state, XLogRecPtr RecPtr);
 #endif   /* FRONTEND */
diff --git a/src/include/access/xlogutils.h b/src/include/access/xlogutils.h
index 1b9abce..86df8cf 100644
--- a/src/include/access/xlogutils.h
+++ b/src/include/access/xlogutils.h
@@ -50,4 +50,6 @@ extern void FreeFakeRelcacheEntry(Relation fakerel);
 extern int read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr,
 	int reqLen, XLogRecPtr targetRecPtr, char *cur_page, TimeLineID *pageTLI);
 
+extern void XLogReadDetermineTimeline(XLogReaderState *state);
+
 #endif
-- 
2.1.0

From 11041cbd99e194028c756bd597764f1e21a26afe Mon Sep 17 00:00:00 2001
From: Craig Ringer <cr...@2ndquadrant.com>
Date: Tue, 1 Mar 2016 20:57:17 +0800
Subject: [PATCH 2/2] Tests for logical decoding timeline following

Demonstrate that timeline following for logical decoding works
independently of the proposed failover slot machinery by using the
client to sync slot state from master to replica.

See the README in src/test/modules/decoding_failover
and comments in src/test/recovery/t/006_logical_decoding_timelines.pl
---
 src/test/modules/Makefile                          |   1 +
 src/test/modules/decoding_failover/.gitignore      |   3 +
 src/test/modules/decoding_failover/Makefile        |  22 ++
 src/test/modules/decoding_failover/README          |  19 ++
 .../decoding_failover/decoding_failover--1.0.sql   |  16 ++
 .../modules/decoding_failover/decoding_failover.c  | 124 ++++++++++
 .../decoding_failover/decoding_failover.conf       |   2 +
 .../decoding_failover/decoding_failover.control    |   5 +
 .../decoding_failover/expected/load_extension.out  |  19 ++
 .../decoding_failover/sql/load_extension.sql       |   7 +
 src/test/recovery/Makefile                         |   2 +
 .../recovery/t/006_logical_decoding_timelines.pl   | 271 +++++++++++++++++++++
 12 files changed, 491 insertions(+)
 create mode 100644 src/test/modules/decoding_failover/.gitignore
 create mode 100644 src/test/modules/decoding_failover/Makefile
 create mode 100644 src/test/modules/decoding_failover/README
 create mode 100644 src/test/modules/decoding_failover/decoding_failover--1.0.sql
 create mode 100644 src/test/modules/decoding_failover/decoding_failover.c
 create mode 100644 src/test/modules/decoding_failover/decoding_failover.conf
 create mode 100644 src/test/modules/decoding_failover/decoding_failover.control
 create mode 100644 src/test/modules/decoding_failover/expected/load_extension.out
 create mode 100644 src/test/modules/decoding_failover/sql/load_extension.sql
 create mode 100644 src/test/recovery/t/006_logical_decoding_timelines.pl

diff --git a/src/test/modules/Makefile b/src/test/modules/Makefile
index 6167ec1..bbaf94f 100644
--- a/src/test/modules/Makefile
+++ b/src/test/modules/Makefile
@@ -7,6 +7,7 @@ include $(top_builddir)/src/Makefile.global
 SUBDIRS = \
 		  brin \
 		  commit_ts \
+		  decoding_failover \
 		  dummy_seclabel \
 		  test_ddl_deparse \
 		  test_extensions \
diff --git a/src/test/modules/decoding_failover/.gitignore b/src/test/modules/decoding_failover/.gitignore
new file mode 100644
index 0000000..543c50d
--- /dev/null
+++ b/src/test/modules/decoding_failover/.gitignore
@@ -0,0 +1,3 @@
+results/
+tmp_check/
+log/
diff --git a/src/test/modules/decoding_failover/Makefile b/src/test/modules/decoding_failover/Makefile
new file mode 100644
index 0000000..97c2c28
--- /dev/null
+++ b/src/test/modules/decoding_failover/Makefile
@@ -0,0 +1,22 @@
+# src/test/modules/worker_spi/Makefile
+
+MODULES = decoding_failover
+PGFILEDESC = "decoding_failover - test utility for logical decoding"
+
+EXTENSION = decoding_failover
+DATA = decoding_failover--1.0.sql
+
+EXTRA_INSTALL=contrib/test_decoding
+REGRESS=load_extension
+REGRESS_OPTS = --temp-config=$(top_srcdir)/src/test/modules/decoding_failover/decoding_failover.conf
+
+ifdef USE_PGXS
+PG_CONFIG = pg_config
+PGXS := $(shell $(PG_CONFIG) --pgxs)
+include $(PGXS)
+else
+subdir = src/test/modules/decoding_failover
+top_builddir = ../../../..
+include $(top_builddir)/src/Makefile.global
+include $(top_srcdir)/contrib/contrib-global.mk
+endif
diff --git a/src/test/modules/decoding_failover/README b/src/test/modules/decoding_failover/README
new file mode 100644
index 0000000..585f02f
--- /dev/null
+++ b/src/test/modules/decoding_failover/README
@@ -0,0 +1,19 @@
+A test module for logical decoding failover and timeline following.
+
+This module provides a minimal way to maintain logical slots on replicas
+that mirror the state on the master. It doesn't make decoding possible,
+just tracking slot state so that a decoding client that's using the master
+can follow a physical failover to the standby. The master doesn't know
+about the slots on the standby, they're synced by a client that connects
+to both.
+
+This is intentionally not part of the test_decoding module because that's meant
+to serve as example code, where this module exercises internal server features
+by unsafely exposing internal state to SQL. It's not the right way to do
+failover, it's just a simple way to test it from the perl TAP framework to
+prove the feature works.
+
+In a practical implementation of this approach a bgworker on the master would
+monitor slot positions and relay them to a bgworker on the standby that applies
+the position updates without exposing slot internals to SQL. That's too complex
+for this test framework though.
diff --git a/src/test/modules/decoding_failover/decoding_failover--1.0.sql b/src/test/modules/decoding_failover/decoding_failover--1.0.sql
new file mode 100644
index 0000000..078b65e
--- /dev/null
+++ b/src/test/modules/decoding_failover/decoding_failover--1.0.sql
@@ -0,0 +1,16 @@
+-- complain if script is sourced in psql, rather than via CREATE EXTENSION
+\echo Use "CREATE EXTENSION decoding_failover" to load this file. \quit
+
+CREATE OR REPLACE FUNCTION decoding_failover_create_logical_slot(slot_name text, plugin text)
+RETURNS void
+LANGUAGE c AS 'MODULE_PATHNAME';
+
+COMMENT ON FUNCTION decoding_failover_create_logical_slot(text, text)
+IS 'Create a logical slot at a particular lsn and xid. Do not use in production servers, it is not safe. The slot is created with an invalid xmin and lsn.';
+
+CREATE OR REPLACE FUNCTION decoding_failover_advance_logical_slot(slot_name text, new_xmin bigint, new_catalog_xmin bigint, new_restart_lsn pg_lsn, new_confirmed_lsn pg_lsn)
+RETURNS void
+LANGUAGE c AS 'MODULE_PATHNAME';
+
+COMMENT ON FUNCTION decoding_failover_advance_logical_slot(text, bigint, bigint, pg_lsn, pg_lsn)
+IS 'Advance a logical slot directly. Do not use this in production servers, it is not safe.';
diff --git a/src/test/modules/decoding_failover/decoding_failover.c b/src/test/modules/decoding_failover/decoding_failover.c
new file mode 100644
index 0000000..bab0f3b
--- /dev/null
+++ b/src/test/modules/decoding_failover/decoding_failover.c
@@ -0,0 +1,124 @@
+#include "postgres.h"
+
+#include "access/transam.h"
+
+#include "replication/slot.h"
+
+#include "utils/builtins.h"
+#include "utils/pg_lsn.h"
+
+#include "fmgr.h"
+#include "miscadmin.h"
+
+PG_MODULE_MAGIC;
+
+Datum decoding_failover_create_logical_slot(PG_FUNCTION_ARGS);
+PG_FUNCTION_INFO_V1(decoding_failover_create_logical_slot);
+
+Datum decoding_failover_advance_logical_slot(PG_FUNCTION_ARGS);
+PG_FUNCTION_INFO_V1(decoding_failover_advance_logical_slot);
+
+static void clear_slot_transient_state(void);
+
+/*
+ * Create a new logical slot, with invalid LSN and xid, directly. This does not
+ * use the snapshot builder or logical decoding machinery. It's only intended
+ * for creating a slot on a replica that mirrors the state of a slot on an
+ * upstream master.
+ *
+ * You should immediately decoding_failover_advance_logical_slot(...) it
+ * after creation.
+ */
+Datum
+decoding_failover_create_logical_slot(PG_FUNCTION_ARGS)
+{
+	char* slotname = text_to_cstring(PG_GETARG_TEXT_P(0));
+	char* plugin = text_to_cstring(PG_GETARG_TEXT_P(1));
+
+	CheckSlotRequirements();
+
+	ReplicationSlotCreate(slotname, true, RS_PERSISTENT);
+
+	/* register the plugin name with the slot */
+	StrNCpy(NameStr(MyReplicationSlot->data.plugin), plugin, NAMEDATALEN);
+
+	/*
+	 * Initialize persistent state to placeholders to be set
+	 * by decoding_failover_advance_logical_slot .
+	 */
+	MyReplicationSlot->data.xmin = InvalidTransactionId;
+	MyReplicationSlot->data.catalog_xmin = InvalidTransactionId;
+	MyReplicationSlot->data.restart_lsn = InvalidXLogRecPtr;
+	MyReplicationSlot->data.confirmed_flush = InvalidXLogRecPtr;
+
+	clear_slot_transient_state();
+
+	ReplicationSlotRelease();
+
+	PG_RETURN_VOID();
+}
+
+/*
+ * Set the state of a slot.
+ *
+ * This doesn't maintain the non-persistent state at all,
+ * but since the slot isn't in use that's OK.
+ *
+ * There's intentionally no check to prevent slots going backwards
+ * because they can actually go backwards if the master crashes when
+ * it hasn't yet flushed slot state to disk then we copy the older
+ * slot state after recovery.
+ *
+ * There's no checking done for xmin or catalog xmin either, since
+ * we can't really do anything useful that accounts for xid wrap-around.
+ */
+Datum
+decoding_failover_advance_logical_slot(PG_FUNCTION_ARGS)
+{
+	char* slotname = text_to_cstring(PG_GETARG_TEXT_P(0));
+	TransactionId new_xmin = (TransactionId)PG_GETARG_INT64(1);
+	TransactionId new_catalog_xmin = (TransactionId)PG_GETARG_INT64(2);
+	XLogRecPtr restart_lsn = PG_GETARG_LSN(3);
+	XLogRecPtr confirmed_lsn = PG_GETARG_LSN(4);
+
+	CheckSlotRequirements();
+
+	ReplicationSlotAcquire(slotname);
+
+	if (MyReplicationSlot->data.database != MyDatabaseId)
+		elog(ERROR, "Trying to update a slot on a different database");
+
+	MyReplicationSlot->data.xmin = new_xmin;
+	MyReplicationSlot->data.catalog_xmin = new_catalog_xmin;
+	MyReplicationSlot->data.restart_lsn = restart_lsn;
+	MyReplicationSlot->data.confirmed_flush = confirmed_lsn;
+
+	clear_slot_transient_state();
+
+	ReplicationSlotMarkDirty();
+	ReplicationSlotSave();
+	ReplicationSlotRelease();
+
+	ReplicationSlotsComputeRequiredXmin(false);
+	ReplicationSlotsComputeRequiredLSN();
+
+	PG_RETURN_VOID();
+}
+
+static void
+clear_slot_transient_state(void)
+{
+	Assert(MyReplicationSlot != NULL);
+
+	/*
+	 * Make sure the slot state is the same as if it were newly
+	 * loaded from disk on recovery.
+	 */
+	MyReplicationSlot->effective_xmin = MyReplicationSlot->data.xmin;
+	MyReplicationSlot->effective_catalog_xmin = MyReplicationSlot->data.catalog_xmin;
+
+	MyReplicationSlot->candidate_catalog_xmin = InvalidTransactionId;
+	MyReplicationSlot->candidate_xmin_lsn = InvalidXLogRecPtr;
+	MyReplicationSlot->candidate_restart_lsn = InvalidXLogRecPtr;
+	MyReplicationSlot->candidate_restart_valid = InvalidXLogRecPtr;
+}
diff --git a/src/test/modules/decoding_failover/decoding_failover.conf b/src/test/modules/decoding_failover/decoding_failover.conf
new file mode 100644
index 0000000..56b46d7
--- /dev/null
+++ b/src/test/modules/decoding_failover/decoding_failover.conf
@@ -0,0 +1,2 @@
+max_replication_slots=2
+wal_level=logical
diff --git a/src/test/modules/decoding_failover/decoding_failover.control b/src/test/modules/decoding_failover/decoding_failover.control
new file mode 100644
index 0000000..92329b3
--- /dev/null
+++ b/src/test/modules/decoding_failover/decoding_failover.control
@@ -0,0 +1,5 @@
+# worker_spi extension
+comment = 'Logical decoding failover tests'
+default_version = '1.0'
+module_pathname = '$libdir/decoding_failover'
+relocatable = true
diff --git a/src/test/modules/decoding_failover/expected/load_extension.out b/src/test/modules/decoding_failover/expected/load_extension.out
new file mode 100644
index 0000000..fe3c54d
--- /dev/null
+++ b/src/test/modules/decoding_failover/expected/load_extension.out
@@ -0,0 +1,19 @@
+CREATE EXTENSION decoding_failover;
+SELECT decoding_failover_create_logical_slot('test_slot', 'test_decoding');
+ decoding_failover_create_logical_slot 
+---------------------------------------
+ 
+(1 row)
+
+SELECT decoding_failover_advance_logical_slot('test_slot', txid_current(), txid_current(), pg_current_xlog_location(), pg_current_xlog_location());
+ decoding_failover_advance_logical_slot 
+----------------------------------------
+ 
+(1 row)
+
+SELECT pg_drop_replication_slot('test_slot');
+ pg_drop_replication_slot 
+--------------------------
+ 
+(1 row)
+
diff --git a/src/test/modules/decoding_failover/sql/load_extension.sql b/src/test/modules/decoding_failover/sql/load_extension.sql
new file mode 100644
index 0000000..4ea9f77
--- /dev/null
+++ b/src/test/modules/decoding_failover/sql/load_extension.sql
@@ -0,0 +1,7 @@
+CREATE EXTENSION decoding_failover;
+
+SELECT decoding_failover_create_logical_slot('test_slot', 'test_decoding');
+
+SELECT decoding_failover_advance_logical_slot('test_slot', txid_current(), txid_current(), pg_current_xlog_location(), pg_current_xlog_location());
+
+SELECT pg_drop_replication_slot('test_slot');
diff --git a/src/test/recovery/Makefile b/src/test/recovery/Makefile
index 9290719..9710370 100644
--- a/src/test/recovery/Makefile
+++ b/src/test/recovery/Makefile
@@ -9,6 +9,8 @@
 #
 #-------------------------------------------------------------------------
 
+EXTRA_INSTALL=contrib/test_decoding src/test/modules/decoding_failover
+
 subdir = src/test/recovery
 top_builddir = ../../..
 include $(top_builddir)/src/Makefile.global
diff --git a/src/test/recovery/t/006_logical_decoding_timelines.pl b/src/test/recovery/t/006_logical_decoding_timelines.pl
new file mode 100644
index 0000000..7268a45
--- /dev/null
+++ b/src/test/recovery/t/006_logical_decoding_timelines.pl
@@ -0,0 +1,271 @@
+# Demonstrate that logical can follow timeline switches.
+#
+# Logical replication slots can follow timeline switches but it's
+# normally not possible to have a logical slot on a replica where
+# promotion and a timeline switch can occur. The only ways
+# we can create that circumstance are:
+#
+# * By doing a filesystem-level copy of the DB, since pg_basebackup
+#   excludes pg_replslot but we can copy it directly; or
+#
+# * by creating a slot directly at the C level on the replica and
+#   advancing it as we go using the low level APIs. It can't be done
+#   from SQL since logical decoding isn't allowed on replicas.
+#
+# This module uses the first approach to show that timeline following
+# on a logical slot works.
+#
+use strict;
+use warnings;
+use PostgresNode;
+use TestLib;
+use Test::More tests => 20;
+use RecursiveCopy;
+use File::Copy;
+
+my ($stdout, $stderr, $ret);
+
+# Initialize master node
+my $node_master = get_new_node('master');
+$node_master->init(allows_streaming => 1, has_archiving => 1);
+$node_master->append_conf('postgresql.conf', "wal_level = 'logical'\n");
+$node_master->append_conf('postgresql.conf', "max_replication_slots = 2\n");
+$node_master->append_conf('postgresql.conf', "max_wal_senders = 2\n");
+$node_master->append_conf('postgresql.conf', "log_min_messages = 'debug2'\n");
+$node_master->dump_info;
+$node_master->start;
+
+diag "Testing logical timeline following with a filesystem-level copy";
+
+$node_master->safe_psql('postgres',
+"SELECT pg_create_logical_replication_slot('before_basebackup', 'test_decoding');"
+);
+$node_master->safe_psql('postgres', "CREATE TABLE decoding(blah text);");
+$node_master->safe_psql('postgres',
+	"INSERT INTO decoding(blah) VALUES ('beforebb');");
+$node_master->safe_psql('postgres', 'CHECKPOINT;');
+
+my $backup_name = 'b1';
+$node_master->backup_fs_hot($backup_name);
+
+my $node_replica = get_new_node('replica');
+$node_replica->init_from_backup(
+	$node_master, $backup_name,
+	has_streaming => 1,
+	has_restoring => 1);
+$node_replica->start;
+
+$node_master->safe_psql('postgres',
+"SELECT pg_create_logical_replication_slot('after_basebackup', 'test_decoding');"
+);
+$node_master->safe_psql('postgres',
+	"INSERT INTO decoding(blah) VALUES ('afterbb');");
+$node_master->safe_psql('postgres', 'CHECKPOINT;');
+
+# Verify that only the before base_backup slot is on the replica
+$stdout = $node_replica->safe_psql('postgres', 'SELECT slot_name FROM pg_replication_slots ORDER BY slot_name');
+is($stdout, 'before_basebackup', 'Expected to find only slot before_basebackup on replica');
+
+# Boom, crash
+$node_master->stop('immediate');
+
+$node_replica->promote;
+$node_replica->poll_query_until('postgres',
+	"SELECT NOT pg_is_in_recovery();");
+
+$node_replica->safe_psql('postgres',
+	"INSERT INTO decoding(blah) VALUES ('after failover');");
+
+# Shouldn't be able to read from slot created after base backup
+($ret, $stdout, $stderr) = $node_replica->psql('postgres',
+"SELECT data FROM pg_logical_slot_peek_changes('after_basebackup', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');"
+);
+is($ret, 3, 'replaying from after_basebackup slot fails');
+like(
+	$stderr,
+	qr/replication slot "after_basebackup" does not exist/,
+	'after_basebackup slot missing');
+
+# Should be able to read from slot created before base backup
+($ret, $stdout, $stderr) = $node_replica->psql(
+	'postgres',
+"SELECT data FROM pg_logical_slot_peek_changes('before_basebackup', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');",
+	timeout => 30);
+is($ret, 0, 'replay from slot before_basebackup succeeds');
+is( $stdout, q(BEGIN
+table public.decoding: INSERT: blah[text]:'beforebb'
+COMMIT
+BEGIN
+table public.decoding: INSERT: blah[text]:'afterbb'
+COMMIT
+BEGIN
+table public.decoding: INSERT: blah[text]:'after failover'
+COMMIT), 'decoded expected data from slot before_basebackup');
+is($stderr, '', 'replay from slot before_basebackup produces no stderr');
+
+# We don't need the standby anymore
+$node_replica->teardown_node();
+
+
+
+# OK, time to try the same thing again, but this time we'll
+# be using slot mirroring on the standby and a pg_basebackup
+# of the master.
+
+diag "Testing logical timeline following with decoding_failover module";
+
+$node_master->start();
+# Clean up after the last test
+$node_master->safe_psql('postgres', 'DELETE FROM decoding;');
+is($node_master->psql('postgres', 'SELECT pg_drop_replication_slot(slot_name) FROM pg_replication_slots;'),
+  0, 'dropping slots succeeds via pg_drop_replication_slot');
+
+# Same as before, we'll make one slot before basebackup, one after. This time
+# the basebackup will be with pg_basebackup so it'll omit both slots, then
+# we'll use SQL functions provided by the decoding_failover test module to
+# sync them to the replica, do some work, sync them and fail over then test
+# again. This time we should have both the before- and after-basebackup
+# slots working.
+
+is($node_master->psql('postgres',
+"SELECT pg_create_logical_replication_slot('before_basebackup', 'test_decoding');"
+), 0, 'creating slot before_basebackup succeeds');
+
+$node_master->safe_psql('postgres',
+	"INSERT INTO decoding(blah) VALUES ('beforebb');");
+
+$backup_name = 'b2';
+$node_master->backup($backup_name);
+
+is($node_master->psql('postgres',
+"SELECT pg_create_logical_replication_slot('after_basebackup', 'test_decoding');"
+), 0, 'creating slot after_basebackup succeeds');
+
+$node_master->safe_psql('postgres',
+	"INSERT INTO decoding(blah) VALUES ('afterbb');");
+
+$node_replica = get_new_node('replica2');
+$node_replica->init_from_backup(
+	$node_master, $backup_name,
+	has_streaming => 1,
+	has_restoring => 1);
+
+$node_replica->start;
+
+# Verify the slots are both absent on the replica
+$stdout = $node_replica->safe_psql('postgres', 'SELECT slot_name FROM pg_replication_slots ORDER BY slot_name');
+is($stdout, '', 'No slots exist on the replica');
+
+# Now do our magic to sync the slot states across. Normally
+# this would be being done continuously by a bgworker but
+# we're just doing it by hand for this test. This is exposing
+# postgres innards to SQL so it's unsafe except for testing.
+$node_master->safe_psql('postgres', 'CREATE EXTENSION decoding_failover;');
+my $slotinfo = $node_master->safe_psql('postgres',
+	'SELECT slot_name, plugin, xmin, catalog_xmin, restart_lsn, confirmed_flush_lsn FROM pg_replication_slots ORDER BY slot_name');
+diag "Copying slots to replica";
+open my $fh, '<', \$slotinfo or die $!;
+while (<$fh>) {
+  print $_;
+  chomp $_;
+  my ($slot_name, $plugin, $xmin, $catalog_xmin, $restart_lsn, $confirmed_flush_lsn)
+  	= map { if ($_ ne '') {"'$_'"} else {'NULL'}; } split qr/\|/, $_;
+  print "# Copying slot $slot_name,$plugin,$xmin,$catalog_xmin,$restart_lsn,$confirmed_flush_lsn\n";
+  $node_replica->safe_psql('postgres',
+  	"SELECT decoding_failover_create_logical_slot($slot_name, $plugin);");
+  $node_replica->safe_psql('postgres',
+  	"SELECT decoding_failover_advance_logical_slot($slot_name, $xmin, $catalog_xmin, $restart_lsn, $confirmed_flush_lsn);");
+}
+close $fh or die $!;
+
+# Now both slots are present on the replica and exactly match the master
+$stdout = $node_replica->safe_psql('postgres', 'SELECT slot_name FROM pg_replication_slots ORDER BY slot_name');
+is($stdout, "after_basebackup\nbefore_basebackup", 'both slots now exist on replica');
+
+$stdout = $node_replica->safe_psql('postgres',
+	'SELECT slot_name, plugin, xmin, catalog_xmin, restart_lsn, confirmed_flush_lsn FROM pg_replication_slots ORDER BY slot_name');
+is($stdout, $slotinfo, "slot data read back from replica matches slot data on master");
+
+# We now have to copy some extra WAL to satisfy the requirements of the oldest
+# replication slot. pg_basebackup doesn't know to copy the extra WAL for slots
+# so we have to help out. We know the WAL is still retained on the master
+# because we haven't advanced the slots there.
+#
+# Figure out what the oldest segment we need is by looking at the restart_lsn
+# of the oldest slot.
+#
+# It only makes sense to do this once the slots are created on the replica,
+# otherwise it might just delete the segments again.
+
+my $oldest_needed_segment = $node_master->safe_psql('postgres',
+  qq{SELECT pg_xlogfile_name((
+      SELECT restart_lsn
+      FROM pg_replication_slots
+      ORDER BY restart_lsn ASC
+      LIMIT 1
+     ));}
+  );
+
+diag "oldest needed xlog seg is $oldest_needed_segment ";
+
+# WAL segment names sort lexically so we can just grab everything > than this
+# segment.
+opendir(my $pg_xlog, $node_master->data_dir . "/pg_xlog") or die $!;
+while (my $seg = readdir $pg_xlog)
+{
+  next unless $seg >= $oldest_needed_segment && $seg =~ /^[0-9]{24}/;
+  diag "copying xlog seg $seg";
+  copy($node_master->data_dir . "/pg_xlog/" . $seg,
+       $node_replica->data_dir . "/pg_xlog/" . $seg)
+       or die "copy of xlog seg $seg failed: $!";
+}
+closedir $pg_xlog;
+
+# Boom, crash the master
+$node_master->stop('immediate');
+
+$node_replica->promote;
+$node_replica->poll_query_until('postgres',
+	"SELECT NOT pg_is_in_recovery();");
+
+$node_replica->safe_psql('postgres',
+	"INSERT INTO decoding(blah) VALUES ('after failover');");
+
+# This time we can read from both slots
+($ret, $stdout, $stderr) = $node_replica->psql('postgres',
+"SELECT data FROM pg_logical_slot_peek_changes('after_basebackup', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');",
+timeout => 30);
+is($ret, 0, 'replay from slot after_basebackup succeeds');
+is( $stdout, q(BEGIN
+table public.decoding: INSERT: blah[text]:'afterbb'
+COMMIT
+BEGIN
+table public.decoding: INSERT: blah[text]:'after failover'
+COMMIT), 'decoded expected data from slot after_basebackup');
+is($stderr, '', 'replay from slot after_basebackup produces no stderr');
+
+# Should be able to read from slot created before base backup
+#
+# This would fail with an error about missing WAL segments if we hadn't
+# copied extra WAL earlier.
+($ret, $stdout, $stderr) = $node_replica->psql(
+	'postgres',
+"SELECT data FROM pg_logical_slot_peek_changes('before_basebackup', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');",
+	timeout => 30);
+is($ret, 0, 'replay from slot before_basebackup succeeds');
+is( $stdout, q(BEGIN
+table public.decoding: INSERT: blah[text]:'beforebb'
+COMMIT
+BEGIN
+table public.decoding: INSERT: blah[text]:'afterbb'
+COMMIT
+BEGIN
+table public.decoding: INSERT: blah[text]:'after failover'
+COMMIT), 'decoded expected data from slot before_basebackup');
+is($stderr, '', 'replay from slot before_basebackup produces no stderr');
+
+($ret, $stdout, $stderr) = $node_replica->psql('postgres', 'SELECT pg_drop_replication_slot(slot_name) FROM pg_replication_slots;');
+is($ret, 0, 'dropping slots succeeds via pg_drop_replication_slot');
+is($stderr, '', 'dropping slots produces no stderr output');
+
+1;
-- 
2.1.0

-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to