Here's a new failover slots rev, addressing the issues Oleksii Kliukin
raised and adding a bunch of TAP tests.

In particular, for the checkpoint issue I ended up moving
CheckPointReplicationSlots to occur at the start of a checkpoint, before
writing WAL is prohibited. As the comments note, it's just a convenient
place and time to do it anyway. That means it has to be called separately
at a restartpoint, but I don't think that's a biggie.
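
To make the ordering constraint concrete, here is a toy model of it. It is
not code from the patch: every name in it (toy_wal_insert, toy_checkpoint and
so on) is hypothetical and it only assumes what is described above, namely
that flushing a dirty failover slot emits a WAL record and that WAL writes
are prohibited partway through the checkpoint.

```c
#include <assert.h>
#include <stdbool.h>

/* Toy model only: none of these names is a PostgreSQL API. */

static bool wal_insert_allowed = true;  /* "writing WAL is still permitted" */
static int  wal_records_written = 0;

/* Pretend to emit one WAL record; fails once WAL writes are prohibited. */
static bool
toy_wal_insert(void)
{
    if (!wal_insert_allowed)
        return false;
    wal_records_written++;
    return true;
}

/* Flushing each dirty failover slot is assumed to need one WAL record. */
static bool
toy_checkpoint_slots(int n_dirty_failover_slots)
{
    for (int i = 0; i < n_dirty_failover_slots; i++)
    {
        if (!toy_wal_insert())
            return false;
    }
    return true;
}

/*
 * Toy checkpoint. With slots_first set, slots are flushed before WAL
 * writes are prohibited; otherwise they are flushed afterwards.
 * Returns false if any slot flush could not write its WAL record.
 */
static bool
toy_checkpoint(bool slots_first, int n_dirty_failover_slots)
{
    bool ok = true;

    wal_insert_allowed = true;

    if (slots_first)
        ok = toy_checkpoint_slots(n_dirty_failover_slots);

    wal_insert_allowed = false;     /* no more WAL from here on */

    if (!slots_first)
        ok = toy_checkpoint_slots(n_dirty_failover_slots);

    return ok;
}
```

In this model toy_checkpoint(true, n) succeeds for any n, while
toy_checkpoint(false, n) fails as soon as there is at least one dirty
failover slot, which is the situation the reordering avoids.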

The tests for this took me quite a while, much (much) longer than the code
changes.

I split the patch up a bit more too so individual changes are more
logically grouped and clearer. I expect it'd be mostly or entirely squashed
for commit.
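
For anyone skimming the first patch, the reworked WAL retention calculation
(ReplicationSlotsComputeRequiredLSN with its new failover_only argument) can
be sketched as below. This is a simplified stand-alone model under stated
assumptions, not the patch code: ToySlot, ToyLsn and
toy_compute_required_lsn are made-up names, locking is omitted, and 0 stands
in for InvalidXLogRecPtr.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Simplified stand-ins for the real types; not the PostgreSQL definitions. */
typedef uint64_t ToyLsn;
#define TOY_INVALID_LSN ((ToyLsn) 0)

typedef struct ToySlot
{
    bool    in_use;         /* array entry holds a slot */
    bool    failover;       /* slot is a failover slot */
    ToyLsn  restart_lsn;    /* oldest WAL position the slot still needs */
} ToySlot;

/*
 * Return the oldest restart_lsn across all in-use slots, or across
 * failover slots only when failover_only is set. Slots without a valid
 * restart_lsn are ignored. Returns TOY_INVALID_LSN if nothing qualifies.
 */
static ToyLsn
toy_compute_required_lsn(const ToySlot *slots, size_t nslots,
                         bool failover_only)
{
    ToyLsn  min_required = TOY_INVALID_LSN;

    for (size_t i = 0; i < nslots; i++)
    {
        const ToySlot *s = &slots[i];

        if (!s->in_use)
            continue;
        if (failover_only && !s->failover)
            continue;

        if (s->restart_lsn != TOY_INVALID_LSN &&
            (min_required == TOY_INVALID_LSN ||
             s->restart_lsn < min_required))
            min_required = s->restart_lsn;
    }

    return min_required;
}
```

Separating "compute and return" from "update the xlog module's copy" lets a
caller ask for the failover-only minimum without touching the shared value.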
From 256d43f4c8195c893efeb0319d7642853d15f3a9 Mon Sep 17 00:00:00 2001
From: Craig Ringer <cr...@2ndquadrant.com>
Date: Tue, 23 Feb 2016 15:59:37 +0800
Subject: [PATCH 1/7] Allow replication slots to follow failover

Originally replication slots were unique to a single node and weren't
recorded in WAL or replicated. A logical decoding client couldn't follow
a physical standby failover and promotion because the promoted replica
didn't have the original master's slots. The replica may not have
retained all required WAL and there was no way to create a new logical
slot and rewind it back to the point the logical client had replayed to.

Failover slots lift this limitation by replicating slots consistently to
physical standbys, keeping them up to date and using them in WAL
retention calculations. This allows a logical decoding client to follow
a physical failover and promotion without losing its place in the change
stream.

A failover slot may only be created on a master server, as it must be
able to write WAL. This limitation may be lifted later.

pg_basebackup is also modified to copy the contents of pg_replslot.
Non-failover slots will now be removed during backend startup instead
of being omitted from the copy.

This patch does not add any user interface for failover slots. There's
no way to create them from SQL or from the walsender. That and the
documentation for failover slots are in the next patch in the series
so that this patch is entirely focused on the implementation.

Craig Ringer, based on a prototype by Simon Riggs
---
 src/backend/access/rmgrdesc/Makefile               |   2 +-
 src/backend/access/rmgrdesc/replslotdesc.c         |  65 +++
 src/backend/access/transam/rmgr.c                  |   1 +
 src/backend/access/transam/xlog.c                  |   5 +-
 src/backend/commands/dbcommands.c                  |   3 +
 src/backend/replication/basebackup.c               |  12 -
 src/backend/replication/logical/decode.c           |   1 +
 src/backend/replication/logical/logical.c          |  25 +-
 src/backend/replication/slot.c                     | 586 +++++++++++++++++++--
 src/backend/replication/slotfuncs.c                |   4 +-
 src/backend/replication/walsender.c                |   8 +-
 src/bin/pg_xlogdump/replslotdesc.c                 |   1 +
 src/bin/pg_xlogdump/rmgrdesc.c                     |   1 +
 src/include/access/rmgrlist.h                      |   1 +
 src/include/replication/slot.h                     |  69 +--
 src/include/replication/slot_xlog.h                | 100 ++++
 .../modules/decoding_failover/decoding_failover.c  |   6 +-
 17 files changed, 758 insertions(+), 132 deletions(-)
 create mode 100644 src/backend/access/rmgrdesc/replslotdesc.c
 create mode 120000 src/bin/pg_xlogdump/replslotdesc.c
 create mode 100644 src/include/replication/slot_xlog.h

diff --git a/src/backend/access/rmgrdesc/Makefile b/src/backend/access/rmgrdesc/Makefile
index c72a1f2..600b544 100644
--- a/src/backend/access/rmgrdesc/Makefile
+++ b/src/backend/access/rmgrdesc/Makefile
@@ -10,7 +10,7 @@ include $(top_builddir)/src/Makefile.global
 
 OBJS = brindesc.o clogdesc.o committsdesc.o dbasedesc.o gindesc.o gistdesc.o \
 	   hashdesc.o heapdesc.o mxactdesc.o nbtdesc.o relmapdesc.o \
-	   replorigindesc.o seqdesc.o smgrdesc.o spgdesc.o \
+	   replorigindesc.o replslotdesc.o seqdesc.o smgrdesc.o spgdesc.o \
 	   standbydesc.o tblspcdesc.o xactdesc.o xlogdesc.o
 
 include $(top_srcdir)/src/backend/common.mk
diff --git a/src/backend/access/rmgrdesc/replslotdesc.c b/src/backend/access/rmgrdesc/replslotdesc.c
new file mode 100644
index 0000000..5829e8d
--- /dev/null
+++ b/src/backend/access/rmgrdesc/replslotdesc.c
@@ -0,0 +1,65 @@
+/*-------------------------------------------------------------------------
+ *
+ * replslotdesc.c
+ *	  rmgr descriptor routines for replication/slot.c
+ *
+ * Portions Copyright (c) 2015, PostgreSQL Global Development Group
+ *
+ *
+ * IDENTIFICATION
+ *	  src/backend/access/rmgrdesc/replslotdesc.c
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include "replication/slot_xlog.h"
+
+void
+replslot_desc(StringInfo buf, XLogReaderState *record)
+{
+	char	   *rec = XLogRecGetData(record);
+	uint8		info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
+
+	switch (info)
+	{
+		case XLOG_REPLSLOT_UPDATE:
+			{
+				ReplicationSlotInWAL xlrec;
+
+				xlrec = (ReplicationSlotInWAL) rec;
+
+				appendStringInfo(buf, "of slot %s with restart %X/%X and xid %u confirmed to %X/%X",
+						NameStr(xlrec->name),
+						(uint32)(xlrec->restart_lsn>>32), (uint32)(xlrec->restart_lsn),
+						xlrec->xmin,
+						(uint32)(xlrec->confirmed_flush>>32), (uint32)(xlrec->confirmed_flush));
+
+				break;
+			}
+		case XLOG_REPLSLOT_DROP:
+			{
+				xl_replslot_drop *xlrec;
+
+				xlrec = (xl_replslot_drop *) rec;
+
+				appendStringInfo(buf, "of slot %s", NameStr(xlrec->name));
+
+				break;
+			}
+	}
+}
+
+const char *
+replslot_identify(uint8 info)
+{
+	switch (info)
+	{
+		case XLOG_REPLSLOT_UPDATE:
+			return "UPDATE";
+		case XLOG_REPLSLOT_DROP:
+			return "DROP";
+		default:
+			return NULL;
+	}
+}
diff --git a/src/backend/access/transam/rmgr.c b/src/backend/access/transam/rmgr.c
index 7c4d773..0bd5796 100644
--- a/src/backend/access/transam/rmgr.c
+++ b/src/backend/access/transam/rmgr.c
@@ -24,6 +24,7 @@
 #include "commands/sequence.h"
 #include "commands/tablespace.h"
 #include "replication/origin.h"
+#include "replication/slot_xlog.h"
 #include "storage/standby.h"
 #include "utils/relmapper.h"
 
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 94b79ac..a92f09d 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -6366,8 +6366,11 @@ StartupXLOG(void)
 	/*
 	 * Initialize replication slots, before there's a chance to remove
 	 * required resources.
+	 *
+	 * If we're in archive recovery then non-failover slots are no
+	 * longer of any use and should be dropped during startup.
 	 */
-	StartupReplicationSlots();
+	StartupReplicationSlots(ArchiveRecoveryRequested);
 
 	/*
 	 * Startup logical state, needs to be setup now so we have proper data
diff --git a/src/backend/commands/dbcommands.c b/src/backend/commands/dbcommands.c
index c1c0223..61fc45b 100644
--- a/src/backend/commands/dbcommands.c
+++ b/src/backend/commands/dbcommands.c
@@ -2114,6 +2114,9 @@ dbase_redo(XLogReaderState *record)
 		/* Clean out the xlog relcache too */
 		XLogDropDatabase(xlrec->db_id);
 
+		/* Drop any logical failover slots for this database */
+		ReplicationSlotsDropDBSlots(xlrec->db_id);
+
 		/* And remove the physical files */
 		if (!rmtree(dst_path, true))
 			ereport(WARNING,
diff --git a/src/backend/replication/basebackup.c b/src/backend/replication/basebackup.c
index af0fb09..ab1f271 100644
--- a/src/backend/replication/basebackup.c
+++ b/src/backend/replication/basebackup.c
@@ -973,18 +973,6 @@ sendDir(char *path, int basepathlen, bool sizeonly, List *tablespaces,
 		}
 
 		/*
-		 * Skip pg_replslot, not useful to copy. But include it as an empty
-		 * directory anyway, so we get permissions right.
-		 */
-		if (strcmp(de->d_name, "pg_replslot") == 0)
-		{
-			if (!sizeonly)
-				_tarWriteHeader(pathbuf + basepathlen + 1, NULL, &statbuf);
-			size += 512;		/* Size of the header just added */
-			continue;
-		}
-
-		/*
 		 * We can skip pg_xlog, the WAL segments need to be fetched from the
 		 * WAL archive anyway. But include it as an empty directory anyway, so
 		 * we get permissions right.
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index 56be1ed..948e31f 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -135,6 +135,7 @@ LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *recor
 		case RM_BRIN_ID:
 		case RM_COMMIT_TS_ID:
 		case RM_REPLORIGIN_ID:
+		case RM_REPLSLOT_ID:
 			break;
 		case RM_NEXT_ID:
 			elog(ERROR, "unexpected RM_NEXT_ID rmgr_id: %u", (RmgrIds) XLogRecGetRmid(buf.record));
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 2e6d3f9..2c7b749 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -85,16 +85,19 @@ CheckLogicalDecodingRequirements(void)
 				 errmsg("logical decoding requires a database connection")));
 
 	/* ----
-	 * TODO: We got to change that someday soon...
+	 * TODO: Allow logical decoding from a standby
 	 *
-	 * There's basically three things missing to allow this:
+	 * There are some things missing to allow this:
 	 * 1) We need to be able to correctly and quickly identify the timeline a
-	 *	  LSN belongs to
-	 * 2) We need to force hot_standby_feedback to be enabled at all times so
-	 *	  the primary cannot remove rows we need.
-	 * 3) support dropping replication slots referring to a database, in
-	 *	  dbase_redo. There can't be any active ones due to HS recovery
-	 *	  conflicts, so that should be relatively easy.
+	 *    LSN belongs to
+	 * 2) To prevent needed rows from being removed we would need
+	 *    to enhance hot_standby_feedback so it sends both xmin and
+	 *    catalog_xmin to the master.  A standby slot can't write WAL, so we
+	 *    wouldn't be able to use it directly for failover, without some very
+	 *    complex state interactions via master.
+	 *
+	 * So this doesn't seem likely to change anytime soon.
+	 *
 	 * ----
 	 */
 	if (RecoveryInProgress())
@@ -272,7 +275,7 @@ CreateInitDecodingContext(char *plugin,
 	slot->effective_catalog_xmin = GetOldestSafeDecodingTransactionId();
 	slot->data.catalog_xmin = slot->effective_catalog_xmin;
 
-	ReplicationSlotsComputeRequiredXmin(true);
+	ReplicationSlotsUpdateRequiredXmin(true);
 
 	LWLockRelease(ProcArrayLock);
 
@@ -908,8 +911,8 @@ LogicalConfirmReceivedLocation(XLogRecPtr lsn)
 			MyReplicationSlot->effective_catalog_xmin = MyReplicationSlot->data.catalog_xmin;
 			SpinLockRelease(&MyReplicationSlot->mutex);
 
-			ReplicationSlotsComputeRequiredXmin(false);
-			ReplicationSlotsComputeRequiredLSN();
+			ReplicationSlotsUpdateRequiredXmin(false);
+			ReplicationSlotsUpdateRequiredLSN();
 		}
 	}
 	else
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index affa9b9..d83118d 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -24,7 +24,18 @@
  * directory. Inside that directory the state file will contain the slot's
  * own data. Additional data can be stored alongside that file if required.
  * While the server is running, the state data is also cached in memory for
- * efficiency.
+ * efficiency. Non-failover slots are NOT subject to WAL logging and may
+ * be used on standbys (though that's only supported for physical slots at
+ * the moment). They use tempfile writes and swaps for crash safety.
+ *
+ * A failover slot created on a master node generates WAL records that
+ * maintain a copy of the slot on standby nodes. If a standby node is
+ * promoted the failover slot allows access to be restarted just as if the
+ * original master node was being accessed, allowing for the timeline
+ * change. The replica considers slot positions when removing WAL to make
+ * sure it can satisfy the needs of slots after promotion.  For logical
+ * decoding slots the slot's internal state is kept up to date so it's
+ * ready for use after promotion.
  *
  * ReplicationSlotAllocationLock must be taken in exclusive mode to allocate
  * or free a slot. ReplicationSlotControlLock must be taken in shared mode
@@ -44,6 +55,7 @@
 #include "common/string.h"
 #include "miscadmin.h"
 #include "replication/slot.h"
+#include "replication/slot_xlog.h"
 #include "storage/fd.h"
 #include "storage/proc.h"
 #include "storage/procarray.h"
@@ -101,10 +113,14 @@ static LWLockTranche ReplSlotIOLWLockTranche;
 static void ReplicationSlotDropAcquired(void);
 
 /* internal persistency functions */
-static void RestoreSlotFromDisk(const char *name);
+static void RestoreSlotFromDisk(const char *name, bool drop_nonfailover_slots);
 static void CreateSlotOnDisk(ReplicationSlot *slot);
 static void SaveSlotToPath(ReplicationSlot *slot, const char *path, int elevel);
 
+/* internal redo functions */
+static void ReplicationSlotRedoCreateOrUpdate(ReplicationSlotInWAL xlrec);
+static void ReplicationSlotRedoDrop(const char * slotname);
+
 /*
  * Report shared-memory space needed by ReplicationSlotShmemInit.
  */
@@ -220,7 +236,8 @@ ReplicationSlotValidateName(const char *name, int elevel)
  */
 void
 ReplicationSlotCreate(const char *name, bool db_specific,
-					  ReplicationSlotPersistency persistency)
+					  ReplicationSlotPersistency persistency,
+					  bool failover)
 {
 	ReplicationSlot *slot = NULL;
 	int			i;
@@ -229,6 +246,11 @@ ReplicationSlotCreate(const char *name, bool db_specific,
 
 	ReplicationSlotValidateName(name, ERROR);
 
+	if (failover && RecoveryInProgress())
+		ereport(ERROR,
+				(errmsg("a failover slot may not be created on a replica"),
+				 errhint("Create the slot on the master server instead.")));
+
 	/*
 	 * If some other backend ran this code concurrently with us, we'd likely both
 	 * allocate the same slot, and that would be bad.  We'd also be at risk of
@@ -278,6 +300,9 @@ ReplicationSlotCreate(const char *name, bool db_specific,
 	StrNCpy(NameStr(slot->data.name), name, NAMEDATALEN);
 	slot->data.database = db_specific ? MyDatabaseId : InvalidOid;
 	slot->data.restart_lsn = InvalidXLogRecPtr;
+	/* Slot timeline is unused and always zero */
+	slot->data.restart_tli = 0;
+	slot->data.failover = failover;
 
 	/*
 	 * Create the slot on disk.  We haven't actually marked the slot allocated
@@ -313,6 +338,10 @@ ReplicationSlotCreate(const char *name, bool db_specific,
 
 /*
  * Find a previously created slot and mark it as used by this backend.
+ *
+ * Sets active_pid and assigns MyReplicationSlot iff successfully acquired.
+ *
+ * ERRORs on an attempt to acquire a failover slot when in recovery.
  */
 void
 ReplicationSlotAcquire(const char *name)
@@ -335,7 +364,11 @@ ReplicationSlotAcquire(const char *name)
 		{
 			SpinLockAcquire(&s->mutex);
 			active_pid = s->active_pid;
-			if (active_pid == 0)
+			/*
+			 * We can only claim a slot for our use if it's not claimed
+			 * by someone else AND it isn't a failover slot on a standby.
+			 */
+			if (active_pid == 0 && !(RecoveryInProgress() && s->data.failover))
 				s->active_pid = MyProcPid;
 			SpinLockRelease(&s->mutex);
 			slot = s;
@@ -349,12 +382,24 @@ ReplicationSlotAcquire(const char *name)
 		ereport(ERROR,
 				(errcode(ERRCODE_UNDEFINED_OBJECT),
 				 errmsg("replication slot \"%s\" does not exist", name)));
+
 	if (active_pid != 0)
 		ereport(ERROR,
 				(errcode(ERRCODE_OBJECT_IN_USE),
 			   errmsg("replication slot \"%s\" is active for PID %d",
 					  name, active_pid)));
 
+	/*
+	 * An attempt to use a failover slot from a standby must fail since
+	 * we can't write WAL from a standby and there's no sensible way
+	 * to advance slot position from both replica and master anyway.
+	 */
+	if (RecoveryInProgress() && slot->data.failover)
+		ereport(ERROR,
+				(errcode(ERRCODE_OBJECT_IN_USE),
+				 errmsg("replication slot \"%s\" is reserved for use after failover",
+					  name)));
+
 	/* We made this slot active, so it's ours now. */
 	MyReplicationSlot = slot;
 }
@@ -411,6 +456,9 @@ ReplicationSlotDrop(const char *name)
 /*
  * Permanently drop the currently acquired replication slot which will be
  * released by the point this function returns.
+ *
+ * Callers must NOT hold ReplicationSlotControlLock in SHARED mode.  EXCLUSIVE
+ * is OK, or not held at all.
  */
 static void
 ReplicationSlotDropAcquired(void)
@@ -418,9 +466,14 @@ ReplicationSlotDropAcquired(void)
 	char		path[MAXPGPATH];
 	char		tmppath[MAXPGPATH];
 	ReplicationSlot *slot = MyReplicationSlot;
+	bool slot_is_failover;
+	bool took_control_lock = false,
+		 took_allocation_lock = false;
 
 	Assert(MyReplicationSlot != NULL);
 
+	slot_is_failover = slot->data.failover;
+
 	/* slot isn't acquired anymore */
 	MyReplicationSlot = NULL;
 
@@ -428,8 +481,27 @@ ReplicationSlotDropAcquired(void)
 	 * If some other backend ran this code concurrently with us, we might try
 	 * to delete a slot with a certain name while someone else was trying to
 	 * create a slot with the same name.
+	 *
+	 * If called with the lock already held it MUST be held in
+	 * EXCLUSIVE mode.
 	 */
-	LWLockAcquire(ReplicationSlotAllocationLock, LW_EXCLUSIVE);
+	if (!LWLockHeldByMe(ReplicationSlotAllocationLock))
+	{
+		took_allocation_lock = true;
+		LWLockAcquire(ReplicationSlotAllocationLock, LW_EXCLUSIVE);
+	}
+
+	/* Record the drop in XLOG if we aren't replaying WAL */
+	if (XLogInsertAllowed() && slot_is_failover)
+	{
+		xl_replslot_drop xlrec;
+
+		memcpy(&(xlrec.name), NameStr(slot->data.name), NAMEDATALEN);
+
+		XLogBeginInsert();
+		XLogRegisterData((char *) (&xlrec), sizeof(xlrec));
+		(void) XLogInsert(RM_REPLSLOT_ID, XLOG_REPLSLOT_DROP);
+	}
 
 	/* Generate pathnames. */
 	sprintf(path, "pg_replslot/%s", NameStr(slot->data.name));
@@ -459,7 +531,11 @@ ReplicationSlotDropAcquired(void)
 	}
 	else
 	{
-		bool		fail_softly = slot->data.persistency == RS_EPHEMERAL;
+		bool		fail_softly = false;
+
+		if (RecoveryInProgress() ||
+			slot->data.persistency == RS_EPHEMERAL)
+			fail_softly = true;
 
 		SpinLockAcquire(&slot->mutex);
 		slot->active_pid = 0;
@@ -477,18 +553,27 @@ ReplicationSlotDropAcquired(void)
 	 * grabbing the mutex because nobody else can be scanning the array here,
 	 * and nobody can be attached to this slot and thus access it without
 	 * scanning the array.
+	 *
+	 * You must hold the lock in EXCLUSIVE mode or not at all.
 	 */
-	LWLockAcquire(ReplicationSlotControlLock, LW_EXCLUSIVE);
+	if (!LWLockHeldByMe(ReplicationSlotControlLock))
+	{
+		took_control_lock = true;
+		LWLockAcquire(ReplicationSlotControlLock, LW_EXCLUSIVE);
+	}
+
 	slot->active_pid = 0;
 	slot->in_use = false;
-	LWLockRelease(ReplicationSlotControlLock);
+
+	if (took_control_lock)
+		LWLockRelease(ReplicationSlotControlLock);
 
 	/*
 	 * Slot is dead and doesn't prevent resource removal anymore, recompute
 	 * limits.
 	 */
-	ReplicationSlotsComputeRequiredXmin(false);
-	ReplicationSlotsComputeRequiredLSN();
+	ReplicationSlotsUpdateRequiredXmin(false);
+	ReplicationSlotsUpdateRequiredLSN();
 
 	/*
 	 * If removing the directory fails, the worst thing that will happen is
@@ -504,7 +589,8 @@ ReplicationSlotDropAcquired(void)
 	 * We release this at the very end, so that nobody starts trying to create
 	 * a slot while we're still cleaning up the detritus of the old one.
 	 */
-	LWLockRelease(ReplicationSlotAllocationLock);
+	if (took_allocation_lock)
+		LWLockRelease(ReplicationSlotAllocationLock);
 }
 
 /*
@@ -544,6 +630,9 @@ ReplicationSlotMarkDirty(void)
 /*
  * Convert a slot that's marked as RS_EPHEMERAL to a RS_PERSISTENT slot,
  * guaranteeing it will be there after an eventual crash.
+ *
+ * Failover slots will emit a create xlog record at this time, having
+ * not been previously written to xlog.
  */
 void
 ReplicationSlotPersist(void)
@@ -565,7 +654,7 @@ ReplicationSlotPersist(void)
  * Compute the oldest xmin across all slots and store it in the ProcArray.
  */
 void
-ReplicationSlotsComputeRequiredXmin(bool already_locked)
+ReplicationSlotsUpdateRequiredXmin(bool already_locked)
 {
 	int			i;
 	TransactionId agg_xmin = InvalidTransactionId;
@@ -610,10 +699,20 @@ ReplicationSlotsComputeRequiredXmin(bool already_locked)
 }
 
 /*
- * Compute the oldest restart LSN across all slots and inform xlog module.
+ * Update the xlog module's copy of the minimum restart lsn across all slots
  */
 void
-ReplicationSlotsComputeRequiredLSN(void)
+ReplicationSlotsUpdateRequiredLSN(void)
+{
+	XLogSetReplicationSlotMinimumLSN(ReplicationSlotsComputeRequiredLSN(false));
+}
+
+/*
+ * Compute the oldest restart LSN across all slots (or optionally
+ * only failover slots) and return it.
+ */
+XLogRecPtr
+ReplicationSlotsComputeRequiredLSN(bool failover_only)
 {
 	int			i;
 	XLogRecPtr	min_required = InvalidXLogRecPtr;
@@ -625,14 +724,19 @@ ReplicationSlotsComputeRequiredLSN(void)
 	{
 		ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
 		XLogRecPtr	restart_lsn;
+		bool		failover;
 
 		if (!s->in_use)
 			continue;
 
 		SpinLockAcquire(&s->mutex);
 		restart_lsn = s->data.restart_lsn;
+		failover = s->data.failover;
 		SpinLockRelease(&s->mutex);
 
+		if (failover_only && !failover)
+			continue;
+
 		if (restart_lsn != InvalidXLogRecPtr &&
 			(min_required == InvalidXLogRecPtr ||
 			 restart_lsn < min_required))
@@ -640,7 +744,7 @@ ReplicationSlotsComputeRequiredLSN(void)
 	}
 	LWLockRelease(ReplicationSlotControlLock);
 
-	XLogSetReplicationSlotMinimumLSN(min_required);
+	return min_required;
 }
 
 /*
@@ -649,7 +753,7 @@ ReplicationSlotsComputeRequiredLSN(void)
  * Returns InvalidXLogRecPtr if logical decoding is disabled or no logical
  * slots exist.
  *
- * NB: this returns a value >= ReplicationSlotsComputeRequiredLSN(), since it
+ * NB: this returns a value >= ReplicationSlotsComputeRequiredLSN(false), since it
  * ignores physical replication slots.
  *
  * The results aren't required frequently, so we don't maintain a precomputed
@@ -747,6 +851,45 @@ ReplicationSlotsCountDBSlots(Oid dboid, int *nslots, int *nactive)
 	return false;
 }
 
+void
+ReplicationSlotsDropDBSlots(Oid dboid)
+{
+	int			i;
+
+	Assert(MyReplicationSlot == NULL);
+
+	LWLockAcquire(ReplicationSlotControlLock, LW_EXCLUSIVE);
+	for (i = 0; i < max_replication_slots; i++)
+	{
+		ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
+
+		if (s->data.database == dboid)
+		{
+			/*
+			 * There should be no connections to this dbid
+			 * therefore all slots for this dbid should be
+			 * logical, inactive failover slots.
+			 */
+			Assert(s->active_pid == 0);
+			Assert(s->in_use == false);
+			Assert(SlotIsLogical(s));
+
+			/*
+			 * Acquire the replication slot
+			 */
+			MyReplicationSlot = s;
+
+			/*
+			 * No need to deactivate slot, especially since we
+			 * already hold ReplicationSlotControlLock.
+			 */
+			ReplicationSlotDropAcquired();
+		}
+	}
+	LWLockRelease(ReplicationSlotControlLock);
+
+	MyReplicationSlot = NULL;
+}
 
 /*
  * Check whether the server's configuration supports using replication
@@ -779,12 +922,13 @@ ReplicationSlotReserveWal(void)
 
 	Assert(slot != NULL);
 	Assert(slot->data.restart_lsn == InvalidXLogRecPtr);
+	Assert(slot->data.restart_tli == 0);
 
 	/*
 	 * The replication slot mechanism is used to prevent removal of required
 	 * WAL. As there is no interlock between this routine and checkpoints, WAL
 	 * segments could concurrently be removed when a now stale return value of
-	 * ReplicationSlotsComputeRequiredLSN() is used. In the unlikely case that
+	 * ReplicationSlotsUpdateRequiredLSN() is used. In the unlikely case that
 	 * this happens we'll just retry.
 	 */
 	while (true)
@@ -821,12 +965,12 @@ ReplicationSlotReserveWal(void)
 		}
 
 		/* prevent WAL removal as fast as possible */
-		ReplicationSlotsComputeRequiredLSN();
+		ReplicationSlotsUpdateRequiredLSN();
 
 		/*
 		 * If all required WAL is still there, great, otherwise retry. The
 		 * slot should prevent further removal of WAL, unless there's a
-		 * concurrent ReplicationSlotsComputeRequiredLSN() after we've written
+		 * concurrent ReplicationSlotsUpdateRequiredLSN() after we've written
 		 * the new restart_lsn above, so normally we should never need to loop
 		 * more than twice.
 		 */
@@ -878,7 +1022,7 @@ CheckPointReplicationSlots(void)
  * needs to be run before we start crash recovery.
  */
 void
-StartupReplicationSlots(void)
+StartupReplicationSlots(bool drop_nonfailover_slots)
 {
 	DIR		   *replication_dir;
 	struct dirent *replication_de;
@@ -917,7 +1061,7 @@ StartupReplicationSlots(void)
 		}
 
 		/* looks like a slot in a normal state, restore */
-		RestoreSlotFromDisk(replication_de->d_name);
+		RestoreSlotFromDisk(replication_de->d_name, drop_nonfailover_slots);
 	}
 	FreeDir(replication_dir);
 
@@ -926,8 +1070,8 @@ StartupReplicationSlots(void)
 		return;
 
 	/* Now that we have recovered all the data, compute replication xmin */
-	ReplicationSlotsComputeRequiredXmin(false);
-	ReplicationSlotsComputeRequiredLSN();
+	ReplicationSlotsUpdateRequiredXmin(false);
+	ReplicationSlotsUpdateRequiredLSN();
 }
 
 /* ----
@@ -996,6 +1140,8 @@ CreateSlotOnDisk(ReplicationSlot *slot)
 
 /*
  * Shared functionality between saving and creating a replication slot.
+ *
+ * For failover slots this is where we emit xlog.
  */
 static void
 SaveSlotToPath(ReplicationSlot *slot, const char *dir, int elevel)
@@ -1006,15 +1152,18 @@ SaveSlotToPath(ReplicationSlot *slot, const char *dir, int elevel)
 	ReplicationSlotOnDisk cp;
 	bool		was_dirty;
 
-	/* first check whether there's something to write out */
-	SpinLockAcquire(&slot->mutex);
-	was_dirty = slot->dirty;
-	slot->just_dirtied = false;
-	SpinLockRelease(&slot->mutex);
+	if (!RecoveryInProgress())
+	{
+		/* first check whether there's something to write out */
+		SpinLockAcquire(&slot->mutex);
+		was_dirty = slot->dirty;
+		slot->just_dirtied = false;
+		SpinLockRelease(&slot->mutex);
 
-	/* and don't do anything if there's nothing to write */
-	if (!was_dirty)
-		return;
+		/* and don't do anything if there's nothing to write */
+		if (!was_dirty)
+			return;
+	}
 
 	LWLockAcquire(&slot->io_in_progress_lock, LW_EXCLUSIVE);
 
@@ -1047,6 +1196,25 @@ SaveSlotToPath(ReplicationSlot *slot, const char *dir, int elevel)
 
 	SpinLockRelease(&slot->mutex);
 
+	/*
+	 * If needed, record this action in WAL
+	 */
+	if (slot->data.failover &&
+		slot->data.persistency == RS_PERSISTENT &&
+		!RecoveryInProgress())
+	{
+		XLogBeginInsert();
+		XLogRegisterData((char *) (&cp.slotdata), sizeof(ReplicationSlotPersistentData));
+		/*
+		 * Note that slot creation on the downstream is also an "update".
+		 *
+		 * Slots can start off ephemeral and be updated to persistent. We just
+		 * log the update and the downstream creates the new slot if it doesn't
+		 * exist yet.
+		 */
+		(void) XLogInsert(RM_REPLSLOT_ID, XLOG_REPLSLOT_UPDATE);
+	}
+
 	COMP_CRC32C(cp.checksum,
 				(char *) (&cp) + SnapBuildOnDiskNotChecksummedSize,
 				SnapBuildOnDiskChecksummedSize);
@@ -1116,7 +1284,7 @@ SaveSlotToPath(ReplicationSlot *slot, const char *dir, int elevel)
  * Load a single slot from disk into memory.
  */
 static void
-RestoreSlotFromDisk(const char *name)
+RestoreSlotFromDisk(const char *name, bool drop_nonfailover_slots)
 {
 	ReplicationSlotOnDisk cp;
 	int			i;
@@ -1235,10 +1403,21 @@ RestoreSlotFromDisk(const char *name)
 						path, checksum, cp.checksum)));
 
 	/*
-	 * If we crashed with an ephemeral slot active, don't restore but delete
-	 * it.
+	 * If we crashed with an ephemeral slot active, don't restore but
+	 * delete it.
+	 *
+	 * Similarly, if we're in archive recovery and will be running as
+	 * a standby (when drop_nonfailover_slots is set), non-failover
+	 * slots can't be relied upon. Logical slots might have a catalog
+	 * xmin lower than reality because the original slot on the master
+	 * advanced past the point the stale slot on the replica is stuck
+	 * at. Additionally slots might have been copied while being
+	 * written to if the basebackup copy method was not atomic.
+	 * Failover slots are safe since they're WAL-logged and follow the
+	 * master's slot position.
 	 */
-	if (cp.slotdata.persistency != RS_PERSISTENT)
+	if (cp.slotdata.persistency != RS_PERSISTENT
+			|| (drop_nonfailover_slots && !cp.slotdata.failover))
 	{
 		sprintf(path, "pg_replslot/%s", name);
 
@@ -1249,6 +1428,14 @@ RestoreSlotFromDisk(const char *name)
 					 errmsg("could not remove directory \"%s\"", path)));
 		}
 		fsync_fname("pg_replslot", true);
+
+		if (cp.slotdata.persistency == RS_PERSISTENT)
+		{
+			ereport(LOG,
+					(errmsg("dropped non-failover slot %s during archive recovery",
+							 NameStr(cp.slotdata.name))));
+		}
+
 		return;
 	}
 
@@ -1285,5 +1472,332 @@ RestoreSlotFromDisk(const char *name)
 	if (!restored)
 		ereport(PANIC,
 				(errmsg("too many replication slots active before shutdown"),
-				 errhint("Increase max_replication_slots and try again.")));
+				 errhint("Increase max_replication_slots (currently %u) and try again.",
+					 max_replication_slots)));
+}
+
+/*
+ * This usually just writes new persistent data to the slot state, but an
+ * update record might create a new slot on the downstream if we changed a
+ * previously ephemeral slot to persistent. We have to decide which
+ * by looking for the existing slot.
+ */
+static void
+ReplicationSlotRedoCreateOrUpdate(ReplicationSlotInWAL xlrec)
+{
+	ReplicationSlot *slot;
+	bool	found_available = false;
+	bool	found_duplicate = false;
+	int		use_slotid = 0;
+	int		i;
+
+	/*
+	 * We're in redo, but someone could still create a local
+	 * non-failover slot and race with us unless we take the
+	 * allocation lock.
+	 */
+	LWLockAcquire(ReplicationSlotAllocationLock, LW_EXCLUSIVE);
+
+	for (i = 0; i < max_replication_slots; i++)
+	{
+		slot = &ReplicationSlotCtl->replication_slots[i];
+
+		/*
+		 * Find first unused position in the slots array, but keep on
+		 * scanning in case there's an existing slot with the same
+		 * name.
+		 */
+		if (!slot->in_use && !found_available)
+		{
+			use_slotid = i;
+			found_available = true;
+		}
+
+		/*
+		 * Existing slot with same name? It could be our failover slot
+		 * to update or a non-failover slot with a conflicting name.
+		 */
+		if (strcmp(NameStr(xlrec->name), NameStr(slot->data.name)) == 0)
+		{
+			use_slotid = i;
+			found_available = true;
+			found_duplicate = true;
+			break;
+		}
+	}
+
+	if (found_duplicate && !slot->data.failover)
+	{
+		/*
+		 * A local non-failover slot exists with the same name as
+		 * the failover slot we're creating.
+		 *
+		 * Clobber the client, drop its slot, and carry on with
+		 * our business.
+		 *
+		 * First we must temporarily release the allocation lock while
+		 * we try to terminate the process that holds the slot, since
+		 * we don't want to hold the LWlock for ages. We'll reacquire
+		 * it later.
+		 */
+		LWLockRelease(ReplicationSlotAllocationLock);
+
+		/* We might race with other clients, so retry-loop */
+		do
+		{
+			int active_pid = slot->active_pid;
+			int max_sleep_millis = 120 * 1000;
+			int millis_per_sleep = 1000;
+
+			if (active_pid != 0)
+			{
+				ereport(INFO,
+						(errmsg("terminating active connection by pid %d to local slot %s because of conflict with recovery",
+							active_pid, NameStr(slot->data.name))));
+
+				if (kill(active_pid, SIGTERM))
+					elog(DEBUG1, "failed to signal pid %d to terminate on slot conflict: %m",
+							active_pid);
+
+				/*
+				 * Wait for the process using the slot to die. This just uses the
+				 * latch to poll; the process won't set our latch when it releases
+				 * the slot and dies.
+				 *
+				 * We're checking active_pid without any locks held, but we'll
+				 * recheck when we attempt to drop the slot.
+				 */
+				while (slot->in_use && slot->active_pid == active_pid
+						&& max_sleep_millis > 0)
+				{
+					int rc;
+
+					rc = WaitLatch(MyLatch,
+							WL_TIMEOUT | WL_LATCH_SET | WL_POSTMASTER_DEATH,
+							millis_per_sleep);
+
+					if (rc & WL_POSTMASTER_DEATH)
+						elog(FATAL, "exiting after postmaster termination");
+
+					/* Clear the latch so the next WaitLatch really sleeps */
+					ResetLatch(MyLatch);
+
+					/* We may over-count if the latch woke us early; that's OK */
+					max_sleep_millis -= millis_per_sleep;
+				}
+
+				if (max_sleep_millis <= 0)
+					elog(WARNING, "process %d is taking too long to terminate after SIGTERM",
+							active_pid);
+			}
+
+			if (slot->active_pid == 0)
+			{
+				/* Try to acquire and drop the slot */
+				SpinLockAcquire(&slot->mutex);
+
+				if (slot->active_pid != 0)
+				{
+					/* Lost the race, go around */
+				}
+				else
+				{
+					/* Claim the slot for ourselves */
+					slot->active_pid = MyProcPid;
+					MyReplicationSlot = slot;
+				}
+				SpinLockRelease(&slot->mutex);
+			}
+
+			if (slot->active_pid == MyProcPid)
+			{
+				NameData slotname;
+				strncpy(NameStr(slotname), NameStr(slot->data.name), NAMEDATALEN);
+				(NameStr(slotname))[NAMEDATALEN-1] = '\0';
+
+				/*
+				 * Reclaim the allocation lock and THEN drop the slot,
+				 * so nobody else can grab the name until we've
+				 * finished redo.
+				 */
+				LWLockAcquire(ReplicationSlotAllocationLock, LW_EXCLUSIVE);
+				ReplicationSlotDropAcquired();
+				/* We clobbered the duplicate, treat it as new */
+				found_duplicate = false;
+
+				ereport(WARNING,
+						(errmsg("dropped local replication slot %s because of conflict with recovery",
+								NameStr(slotname)),
+						 errdetail("A failover slot with the same name was created on the master server")));
+			}
+		}
+		while (slot->in_use);
+	}
+
+	Assert(LWLockHeldByMe(ReplicationSlotAllocationLock));
+
+	/*
+	 * This is either an empty slot control position to make a new slot or it's
+	 * an existing entry for this failover slot that we need to update.
+	 */
+	if (found_available)
+	{
+		LWLockAcquire(ReplicationSlotControlLock, LW_EXCLUSIVE);
+
+		slot = &ReplicationSlotCtl->replication_slots[use_slotid];
+
+		/* restore the entire set of persistent data */
+		memcpy(&slot->data, xlrec,
+			   sizeof(ReplicationSlotPersistentData));
+
+		Assert(strcmp(NameStr(xlrec->name), NameStr(slot->data.name)) == 0);
+		Assert(slot->data.failover && slot->data.persistency == RS_PERSISTENT);
+
+		/* Update the non-persistent in-memory state */
+		slot->effective_xmin = xlrec->xmin;
+		slot->effective_catalog_xmin = xlrec->catalog_xmin;
+
+		if (found_duplicate)
+		{
+			char		path[MAXPGPATH];
+
+			/* Write an existing slot to disk */
+			Assert(slot->in_use);
+			Assert(slot->active_pid == 0); /* can't be replaying from failover slot */
+
+			sprintf(path, "pg_replslot/%s", NameStr(slot->data.name));
+			slot->dirty = true;
+			SaveSlotToPath(slot, path, ERROR);
+		}
+		else
+		{
+			Assert(!slot->in_use);
+			/* In-memory state that's only set on create, not update */
+			slot->active_pid = 0;
+			slot->in_use = true;
+			slot->candidate_catalog_xmin = InvalidTransactionId;
+			slot->candidate_xmin_lsn = InvalidXLogRecPtr;
+			slot->candidate_restart_lsn = InvalidXLogRecPtr;
+			slot->candidate_restart_valid = InvalidXLogRecPtr;
+
+			CreateSlotOnDisk(slot);
+		}
+
+		LWLockRelease(ReplicationSlotControlLock);
+
+		ReplicationSlotsUpdateRequiredXmin(false);
+		ReplicationSlotsUpdateRequiredLSN();
+	}
+
+	LWLockRelease(ReplicationSlotAllocationLock);
+
+	if (!found_available)
+	{
+		/*
+		 * Because the standby should have the same or greater max_replication_slots
+		 * as the master this shouldn't happen, but just in case...
+		 */
+		ereport(ERROR,
+				(errmsg("max_replication_slots exceeded, cannot replay failover slot creation"),
+				 errhint("Increase max_replication_slots")));
+	}
+}
+
+/*
+ * Redo a slot drop of a failover slot. This might be a redo during crash
+ * recovery on the master or it may be replay on a standby.
+ */
+static void
+ReplicationSlotRedoDrop(const char *slotname)
+{
+	/*
+	 * Acquire the failover slot that's to be dropped.
+	 *
+	 * We can't ReplicationSlotAcquire here because we want to acquire
+	 * a replication slot during replay, which isn't usually allowed.
+	 * Also, because we might crash midway through a drop we can't
+	 * assume we'll actually find the slot so it's not an error for
+	 * the slot to be missing.
+	 */
+	int			i;
+
+	Assert(MyReplicationSlot == NULL);
+
+	ReplicationSlotValidateName(slotname, ERROR);
+
+	LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
+	for (i = 0; i < max_replication_slots; i++)
+	{
+		ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
+
+		if (s->in_use && strcmp(slotname, NameStr(s->data.name)) == 0)
+		{
+			if (s->data.persistency != RS_PERSISTENT)
+			{
+				/* shouldn't happen */
+				elog(WARNING, "found conflicting non-persistent slot during failover slot drop");
+				break;
+			}
+
+			if (!s->data.failover)
+			{
+				/* shouldn't happen */
+				elog(WARNING, "found non-failover slot during redo of slot drop");
+				break;
+			}
+
+			/* A failover slot can't be active during recovery */
+			Assert(s->active_pid == 0);
+
+			/* Claim the slot */
+			s->active_pid = MyProcPid;
+			MyReplicationSlot = s;
+
+			break;
+		}
+	}
+	LWLockRelease(ReplicationSlotControlLock);
+
+	if (MyReplicationSlot != NULL)
+	{
+		ReplicationSlotDropAcquired();
+	}
+	else
+	{
+		elog(WARNING, "failover slot %s not found during redo of drop",
+				slotname);
+	}
+}
+
+void
+replslot_redo(XLogReaderState *record)
+{
+	uint8		info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
+
+	switch (info)
+	{
+		/*
+		 * Update the values for an existing failover slot or, when a slot
+		 * is first logged as persistent, create it on the downstream.
+		 */
+		case XLOG_REPLSLOT_UPDATE:
+			ReplicationSlotRedoCreateOrUpdate((ReplicationSlotInWAL) XLogRecGetData(record));
+			break;
+
+		/*
+		 * Drop an existing failover slot.
+		 */
+		case XLOG_REPLSLOT_DROP:
+			{
+				xl_replslot_drop *xlrec =
+				(xl_replslot_drop *) XLogRecGetData(record);
+
+				ReplicationSlotRedoDrop(NameStr(xlrec->name));
+
+				break;
+			}
+
+		default:
+			elog(PANIC, "replslot_redo: unknown op code %u", info);
+	}
 }
diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c
index 9cc24ea..f430714 100644
--- a/src/backend/replication/slotfuncs.c
+++ b/src/backend/replication/slotfuncs.c
@@ -57,7 +57,7 @@ pg_create_physical_replication_slot(PG_FUNCTION_ARGS)
 	CheckSlotRequirements();
 
 	/* acquire replication slot, this will check for conflicting names */
-	ReplicationSlotCreate(NameStr(*name), false, RS_PERSISTENT);
+	ReplicationSlotCreate(NameStr(*name), false, RS_PERSISTENT, false);
 
 	values[0] = NameGetDatum(&MyReplicationSlot->data.name);
 	nulls[0] = false;
@@ -120,7 +120,7 @@ pg_create_logical_replication_slot(PG_FUNCTION_ARGS)
 	 * errors during initialization because it'll get dropped if this
 	 * transaction fails. We'll make it persistent at the end.
 	 */
-	ReplicationSlotCreate(NameStr(*name), true, RS_EPHEMERAL);
+	ReplicationSlotCreate(NameStr(*name), true, RS_EPHEMERAL, false);
 
 	/*
 	 * Create logical decoding context, to build the initial snapshot.
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index c03e045..1583862 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -792,7 +792,7 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
 
 	if (cmd->kind == REPLICATION_KIND_PHYSICAL)
 	{
-		ReplicationSlotCreate(cmd->slotname, false, RS_PERSISTENT);
+		ReplicationSlotCreate(cmd->slotname, false, RS_PERSISTENT, false);
 	}
 	else
 	{
@@ -803,7 +803,7 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
 		 * handle errors during initialization because it'll get dropped if
 		 * this transaction fails. We'll make it persistent at the end.
 		 */
-		ReplicationSlotCreate(cmd->slotname, true, RS_EPHEMERAL);
+		ReplicationSlotCreate(cmd->slotname, true, RS_EPHEMERAL, false);
 	}
 
 	initStringInfo(&output_message);
@@ -1523,7 +1523,7 @@ PhysicalConfirmReceivedLocation(XLogRecPtr lsn)
 	if (changed)
 	{
 		ReplicationSlotMarkDirty();
-		ReplicationSlotsComputeRequiredLSN();
+		ReplicationSlotsUpdateRequiredLSN();
 	}
 
 	/*
@@ -1619,7 +1619,7 @@ PhysicalReplicationSlotNewXmin(TransactionId feedbackXmin)
 	if (changed)
 	{
 		ReplicationSlotMarkDirty();
-		ReplicationSlotsComputeRequiredXmin(false);
+		ReplicationSlotsUpdateRequiredXmin(false);
 	}
 }
 
diff --git a/src/bin/pg_xlogdump/replslotdesc.c b/src/bin/pg_xlogdump/replslotdesc.c
new file mode 120000
index 0000000..2e088d2
--- /dev/null
+++ b/src/bin/pg_xlogdump/replslotdesc.c
@@ -0,0 +1 @@
+../../../src/backend/access/rmgrdesc/replslotdesc.c
\ No newline at end of file
diff --git a/src/bin/pg_xlogdump/rmgrdesc.c b/src/bin/pg_xlogdump/rmgrdesc.c
index f9cd395..73ed7d4 100644
--- a/src/bin/pg_xlogdump/rmgrdesc.c
+++ b/src/bin/pg_xlogdump/rmgrdesc.c
@@ -26,6 +26,7 @@
 #include "commands/sequence.h"
 #include "commands/tablespace.h"
 #include "replication/origin.h"
+#include "replication/slot_xlog.h"
 #include "rmgrdesc.h"
 #include "storage/standbydefs.h"
 #include "utils/relmapper.h"
diff --git a/src/include/access/rmgrlist.h b/src/include/access/rmgrlist.h
index fab912d..124b7e5 100644
--- a/src/include/access/rmgrlist.h
+++ b/src/include/access/rmgrlist.h
@@ -45,3 +45,4 @@ PG_RMGR(RM_SPGIST_ID, "SPGist", spg_redo, spg_desc, spg_identify, spg_xlog_start
 PG_RMGR(RM_BRIN_ID, "BRIN", brin_redo, brin_desc, brin_identify, NULL, NULL)
 PG_RMGR(RM_COMMIT_TS_ID, "CommitTs", commit_ts_redo, commit_ts_desc, commit_ts_identify, NULL, NULL)
 PG_RMGR(RM_REPLORIGIN_ID, "ReplicationOrigin", replorigin_redo, replorigin_desc, replorigin_identify, NULL, NULL)
+PG_RMGR(RM_REPLSLOT_ID, "ReplicationSlot", replslot_redo, replslot_desc, replslot_identify, NULL, NULL)
diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h
index 8be8ab6..cdcbd37 100644
--- a/src/include/replication/slot.h
+++ b/src/include/replication/slot.h
@@ -11,69 +11,12 @@
 
 #include "fmgr.h"
 #include "access/xlog.h"
-#include "access/xlogreader.h"
+#include "replication/slot_xlog.h"
 #include "storage/lwlock.h"
 #include "storage/shmem.h"
 #include "storage/spin.h"
 
 /*
- * Behaviour of replication slots, upon release or crash.
- *
- * Slots marked as PERSISTENT are crashsafe and will not be dropped when
- * released. Slots marked as EPHEMERAL will be dropped when released or after
- * restarts.
- *
- * EPHEMERAL slots can be made PERSISTENT by calling ReplicationSlotPersist().
- */
-typedef enum ReplicationSlotPersistency
-{
-	RS_PERSISTENT,
-	RS_EPHEMERAL
-} ReplicationSlotPersistency;
-
-/*
- * On-Disk data of a replication slot, preserved across restarts.
- */
-typedef struct ReplicationSlotPersistentData
-{
-	/* The slot's identifier */
-	NameData	name;
-
-	/* database the slot is active on */
-	Oid			database;
-
-	/*
-	 * The slot's behaviour when being dropped (or restored after a crash).
-	 */
-	ReplicationSlotPersistency persistency;
-
-	/*
-	 * xmin horizon for data
-	 *
-	 * NB: This may represent a value that hasn't been written to disk yet;
-	 * see notes for effective_xmin, below.
-	 */
-	TransactionId xmin;
-
-	/*
-	 * xmin horizon for catalog tuples
-	 *
-	 * NB: This may represent a value that hasn't been written to disk yet;
-	 * see notes for effective_xmin, below.
-	 */
-	TransactionId catalog_xmin;
-
-	/* oldest LSN that might be required by this replication slot */
-	XLogRecPtr	restart_lsn;
-
-	/* oldest LSN that the client has acked receipt for */
-	XLogRecPtr	confirmed_flush;
-
-	/* plugin name */
-	NameData	plugin;
-} ReplicationSlotPersistentData;
-
-/*
  * Shared memory state of a single replication slot.
  */
 typedef struct ReplicationSlot
@@ -155,7 +98,7 @@ extern void ReplicationSlotsShmemInit(void);
 
 /* management of individual slots */
 extern void ReplicationSlotCreate(const char *name, bool db_specific,
-					  ReplicationSlotPersistency p);
+					  ReplicationSlotPersistency p, bool failover);
 extern void ReplicationSlotPersist(void);
 extern void ReplicationSlotDrop(const char *name);
 
@@ -167,12 +110,14 @@ extern void ReplicationSlotMarkDirty(void);
 /* misc stuff */
 extern bool ReplicationSlotValidateName(const char *name, int elevel);
 extern void ReplicationSlotReserveWal(void);
-extern void ReplicationSlotsComputeRequiredXmin(bool already_locked);
-extern void ReplicationSlotsComputeRequiredLSN(void);
+extern void ReplicationSlotsUpdateRequiredXmin(bool already_locked);
+extern void ReplicationSlotsUpdateRequiredLSN(void);
 extern XLogRecPtr ReplicationSlotsComputeLogicalRestartLSN(void);
+extern XLogRecPtr ReplicationSlotsComputeRequiredLSN(bool failover_only);
 extern bool ReplicationSlotsCountDBSlots(Oid dboid, int *nslots, int *nactive);
+extern void ReplicationSlotsDropDBSlots(Oid dboid);
 
-extern void StartupReplicationSlots(void);
+extern void StartupReplicationSlots(bool drop_nonfailover_slots);
 extern void CheckPointReplicationSlots(void);
 
 extern void CheckSlotRequirements(void);
diff --git a/src/include/replication/slot_xlog.h b/src/include/replication/slot_xlog.h
new file mode 100644
index 0000000..e3211f5
--- /dev/null
+++ b/src/include/replication/slot_xlog.h
@@ -0,0 +1,100 @@
+/*-------------------------------------------------------------------------
+ * slot_xlog.h
+ *	   Replication slot persistent data and WAL record definitions.
+ *
+ * Copyright (c) 2012-2016, PostgreSQL Global Development Group
+ *
+ * src/include/replication/slot_xlog.h
+ *-------------------------------------------------------------------------
+ */
+#ifndef SLOT_XLOG_H
+#define SLOT_XLOG_H
+
+#include "fmgr.h"
+#include "access/xlog.h"
+#include "access/xlogdefs.h"
+#include "access/xlogreader.h"
+
+/*
+ * Behaviour of replication slots, upon release or crash.
+ *
+ * Slots marked as PERSISTENT are crashsafe and will not be dropped when
+ * released. Slots marked as EPHEMERAL will be dropped when released or after
+ * restarts.
+ *
+ * EPHEMERAL slots can be made PERSISTENT by calling ReplicationSlotPersist().
+ */
+typedef enum ReplicationSlotPersistency
+{
+	RS_PERSISTENT,
+	RS_EPHEMERAL
+} ReplicationSlotPersistency;
+
+/*
+ * On-Disk data of a replication slot, preserved across restarts.
+ */
+typedef struct ReplicationSlotPersistentData
+{
+	/* The slot's identifier */
+	NameData	name;
+
+	/* database the slot is active on */
+	Oid			database;
+
+	/*
+	 * The slot's behaviour when being dropped (or restored after a crash).
+	 */
+	ReplicationSlotPersistency persistency;
+
+	/*
+	 * Failover slots are created on the master and maintained on all
+	 * standbys, but can only be used on a standby after failover.
+	 */
+	bool		failover;
+
+	/*
+	 * xmin horizon for data
+	 *
+	 * NB: This may represent a value that hasn't been written to disk yet;
+	 * see notes for effective_xmin, below.
+	 */
+	TransactionId xmin;
+
+	/*
+	 * xmin horizon for catalog tuples
+	 *
+	 * NB: This may represent a value that hasn't been written to disk yet;
+	 * see notes for effective_xmin, below.
+	 */
+	TransactionId catalog_xmin;
+
+	/* oldest LSN that might be required by this replication slot */
+	XLogRecPtr	restart_lsn;
+	TimeLineID	restart_tli;
+
+	/* oldest LSN that the client has acked receipt for */
+	XLogRecPtr	confirmed_flush;
+
+	/* plugin name */
+	NameData	plugin;
+} ReplicationSlotPersistentData;
+
+typedef ReplicationSlotPersistentData *ReplicationSlotInWAL;
+
+/*
+ * WAL records for failover slots
+ */
+#define XLOG_REPLSLOT_UPDATE	0x10
+#define XLOG_REPLSLOT_DROP		0x20
+
+typedef struct xl_replslot_drop
+{
+	NameData	name;
+} xl_replslot_drop;
+
+/* WAL logging */
+extern void replslot_redo(XLogReaderState *record);
+extern void replslot_desc(StringInfo buf, XLogReaderState *record);
+extern const char *replslot_identify(uint8 info);
+
+#endif   /* SLOT_XLOG_H */
diff --git a/src/test/modules/decoding_failover/decoding_failover.c b/src/test/modules/decoding_failover/decoding_failover.c
index bab0f3b..8fcfda5 100644
--- a/src/test/modules/decoding_failover/decoding_failover.c
+++ b/src/test/modules/decoding_failover/decoding_failover.c
@@ -37,7 +37,7 @@ decoding_failover_create_logical_slot(PG_FUNCTION_ARGS)
 
 	CheckSlotRequirements();
 
-	ReplicationSlotCreate(slotname, true, RS_PERSISTENT);
+	ReplicationSlotCreate(slotname, true, RS_PERSISTENT, false);
 
 	/* register the plugin name with the slot */
 	StrNCpy(NameStr(MyReplicationSlot->data.plugin), plugin, NAMEDATALEN);
@@ -99,8 +99,8 @@ decoding_failover_advance_logical_slot(PG_FUNCTION_ARGS)
 	ReplicationSlotSave();
 	ReplicationSlotRelease();
 
-	ReplicationSlotsComputeRequiredXmin(false);
-	ReplicationSlotsComputeRequiredLSN();
+	ReplicationSlotsUpdateRequiredXmin(false);
+	ReplicationSlotsUpdateRequiredLSN();
 
 	PG_RETURN_VOID();
 }
-- 
2.1.0
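
Note for reviewers: the position search at the top of the slot redo path in
patch 1 can be sketched in isolation as below. This is a simplified,
standalone illustration and not the patch's code. SketchSlot,
SketchScanResult and sketch_find_slot_position() are hypothetical names
invented for the sketch, and it only treats in-use entries as name matches.

```c
#include <stdbool.h>
#include <string.h>

#define SKETCH_NAMELEN 64

/* Hypothetical, simplified stand-in for one shared-memory slot entry. */
typedef struct SketchSlot
{
	bool		in_use;
	bool		failover;
	char		name[SKETCH_NAMELEN];
} SketchSlot;

typedef struct SketchScanResult
{
	int			slotid;				/* position to use, or -1 if none */
	bool		found_duplicate;	/* a live entry already has this name */
} SketchScanResult;

/*
 * Remember the first free position, but keep scanning in case a live
 * entry with the same name exists further along the array.
 */
static SketchScanResult
sketch_find_slot_position(const SketchSlot *slots, int nslots,
						  const char *name)
{
	SketchScanResult res = {-1, false};
	int			i;

	for (i = 0; i < nslots; i++)
	{
		/* First unused position wins unless a name match turns up. */
		if (!slots[i].in_use && res.slotid < 0)
			res.slotid = i;

		/* A live entry with the same name overrides any free position. */
		if (slots[i].in_use && strcmp(slots[i].name, name) == 0)
		{
			res.slotid = i;
			res.found_duplicate = true;
			break;
		}
	}

	return res;
}
```

A caller would then look at found_duplicate and the entry's failover flag to
decide between updating the existing failover slot and evicting a conflicting
local slot; slotid == -1 corresponds to the max_replication_slots error.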

From eef34447d9b69c32cd6da5116f24cd628370d4a9 Mon Sep 17 00:00:00 2001
From: Craig Ringer <cr...@2ndquadrant.com>
Date: Tue, 8 Mar 2016 14:34:36 +0800
Subject: [PATCH 2/7] Update decoding_failover tests for failover slots

---
 .../recovery/t/006_logical_decoding_timelines.pl   | 29 +++++++++-------------
 1 file changed, 12 insertions(+), 17 deletions(-)

diff --git a/src/test/recovery/t/006_logical_decoding_timelines.pl b/src/test/recovery/t/006_logical_decoding_timelines.pl
index 1372d90..ed6cac7 100644
--- a/src/test/recovery/t/006_logical_decoding_timelines.pl
+++ b/src/test/recovery/t/006_logical_decoding_timelines.pl
@@ -19,7 +19,7 @@ use strict;
 use warnings;
 use PostgresNode;
 use TestLib;
-use Test::More tests => 20;
+use Test::More tests => 19;
 use RecursiveCopy;
 use File::Copy;
 
@@ -64,7 +64,7 @@ $node_master->safe_psql('postgres', 'CHECKPOINT;');
 
 # Verify that only the before base_backup slot is on the replica
 $stdout = $node_replica->safe_psql('postgres', 'SELECT slot_name FROM pg_replication_slots ORDER BY slot_name');
-is($stdout, 'before_basebackup', 'Expected to find only slot before_basebackup on replica');
+is($stdout, '', 'Expected to find no slots on replica');
 
 # Boom, crash
 $node_master->stop('immediate');
@@ -86,22 +86,16 @@ like(
 	qr/replication slot "after_basebackup" does not exist/,
 	'after_basebackup slot missing');
 
-# Should be able to read from slot created before base backup
+# or before_basebackup, since the replica dropped the non-failover slot at startup
 ($ret, $stdout, $stderr) = $node_replica->psql(
 	'postgres',
 "SELECT data FROM pg_logical_slot_peek_changes('before_basebackup', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');",
 	timeout => 30);
-is($ret, 0, 'replay from slot before_basebackup succeeds');
-is( $stdout, q(BEGIN
-table public.decoding: INSERT: blah[text]:'beforebb'
-COMMIT
-BEGIN
-table public.decoding: INSERT: blah[text]:'afterbb'
-COMMIT
-BEGIN
-table public.decoding: INSERT: blah[text]:'after failover'
-COMMIT), 'decoded expected data from slot before_basebackup');
-is($stderr, '', 'replay from slot before_basebackup produces no stderr');
+is($ret, 3, 'replaying from before_basebackup slot fails');
+like(
+	$stderr,
+	qr/replication slot "before_basebackup" does not exist/,
+	'before_basebackup slot missing');
 
 # We don't need the standby anymore
 $node_replica->teardown_node();
@@ -121,9 +115,10 @@ is($node_master->psql('postgres', 'SELECT pg_drop_replication_slot(slot_name) FR
   0, 'dropping slots succeeds via pg_drop_replication_slot');
 
 # Same as before, we'll make one slot before basebackup, one after. This time
-# the basebackup will be with pg_basebackup so it'll omit both slots, then
-# we'll use SQL functions provided by the decoding_failover test module to
-# sync them to the replica, do some work, sync them and fail over then test
+# the basebackup will be with pg_basebackup. It'll copy the before_basebackup slot
+# but since it's a non-failover slot the server will drop it immediately.
+# We'll use SQL functions provided by the decoding_failover test module to
+# sync both slots to the replica, do some work, sync them and fail over then test
 # again. This time we should have both the before- and after-basebackup
 # slots working.
 
-- 
2.1.0

From 73b9e5827f6e590e5c558f36ce0962f3bdecd2ad Mon Sep 17 00:00:00 2001
From: Craig Ringer <cr...@2ndquadrant.com>
Date: Tue, 23 Feb 2016 16:00:09 +0800
Subject: [PATCH 3/7] Retain extra WAL for failover slots in base backups

Change the return value of pg_start_backup(), the BASE_BACKUP walsender
command, etc. to report the minimum WAL required by any failover slot
when that is a lower LSN than the redo position, so that base backups
contain the WAL required for slots to work.

Add a new backup label entry 'MIN FAILOVER SLOT LSN' that, if present,
indicates the minimum LSN needed by any failover slot that is present in
the base backup. Backup tools should check for this entry and ensure
they retain all xlogs including and after that point.
---
 src/backend/access/transam/xlog.c | 41 +++++++++++++++++++++++++++++++++++++++
 1 file changed, 41 insertions(+)
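
Note for reviewers: the LSN arithmetic this patch relies on can be tried
standalone with the sketch below. It is an illustration only. SketchLSN, the
sketch_* helpers and the fixed 16MB SKETCH_SEG_SIZE are assumptions made for
the example; the server uses XLogRecPtr and XLOG_SEG_SIZE.

```c
#include <inttypes.h>
#include <stdint.h>
#include <stdio.h>

typedef uint64_t SketchLSN;

/* Assumed segment size for the sketch: the default 16MB. */
#define SKETCH_SEG_SIZE ((SketchLSN) 16 * 1024 * 1024)
#define SKETCH_INVALID_LSN ((SketchLSN) 0)

/* Truncate an LSN down to the start of the segment that contains it. */
static SketchLSN
sketch_segment_floor(SketchLSN lsn)
{
	return (lsn / SKETCH_SEG_SIZE) * SKETCH_SEG_SIZE;
}

/*
 * Choose where WAL copying should start: the slot minimum when it is valid
 * and older than the redo position, otherwise the redo position itself.
 */
static SketchLSN
sketch_backup_start(SketchLSN redo, SketchLSN slot_min)
{
	if (slot_min != SKETCH_INVALID_LSN && slot_min < redo)
		return slot_min;
	return redo;
}

/* Format the label entry the same way the patch does (high/low halves). */
static int
sketch_format_label_entry(char *buf, size_t buflen, SketchLSN lsn)
{
	return snprintf(buf, buflen,
					"MIN FAILOVER SLOT LSN: %" PRIX32 "/%" PRIX32 "\n",
					(uint32_t) (lsn >> 32), (uint32_t) lsn);
}
```

For example, an LSN of 3/1A2B3C4 truncates to 3/1000000 with a 16MB segment
size, and that truncated value is what would appear in the label entry.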

diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index a92f09d..9018af5 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -9797,6 +9797,7 @@ do_pg_start_backup(const char *backupidstr, bool fast, TimeLineID *starttli_p,
 	bool		backup_started_in_recovery = false;
 	XLogRecPtr	checkpointloc;
 	XLogRecPtr	startpoint;
+	XLogRecPtr	slot_startpoint = InvalidXLogRecPtr;
 	TimeLineID	starttli;
 	pg_time_t	stamp_time;
 	char		strfbuf[128];
@@ -9943,6 +9944,17 @@ do_pg_start_backup(const char *backupidstr, bool fast, TimeLineID *starttli_p,
 			checkpointfpw = ControlFile->checkPointCopy.fullPageWrites;
 			LWLockRelease(ControlFileLock);
 
+			/*
+			 * If failover slots are in use we must retain and transfer WAL
+			 * older than the redo location so that those slots can be replayed
+			 * from after a failover event.
+			 *
+			 * This MUST be at an xlog segment boundary so truncate the LSN
+			 * appropriately.
+			 */
+			if (max_replication_slots > 0)
+				slot_startpoint = (ReplicationSlotsComputeRequiredLSN(true) / XLOG_SEG_SIZE) * XLOG_SEG_SIZE;
+
 			if (backup_started_in_recovery)
 			{
 				XLogRecPtr	recptr;
@@ -10111,6 +10123,10 @@ do_pg_start_backup(const char *backupidstr, bool fast, TimeLineID *starttli_p,
 						 backup_started_in_recovery ? "standby" : "master");
 		appendStringInfo(&labelfbuf, "START TIME: %s\n", strfbuf);
 		appendStringInfo(&labelfbuf, "LABEL: %s\n", backupidstr);
+		if (slot_startpoint != InvalidXLogRecPtr)
+			appendStringInfo(&labelfbuf, "MIN FAILOVER SLOT LSN: %X/%X\n",
+						(uint32) (slot_startpoint >> 32), (uint32) slot_startpoint);
+
 
 		/*
 		 * Okay, write the file, or return its contents to caller.
@@ -10204,9 +10220,34 @@ do_pg_start_backup(const char *backupidstr, bool fast, TimeLineID *starttli_p,
 
 	/*
 	 * We're done.  As a convenience, return the starting WAL location.
+	 *
+	 * pg_basebackup etc expect to use this as the position to start copying
+	 * WAL from, so we should return the minimum of the slot start LSN and the
+	 * current redo position to make sure we get all WAL required by failover
+	 * slots.
+	 *
+	 * The min required LSN for failover slots is also available from the
+	 * 'MIN FAILOVER SLOT LSN' entry in the backup label file.
 	 */
+	if (slot_startpoint != InvalidXLogRecPtr && slot_startpoint < startpoint)
+	{
+		List *history;
+		TimeLineID slot_start_tli;
+
+		/* Min LSN required by a slot may be on an older timeline. */
+		history = readTimeLineHistory(ThisTimeLineID);
+		slot_start_tli = tliOfPointInHistory(slot_startpoint, history);
+		list_free_deep(history);
+
+		if (slot_start_tli < starttli)
+			starttli = slot_start_tli;
+
+		startpoint = slot_startpoint;
+	}
+
 	if (starttli_p)
 		*starttli_p = starttli;
+
 	return startpoint;
 }
 
-- 
2.1.0
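
Note for backup tool authors: one way a tool might pick up the new label
entry is sketched below. It is a standalone illustration under assumptions:
sketch_parse_min_failover_lsn() is a hypothetical helper, and it expects the
whole backup label contents in memory as a NUL-terminated string.

```c
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>
#include <string.h>

/*
 * Scan backup label text line by line for the 'MIN FAILOVER SLOT LSN'
 * entry. Returns true and stores the LSN if the entry is present and
 * well formed; returns false if it is absent or malformed.
 */
static bool
sketch_parse_min_failover_lsn(const char *label, uint64_t *lsn_out)
{
	static const char key[] = "MIN FAILOVER SLOT LSN: ";
	const size_t keylen = sizeof(key) - 1;
	const char *line = label;

	while (line != NULL && *line != '\0')
	{
		if (strncmp(line, key, keylen) == 0)
		{
			unsigned int hi;
			unsigned int lo;

			/* The value is written as two hex halves: high/low. */
			if (sscanf(line + keylen, "%X/%X", &hi, &lo) != 2)
				return false;

			*lsn_out = ((uint64_t) hi << 32) | (uint64_t) lo;
			return true;
		}

		/* Advance to the character after the next newline, if any. */
		line = strchr(line, '\n');
		if (line != NULL)
			line++;
	}

	return false;
}
```

If the entry is found, the tool would retain every WAL segment from the one
containing that LSN onwards; if it is absent, existing behaviour applies.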

From 52f07aa03ebde429cf3dccbe21bc6fa8e59eacc2 Mon Sep 17 00:00:00 2001
From: Craig Ringer <cr...@2ndquadrant.com>
Date: Tue, 23 Feb 2016 16:04:05 +0800
Subject: [PATCH 4/7] Add the UI for failover slots

Expose failover slots to the user.

Add a new 'failover' argument to pg_create_logical_replication_slot and
pg_create_physical_replication_slot. Accept a new FAILOVER keyword
argument in CREATE_REPLICATION_SLOT on the walsender protocol.
---
 contrib/test_decoding/expected/ddl.out |  3 +++
 contrib/test_decoding/sql/ddl.sql      |  2 ++
 src/backend/catalog/system_views.sql   | 11 ++++++++++-
 src/backend/replication/repl_gram.y    | 13 +++++++++++--
 src/backend/replication/repl_scanner.l |  1 +
 src/backend/replication/slotfuncs.c    |  7 +++++--
 src/backend/replication/walsender.c    |  4 ++--
 src/include/catalog/pg_proc.h          |  4 ++--
 src/include/nodes/replnodes.h          |  1 +
 src/include/replication/slot.h         |  1 +
 10 files changed, 38 insertions(+), 9 deletions(-)

diff --git a/contrib/test_decoding/expected/ddl.out b/contrib/test_decoding/expected/ddl.out
index 57a1289..5fed500 100644
--- a/contrib/test_decoding/expected/ddl.out
+++ b/contrib/test_decoding/expected/ddl.out
@@ -9,6 +9,9 @@ SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_d
 -- fail because of an already existing slot
 SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');
 ERROR:  replication slot "regression_slot" already exists
+-- fail because a failover slot can't replace a normal slot on the master
+SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding', true);
+ERROR:  replication slot "regression_slot" already exists
 -- fail because of an invalid name
 SELECT 'init' FROM pg_create_logical_replication_slot('Invalid Name', 'test_decoding');
 ERROR:  replication slot name "Invalid Name" contains invalid character
diff --git a/contrib/test_decoding/sql/ddl.sql b/contrib/test_decoding/sql/ddl.sql
index e311c59..dc61ef4 100644
--- a/contrib/test_decoding/sql/ddl.sql
+++ b/contrib/test_decoding/sql/ddl.sql
@@ -4,6 +4,8 @@ SET synchronous_commit = on;
 SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');
 -- fail because of an already existing slot
 SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');
+-- fail because a failover slot can't replace a normal slot on the master
+SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding', true);
 -- fail because of an invalid name
 SELECT 'init' FROM pg_create_logical_replication_slot('Invalid Name', 'test_decoding');
 
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index abf9a70..fcb877d 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -949,12 +949,21 @@ AS 'pg_logical_slot_peek_binary_changes';
 
 CREATE OR REPLACE FUNCTION pg_create_physical_replication_slot(
     IN slot_name name, IN immediately_reserve boolean DEFAULT false,
-    OUT slot_name name, OUT xlog_position pg_lsn)
+    IN failover boolean DEFAULT false, OUT slot_name name,
+    OUT xlog_position pg_lsn)
 RETURNS RECORD
 LANGUAGE INTERNAL
 STRICT VOLATILE
 AS 'pg_create_physical_replication_slot';
 
+CREATE OR REPLACE FUNCTION pg_create_logical_replication_slot(
+    IN slot_name name, IN plugin name, IN failover boolean DEFAULT false,
+    OUT slot_name text, OUT xlog_position pg_lsn)
+RETURNS RECORD
+LANGUAGE INTERNAL
+STRICT VOLATILE
+AS 'pg_create_logical_replication_slot';
+
 CREATE OR REPLACE FUNCTION
   make_interval(years int4 DEFAULT 0, months int4 DEFAULT 0, weeks int4 DEFAULT 0,
                 days int4 DEFAULT 0, hours int4 DEFAULT 0, mins int4 DEFAULT 0,
diff --git a/src/backend/replication/repl_gram.y b/src/backend/replication/repl_gram.y
index d93db88..1574f24 100644
--- a/src/backend/replication/repl_gram.y
+++ b/src/backend/replication/repl_gram.y
@@ -77,6 +77,7 @@ Node *replication_parse_result;
 %token K_LOGICAL
 %token K_SLOT
 %token K_RESERVE_WAL
+%token K_FAILOVER
 
 %type <node>	command
 %type <node>	base_backup start_replication start_logical_replication
@@ -90,6 +91,7 @@ Node *replication_parse_result;
 %type <node>	plugin_opt_arg
 %type <str>		opt_slot
 %type <boolval>	opt_reserve_wal
+%type <boolval> opt_failover
 
 %%
 
@@ -184,23 +186,25 @@ base_backup_opt:
 
 create_replication_slot:
 			/* CREATE_REPLICATION_SLOT slot PHYSICAL RESERVE_WAL */
-			K_CREATE_REPLICATION_SLOT IDENT K_PHYSICAL opt_reserve_wal
+			K_CREATE_REPLICATION_SLOT IDENT K_PHYSICAL opt_reserve_wal opt_failover
 				{
 					CreateReplicationSlotCmd *cmd;
 					cmd = makeNode(CreateReplicationSlotCmd);
 					cmd->kind = REPLICATION_KIND_PHYSICAL;
 					cmd->slotname = $2;
 					cmd->reserve_wal = $4;
+					cmd->failover = $5;
 					$$ = (Node *) cmd;
 				}
 			/* CREATE_REPLICATION_SLOT slot LOGICAL plugin */
-			| K_CREATE_REPLICATION_SLOT IDENT K_LOGICAL IDENT
+			| K_CREATE_REPLICATION_SLOT IDENT K_LOGICAL IDENT opt_failover
 				{
 					CreateReplicationSlotCmd *cmd;
 					cmd = makeNode(CreateReplicationSlotCmd);
 					cmd->kind = REPLICATION_KIND_LOGICAL;
 					cmd->slotname = $2;
 					cmd->plugin = $4;
+					cmd->failover = $5;
 					$$ = (Node *) cmd;
 				}
 			;
@@ -276,6 +280,11 @@ opt_reserve_wal:
 			| /* EMPTY */					{ $$ = false; }
 			;
 
+opt_failover:
+			K_FAILOVER						{ $$ = true; }
+			| /* EMPTY */					{ $$ = false; }
+			;
+
 opt_slot:
 			K_SLOT IDENT
 				{ $$ = $2; }
diff --git a/src/backend/replication/repl_scanner.l b/src/backend/replication/repl_scanner.l
index f83ec53..a1d9f10 100644
--- a/src/backend/replication/repl_scanner.l
+++ b/src/backend/replication/repl_scanner.l
@@ -98,6 +98,7 @@ PHYSICAL			{ return K_PHYSICAL; }
 RESERVE_WAL			{ return K_RESERVE_WAL; }
 LOGICAL				{ return K_LOGICAL; }
 SLOT				{ return K_SLOT; }
+FAILOVER			{ return K_FAILOVER; }
 
 ","				{ return ','; }
 ";"				{ return ';'; }
diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c
index f430714..a2dfc40 100644
--- a/src/backend/replication/slotfuncs.c
+++ b/src/backend/replication/slotfuncs.c
@@ -18,6 +18,7 @@
 
 #include "access/htup_details.h"
 #include "replication/slot.h"
+#include "replication/slot_xlog.h"
 #include "replication/logical.h"
 #include "replication/logicalfuncs.h"
 #include "utils/builtins.h"
@@ -41,6 +42,7 @@ pg_create_physical_replication_slot(PG_FUNCTION_ARGS)
 {
 	Name		name = PG_GETARG_NAME(0);
 	bool 		immediately_reserve = PG_GETARG_BOOL(1);
+	bool		failover = PG_GETARG_BOOL(2);
 	Datum		values[2];
 	bool		nulls[2];
 	TupleDesc	tupdesc;
@@ -57,7 +59,7 @@ pg_create_physical_replication_slot(PG_FUNCTION_ARGS)
 	CheckSlotRequirements();
 
 	/* acquire replication slot, this will check for conflicting names */
-	ReplicationSlotCreate(NameStr(*name), false, RS_PERSISTENT, false);
+	ReplicationSlotCreate(NameStr(*name), false, RS_PERSISTENT, failover);
 
 	values[0] = NameGetDatum(&MyReplicationSlot->data.name);
 	nulls[0] = false;
@@ -96,6 +98,7 @@ pg_create_logical_replication_slot(PG_FUNCTION_ARGS)
 {
 	Name		name = PG_GETARG_NAME(0);
 	Name		plugin = PG_GETARG_NAME(1);
+	bool		failover = PG_GETARG_BOOL(2);
 
 	LogicalDecodingContext *ctx = NULL;
 
@@ -120,7 +123,7 @@ pg_create_logical_replication_slot(PG_FUNCTION_ARGS)
 	 * errors during initialization because it'll get dropped if this
 	 * transaction fails. We'll make it persistent at the end.
 	 */
-	ReplicationSlotCreate(NameStr(*name), true, RS_EPHEMERAL, false);
+	ReplicationSlotCreate(NameStr(*name), true, RS_EPHEMERAL, failover);
 
 	/*
 	 * Create logical decoding context, to build the initial snapshot.
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 1583862..efdbfd1 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -792,7 +792,7 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
 
 	if (cmd->kind == REPLICATION_KIND_PHYSICAL)
 	{
-		ReplicationSlotCreate(cmd->slotname, false, RS_PERSISTENT, false);
+		ReplicationSlotCreate(cmd->slotname, false, RS_PERSISTENT, cmd->failover);
 	}
 	else
 	{
@@ -803,7 +803,7 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
 		 * handle errors during initialization because it'll get dropped if
 		 * this transaction fails. We'll make it persistent at the end.
 		 */
-		ReplicationSlotCreate(cmd->slotname, true, RS_EPHEMERAL, false);
+		ReplicationSlotCreate(cmd->slotname, true, RS_EPHEMERAL, cmd->failover);
 	}
 
 	initStringInfo(&output_message);
diff --git a/src/include/catalog/pg_proc.h b/src/include/catalog/pg_proc.h
index aec6c4c..e7247af 100644
--- a/src/include/catalog/pg_proc.h
+++ b/src/include/catalog/pg_proc.h
@@ -5077,13 +5077,13 @@ DATA(insert OID = 3473 (  spg_range_quad_leaf_consistent	PGNSP PGUID 12 1 0 0 0
 DESCR("SP-GiST support for quad tree over range");
 
 /* replication slots */
-DATA(insert OID = 3779 (  pg_create_physical_replication_slot PGNSP PGUID 12 1 0 0 0 f f f f t f v u 2 0 2249 "19 16" "{19,16,19,3220}" "{i,i,o,o}" "{slot_name,immediately_reserve,slot_name,xlog_position}" _null_ _null_ pg_create_physical_replication_slot _null_ _null_ _null_ ));
+DATA(insert OID = 3779 (  pg_create_physical_replication_slot PGNSP PGUID 12 1 0 0 0 f f f f t f v u 3 0 2249 "19 16 16" "{19,16,16,19,3220}" "{i,i,i,o,o}" "{slot_name,immediately_reserve,failover,slot_name,xlog_position}" _null_ _null_ pg_create_physical_replication_slot _null_ _null_ _null_ ));
 DESCR("create a physical replication slot");
 DATA(insert OID = 3780 (  pg_drop_replication_slot PGNSP PGUID 12 1 0 0 0 f f f f t f v u 1 0 2278 "19" _null_ _null_ _null_ _null_ _null_ pg_drop_replication_slot _null_ _null_ _null_ ));
 DESCR("drop a replication slot");
 DATA(insert OID = 3781 (  pg_get_replication_slots	PGNSP PGUID 12 1 10 0 0 f f f f f t s s 0 0 2249 "" "{19,19,25,26,16,23,28,28,3220,3220}" "{o,o,o,o,o,o,o,o,o,o}" "{slot_name,plugin,slot_type,datoid,active,active_pid,xmin,catalog_xmin,restart_lsn,confirmed_flush_lsn}" _null_ _null_ pg_get_replication_slots _null_ _null_ _null_ ));
 DESCR("information about replication slots currently in use");
-DATA(insert OID = 3786 (  pg_create_logical_replication_slot PGNSP PGUID 12 1 0 0 0 f f f f t f v u 2 0 2249 "19 19" "{19,19,25,3220}" "{i,i,o,o}" "{slot_name,plugin,slot_name,xlog_position}" _null_ _null_ pg_create_logical_replication_slot _null_ _null_ _null_ ));
+DATA(insert OID = 3786 (  pg_create_logical_replication_slot PGNSP PGUID 12 1 0 0 0 f f f f t f v u 3 0 2249 "19 19 16" "{19,19,16,25,3220}" "{i,i,i,o,o}" "{slot_name,plugin,failover,slot_name,xlog_position}" _null_ _null_ pg_create_logical_replication_slot _null_ _null_ _null_ ));
 DESCR("set up a logical replication slot");
 DATA(insert OID = 3782 (  pg_logical_slot_get_changes PGNSP PGUID 12 1000 1000 25 0 f f f f f t v u 4 0 2249 "19 3220 23 1009" "{19,3220,23,1009,3220,28,25}" "{i,i,i,v,o,o,o}" "{slot_name,upto_lsn,upto_nchanges,options,location,xid,data}" _null_ _null_ pg_logical_slot_get_changes _null_ _null_ _null_ ));
 DESCR("get changes from replication slot");
diff --git a/src/include/nodes/replnodes.h b/src/include/nodes/replnodes.h
index d2f1edb..a8fa9d5 100644
--- a/src/include/nodes/replnodes.h
+++ b/src/include/nodes/replnodes.h
@@ -56,6 +56,7 @@ typedef struct CreateReplicationSlotCmd
 	ReplicationKind kind;
 	char	   *plugin;
 	bool		reserve_wal;
+	bool		failover;
 } CreateReplicationSlotCmd;
 
 
diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h
index cdcbd37..9e23a29 100644
--- a/src/include/replication/slot.h
+++ b/src/include/replication/slot.h
@@ -4,6 +4,7 @@
  *
  * Copyright (c) 2012-2016, PostgreSQL Global Development Group
  *
+ * src/include/replication/slot.h
  *-------------------------------------------------------------------------
  */
 #ifndef SLOT_H
-- 
2.1.0

From 47f8bd5ecfe896824c9e51f100c47795a55ce601 Mon Sep 17 00:00:00 2001
From: Craig Ringer <cr...@2ndquadrant.com>
Date: Tue, 23 Feb 2016 15:31:13 +0800
Subject: [PATCH 5/7] Document failover slots

---
 doc/src/sgml/func.sgml              | 15 +++++++++-----
 doc/src/sgml/high-availability.sgml | 41 +++++++++++++++++++++++++++++++++++++
 doc/src/sgml/logicaldecoding.sgml   |  2 +-
 doc/src/sgml/protocol.sgml          | 19 ++++++++++++++++-
 4 files changed, 70 insertions(+), 7 deletions(-)

diff --git a/doc/src/sgml/func.sgml b/doc/src/sgml/func.sgml
index c0b94bc..649a0c2 100644
--- a/doc/src/sgml/func.sgml
+++ b/doc/src/sgml/func.sgml
@@ -17449,7 +17449,7 @@ postgres=# SELECT * FROM pg_xlogfile_name_offset(pg_stop_backup());
         <indexterm>
          <primary>pg_create_physical_replication_slot</primary>
         </indexterm>
-        <literal><function>pg_create_physical_replication_slot(<parameter>slot_name</parameter> <type>name</type> <optional>, <parameter>immediately_reserve</> <type>boolean</> </optional>)</function></literal>
+        <literal><function>pg_create_physical_replication_slot(<parameter>slot_name</parameter> <type>name</type>, <optional><parameter>immediately_reserve</> <type>boolean</></optional>, <optional><parameter>failover</> <type>boolean</></optional>)</function></literal>
        </entry>
        <entry>
         (<parameter>slot_name</parameter> <type>name</type>, <parameter>xlog_position</parameter> <type>pg_lsn</type>)
@@ -17460,7 +17460,10 @@ postgres=# SELECT * FROM pg_xlogfile_name_offset(pg_stop_backup());
         when <literal>true</>, specifies that the <acronym>LSN</> for this
         replication slot be reserved immediately; otherwise
         the <acronym>LSN</> is reserved on first connection from a streaming
-        replication client. Streaming changes from a physical slot is only
+        replication client. If <literal>failover</literal> is <literal>true</literal>
+        then the slot is created as a failover slot; see <xref
+        linkend="streaming-replication-slots-failover">.
+        Streaming changes from a physical slot is only
         possible with the streaming-replication protocol &mdash;
         see <xref linkend="protocol-replication">. This function corresponds
         to the replication protocol command <literal>CREATE_REPLICATION_SLOT
@@ -17489,7 +17492,7 @@ postgres=# SELECT * FROM pg_xlogfile_name_offset(pg_stop_backup());
         <indexterm>
          <primary>pg_create_logical_replication_slot</primary>
         </indexterm>
-        <literal><function>pg_create_logical_replication_slot(<parameter>slot_name</parameter> <type>name</type>, <parameter>plugin</parameter> <type>name</type>)</function></literal>
+        <literal><function>pg_create_logical_replication_slot(<parameter>slot_name</parameter> <type>name</type>, <parameter>plugin</parameter> <type>name</type>, <optional><parameter>failover</> <type>boolean</></optional>)</function></literal>
        </entry>
        <entry>
         (<parameter>slot_name</parameter> <type>name</type>, <parameter>xlog_position</parameter> <type>pg_lsn</type>)
@@ -17497,8 +17500,10 @@ postgres=# SELECT * FROM pg_xlogfile_name_offset(pg_stop_backup());
        <entry>
         Creates a new logical (decoding) replication slot named
         <parameter>slot_name</parameter> using the output plugin
-        <parameter>plugin</parameter>.  A call to this function has the same
-        effect as the replication protocol command
+        <parameter>plugin</parameter>. If <literal>failover</literal>
+        is <literal>true</literal> the slot is created as a failover
+        slot; see <xref linkend="streaming-replication-slots-failover">. A call to
+        this function has the same effect as the replication protocol command
         <literal>CREATE_REPLICATION_SLOT ... LOGICAL</literal>.
        </entry>
       </row>
diff --git a/doc/src/sgml/high-availability.sgml b/doc/src/sgml/high-availability.sgml
index 6cb690c..4b75175 100644
--- a/doc/src/sgml/high-availability.sgml
+++ b/doc/src/sgml/high-availability.sgml
@@ -949,6 +949,47 @@ primary_slot_name = 'node_a_slot'
 </programlisting>
     </para>
    </sect3>
+
+   <sect3 id="streaming-replication-slots-failover" xreflabel="Failover slots">
+     <title>Failover slots</title>
+
+     <para>
+      Normally a replication slot is not preserved across backup and restore
+      (such as by <application>pg_basebackup</application>) and is not
+      replicated to standbys. Slots are <emphasis>automatically
+      dropped</emphasis> when starting up as a streaming replica or in archive
+      recovery (PITR) mode.
+     </para>
+
+     <para>
+      To make it possible for an application to consistently follow
+      failover when a replica is promoted to a new master a slot may be
+      created as a <emphasis>failover slot</emphasis>. A failover slot may
+      only be created, replayed from or dropped on a master server. Changes to
+      the slot are written to WAL and replicated to standbys. When a standby
+      is promoted applications may connect to the slot on the standby and
+      resume replay from it at a consistent point, as if it was the original
+      master. Failover slots may not be used to replay from a standby before
+      promotion.
+     </para>
+
+     <para>
+      Non-failover slots may be created on and used from a replica. This is
+      currently limited to physical slots as logical decoding is not supported
+      on replica servers.
+     </para>
+
+     <para>
+      When a failover slot created on the master has the same name as a
+      non-failover slot on a replica server the non-failover slot will be
+      automatically dropped. Any client currently connected will be
+      disconnected with an error indicating a conflict with recovery. It
+      is strongly recommended that you avoid creating failover slots with
+      the same name as slots on replicas.
+     </para>
+
+   </sect3>
+
   </sect2>
 
   <sect2 id="cascading-replication">
diff --git a/doc/src/sgml/logicaldecoding.sgml b/doc/src/sgml/logicaldecoding.sgml
index e841348..c7b43ed 100644
--- a/doc/src/sgml/logicaldecoding.sgml
+++ b/doc/src/sgml/logicaldecoding.sgml
@@ -280,7 +280,7 @@ $ pg_recvlogical -d postgres --slot test --drop-slot
     The commands
     <itemizedlist>
      <listitem>
-      <para><literal>CREATE_REPLICATION_SLOT <replaceable>slot_name</replaceable> LOGICAL <replaceable>output_plugin</replaceable></literal></para>
+      <para><literal>CREATE_REPLICATION_SLOT <replaceable>slot_name</replaceable> LOGICAL <replaceable>output_plugin</replaceable> <optional>FAILOVER</optional></literal></para>
      </listitem>
 
      <listitem>
diff --git a/doc/src/sgml/protocol.sgml b/doc/src/sgml/protocol.sgml
index 522128e..33b6830 100644
--- a/doc/src/sgml/protocol.sgml
+++ b/doc/src/sgml/protocol.sgml
@@ -1434,7 +1434,7 @@ The commands accepted in walsender mode are:
   </varlistentry>
 
   <varlistentry>
-   <term><literal>CREATE_REPLICATION_SLOT</literal> <replaceable class="parameter">slot_name</> { <literal>PHYSICAL</> [ <literal>RESERVE_WAL</> ] | <literal>LOGICAL</> <replaceable class="parameter">output_plugin</> }
+   <term><literal>CREATE_REPLICATION_SLOT</literal> <replaceable class="parameter">slot_name</> { <literal>PHYSICAL</> <optional><literal>RESERVE_WAL</></> | <literal>LOGICAL</> <replaceable class="parameter">output_plugin</> } <optional><literal>FAILOVER</></>
      <indexterm><primary>CREATE_REPLICATION_SLOT</primary></indexterm>
     </term>
     <listitem>
@@ -1474,6 +1474,17 @@ The commands accepted in walsender mode are:
         </para>
        </listitem>
       </varlistentry>
+
+      <varlistentry>
+       <term><literal>FAILOVER</></term>
+       <listitem>
+        <para>
+         Create this slot as a <link linkend="streaming-replication-slots-failover">
+         failover slot</link>.
+        </para>
+       </listitem>
+      </varlistentry>
+
      </variablelist>
     </listitem>
   </varlistentry>
@@ -1829,6 +1840,12 @@ The commands accepted in walsender mode are:
       to process the output for streaming.
      </para>
 
+     <para>
+      Logical replication automatically follows timeline switches. It is
+      not necessary or possible to supply a <literal>TIMELINE</literal>
+      option as in physical replication.
+     </para>
+
      <variablelist>
       <varlistentry>
        <term><literal>SLOT</literal> <replaceable class="parameter">slot_name</></term>
-- 
2.1.0

From 0a64990cf0b89ab29f64c46c2636e32dc37258fd Mon Sep 17 00:00:00 2001
From: Craig Ringer <cr...@2ndquadrant.com>
Date: Tue, 23 Feb 2016 15:55:01 +0800
Subject: [PATCH 6/7] Add 'failover' to pg_replication_slots

---
 contrib/test_decoding/expected/ddl.out | 38 ++++++++++++++++++++++++++++------
 contrib/test_decoding/sql/ddl.sql      | 15 ++++++++++++--
 doc/src/sgml/catalogs.sgml             | 10 +++++++++
 src/backend/catalog/system_views.sql   |  1 +
 src/backend/replication/slotfuncs.c    |  6 +++++-
 src/include/catalog/pg_proc.h          |  2 +-
 src/test/regress/expected/rules.out    |  3 ++-
 7 files changed, 64 insertions(+), 11 deletions(-)

diff --git a/contrib/test_decoding/expected/ddl.out b/contrib/test_decoding/expected/ddl.out
index 5fed500..5b2f34a 100644
--- a/contrib/test_decoding/expected/ddl.out
+++ b/contrib/test_decoding/expected/ddl.out
@@ -61,11 +61,37 @@ SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_d
 SELECT slot_name, plugin, slot_type, active,
     NOT catalog_xmin IS NULL AS catalog_xmin_set,
     xmin IS NULl  AS data_xmin_not_set,
-    pg_xlog_location_diff(restart_lsn, '0/01000000') > 0 AS some_wal
+    pg_xlog_location_diff(restart_lsn, '0/01000000') > 0 AS some_wal,
+    failover
 FROM pg_replication_slots;
-    slot_name    |    plugin     | slot_type | active | catalog_xmin_set | data_xmin_not_set | some_wal 
------------------+---------------+-----------+--------+------------------+-------------------+----------
- regression_slot | test_decoding | logical   | f      | t                | t                 | t
+    slot_name    |    plugin     | slot_type | active | catalog_xmin_set | data_xmin_not_set | some_wal | failover 
+-----------------+---------------+-----------+--------+------------------+-------------------+----------+----------
+ regression_slot | test_decoding | logical   | f      | t                | t                 | t        | f
+(1 row)
+
+/* same for a failover slot */
+SELECT 'init' FROM pg_create_logical_replication_slot('failover_slot', 'test_decoding', true);
+ ?column? 
+----------
+ init
+(1 row)
+
+SELECT slot_name, plugin, slot_type, active,
+    NOT catalog_xmin IS NULL AS catalog_xmin_set,
+    xmin IS NULl  AS data_xmin_not_set,
+    pg_xlog_location_diff(restart_lsn, '0/01000000') > 0 AS some_wal,
+    failover
+FROM pg_replication_slots
+WHERE slot_name = 'failover_slot';
+   slot_name   |    plugin     | slot_type | active | catalog_xmin_set | data_xmin_not_set | some_wal | failover 
+---------------+---------------+-----------+--------+------------------+-------------------+----------+----------
+ failover_slot | test_decoding | logical   | f      | t                | t                 | t        | t
+(1 row)
+
+SELECT pg_drop_replication_slot('failover_slot');
+ pg_drop_replication_slot 
+--------------------------
+ 
 (1 row)
 
 /*
@@ -676,7 +702,7 @@ SELECT pg_drop_replication_slot('regression_slot');
 
 /* check that the slot is gone */
 SELECT * FROM pg_replication_slots;
- slot_name | plugin | slot_type | datoid | database | active | active_pid | xmin | catalog_xmin | restart_lsn | confirmed_flush_lsn 
------------+--------+-----------+--------+----------+--------+------------+------+--------------+-------------+---------------------
+ slot_name | plugin | slot_type | datoid | database | active | active_pid | failover | xmin | catalog_xmin | restart_lsn | confirmed_flush_lsn 
+-----------+--------+-----------+--------+----------+--------+------------+----------+------+--------------+-------------+---------------------
 (0 rows)
 
diff --git a/contrib/test_decoding/sql/ddl.sql b/contrib/test_decoding/sql/ddl.sql
index dc61ef4..f64b21c 100644
--- a/contrib/test_decoding/sql/ddl.sql
+++ b/contrib/test_decoding/sql/ddl.sql
@@ -24,16 +24,27 @@ SELECT 'init' FROM pg_create_physical_replication_slot('repl');
 SELECT data FROM pg_logical_slot_get_changes('repl', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
 SELECT pg_drop_replication_slot('repl');
 
-
 SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');
 
 /* check whether status function reports us, only reproduceable columns */
 SELECT slot_name, plugin, slot_type, active,
     NOT catalog_xmin IS NULL AS catalog_xmin_set,
     xmin IS NULl  AS data_xmin_not_set,
-    pg_xlog_location_diff(restart_lsn, '0/01000000') > 0 AS some_wal
+    pg_xlog_location_diff(restart_lsn, '0/01000000') > 0 AS some_wal,
+    failover
 FROM pg_replication_slots;
 
+/* same for a failover slot */
+SELECT 'init' FROM pg_create_logical_replication_slot('failover_slot', 'test_decoding', true);
+SELECT slot_name, plugin, slot_type, active,
+    NOT catalog_xmin IS NULL AS catalog_xmin_set,
+    xmin IS NULl  AS data_xmin_not_set,
+    pg_xlog_location_diff(restart_lsn, '0/01000000') > 0 AS some_wal,
+    failover
+FROM pg_replication_slots
+WHERE slot_name = 'failover_slot';
+SELECT pg_drop_replication_slot('failover_slot');
+
 /*
  * Check that changes are handled correctly when interleaved with ddl
  */
diff --git a/doc/src/sgml/catalogs.sgml b/doc/src/sgml/catalogs.sgml
index 951f59b..0a3af1f 100644
--- a/doc/src/sgml/catalogs.sgml
+++ b/doc/src/sgml/catalogs.sgml
@@ -5377,6 +5377,16 @@
      </row>
 
      <row>
+      <entry><structfield>failover</structfield></entry>
+      <entry><type>boolean</type></entry>
+      <entry></entry>
+      <entry>
+       True if this slot is a failover slot; see
+       <xref linkend="streaming-replication-slots-failover">.
+      </entry>
+     </row>
+
+     <row>
       <entry><structfield>xmin</structfield></entry>
       <entry><type>xid</type></entry>
       <entry></entry>
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index fcb877d..26c02e4 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -704,6 +704,7 @@ CREATE VIEW pg_replication_slots AS
             D.datname AS database,
             L.active,
             L.active_pid,
+            L.failover,
             L.xmin,
             L.catalog_xmin,
             L.restart_lsn,
diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c
index a2dfc40..abc450d 100644
--- a/src/backend/replication/slotfuncs.c
+++ b/src/backend/replication/slotfuncs.c
@@ -177,7 +177,7 @@ pg_drop_replication_slot(PG_FUNCTION_ARGS)
 Datum
 pg_get_replication_slots(PG_FUNCTION_ARGS)
 {
-#define PG_GET_REPLICATION_SLOTS_COLS 10
+#define PG_GET_REPLICATION_SLOTS_COLS 11
 	ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
 	TupleDesc	tupdesc;
 	Tuplestorestate *tupstore;
@@ -227,6 +227,7 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
 		XLogRecPtr	restart_lsn;
 		XLogRecPtr	confirmed_flush_lsn;
 		pid_t		active_pid;
+		bool		failover;
 		Oid			database;
 		NameData	slot_name;
 		NameData	plugin;
@@ -249,6 +250,7 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
 			namecpy(&plugin, &slot->data.plugin);
 
 			active_pid = slot->active_pid;
+			failover = slot->data.failover;
 		}
 		SpinLockRelease(&slot->mutex);
 
@@ -279,6 +281,8 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
 		else
 			nulls[i++] = true;
 
+		values[i++] = BoolGetDatum(failover);
+
 		if (xmin != InvalidTransactionId)
 			values[i++] = TransactionIdGetDatum(xmin);
 		else
diff --git a/src/include/catalog/pg_proc.h b/src/include/catalog/pg_proc.h
index e7247af..836db85 100644
--- a/src/include/catalog/pg_proc.h
+++ b/src/include/catalog/pg_proc.h
@@ -5081,7 +5081,7 @@ DATA(insert OID = 3779 (  pg_create_physical_replication_slot PGNSP PGUID 12 1 0
 DESCR("create a physical replication slot");
 DATA(insert OID = 3780 (  pg_drop_replication_slot PGNSP PGUID 12 1 0 0 0 f f f f t f v u 1 0 2278 "19" _null_ _null_ _null_ _null_ _null_ pg_drop_replication_slot _null_ _null_ _null_ ));
 DESCR("drop a replication slot");
-DATA(insert OID = 3781 (  pg_get_replication_slots	PGNSP PGUID 12 1 10 0 0 f f f f f t s s 0 0 2249 "" "{19,19,25,26,16,23,28,28,3220,3220}" "{o,o,o,o,o,o,o,o,o,o}" "{slot_name,plugin,slot_type,datoid,active,active_pid,xmin,catalog_xmin,restart_lsn,confirmed_flush_lsn}" _null_ _null_ pg_get_replication_slots _null_ _null_ _null_ ));
+DATA(insert OID = 3781 (  pg_get_replication_slots	PGNSP PGUID 12 1 10 0 0 f f f f f t s s 0 0 2249 "" "{19,19,25,26,16,23,16,28,28,3220,3220}" "{o,o,o,o,o,o,o,o,o,o,o}" "{slot_name,plugin,slot_type,datoid,active,active_pid,failover,xmin,catalog_xmin,restart_lsn,confirmed_flush_lsn}" _null_ _null_ pg_get_replication_slots _null_ _null_ _null_ ));
 DESCR("information about replication slots currently in use");
 DATA(insert OID = 3786 (  pg_create_logical_replication_slot PGNSP PGUID 12 1 0 0 0 f f f f t f v u 3 0 2249 "19 19 16" "{19,19,16,25,3220}" "{i,i,i,o,o}" "{slot_name,plugin,failover,slot_name,xlog_position}" _null_ _null_ pg_create_logical_replication_slot _null_ _null_ _null_ ));
 DESCR("set up a logical replication slot");
diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out
index 81bc5c9..d8315c6 100644
--- a/src/test/regress/expected/rules.out
+++ b/src/test/regress/expected/rules.out
@@ -1417,11 +1417,12 @@ pg_replication_slots| SELECT l.slot_name,
     d.datname AS database,
     l.active,
     l.active_pid,
+    l.failover,
     l.xmin,
     l.catalog_xmin,
     l.restart_lsn,
     l.confirmed_flush_lsn
-   FROM (pg_get_replication_slots() l(slot_name, plugin, slot_type, datoid, active, active_pid, xmin, catalog_xmin, restart_lsn, confirmed_flush_lsn)
+   FROM (pg_get_replication_slots() l(slot_name, plugin, slot_type, datoid, active, active_pid, failover, xmin, catalog_xmin, restart_lsn, confirmed_flush_lsn)
      LEFT JOIN pg_database d ON ((l.datoid = d.oid)));
 pg_roles| SELECT pg_authid.rolname,
     pg_authid.rolsuper,
-- 
2.1.0

From 2b5af6a1e1a73b614057b8a6b9e1e1d822b7baa8 Mon Sep 17 00:00:00 2001
From: Craig Ringer <cr...@2ndquadrant.com>
Date: Thu, 10 Mar 2016 10:50:59 +0800
Subject: [PATCH 7/7] Introduce TAP recovery tests for failover slots

---
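Note on the LSN checks in this test: LSNs in their textual "hi/lo" hex
form have to be compared as numbers, not as strings, which is why the test
converts them with lsn_to_bigint() first. Below is a standalone sketch of the
same arithmetic in C. It is not part of the patch, and lsn_from_text() is a
hypothetical name used only for illustration.

```c
#include <assert.h>
#include <stdint.h>
#include <stdio.h>

/*
 * Hypothetical helper, not part of the patch: parse an LSN in textual
 * "hi/lo" hexadecimal form into one 64-bit value, mirroring what
 * lsn_to_bigint() does in the TAP test.
 *
 * Returns 0 on success and -1 if the input is not two hex fields of at
 * most eight digits each separated by '/'.
 */
int
lsn_from_text(const char *text, uint64_t *lsn)
{
	unsigned int hi;
	unsigned int lo;
	char		trailing;

	assert(text != NULL && lsn != NULL);

	/* Exactly two conversions must succeed; a third means trailing junk. */
	if (sscanf(text, "%8X/%8X%c", &hi, &lo, &trailing) != 2)
		return -1;

	/* The high half is the upper 32 bits, the low half the lower 32. */
	*lsn = ((uint64_t) hi << 32) | lo;
	return 0;
}
```

With this, "0/9000000" parses to a smaller value than "0/10000000", whereas a
plain string comparison of the two would order them the other way round.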
 src/test/recovery/t/007_failover_slots.pl | 367 ++++++++++++++++++++++++++++++
 1 file changed, 367 insertions(+)
 create mode 100644 src/test/recovery/t/007_failover_slots.pl

diff --git a/src/test/recovery/t/007_failover_slots.pl b/src/test/recovery/t/007_failover_slots.pl
new file mode 100644
index 0000000..8524e20
--- /dev/null
+++ b/src/test/recovery/t/007_failover_slots.pl
@@ -0,0 +1,367 @@
+#
+# Test failover slots
+#
+use strict;
+use warnings;
+use bigint;
+use PostgresNode;
+use TestLib;
+use Test::More;
+use RecursiveCopy;
+use File::Copy;
+use File::Basename qw(basename);
+use List::Util qw();
+use Data::Dumper;
+
+use Carp 'verbose';
+$SIG{ __DIE__ } = sub { Carp::confess( @_ ) };
+
+sub lsn_to_bigint
+{
+	my ($lsn) = @_;
+	my ($high, $low) = split("/",$lsn);
+	return hex($high) * 2**32 + hex($low);
+}
+
+sub get_slot_info
+{
+	my ($node, $slot_name) = @_;
+
+	my $esc_slot_name = $slot_name;
+	$esc_slot_name =~ s/'/''/g;
+	my @selectlist = ('slot_name', 'plugin', 'slot_type', 'database', 'active_pid', 'xmin', 'catalog_xmin', 'restart_lsn', 'confirmed_flush_lsn');
+	my $row = $node->safe_psql('postgres', "SELECT " . join(', ', @selectlist) . " FROM pg_replication_slots WHERE slot_name = '$esc_slot_name';",
+		extra_params => ['-z']);
+	chomp $row;
+	my @fields = split("\0", $row);
+	if (scalar @fields != scalar @selectlist)
+	{
+		die "Select-list '@selectlist' didn't match length of result-list '@fields'";
+	}
+	my %slotinfo;
+	for (my $i = 0; $i < scalar @selectlist; $i++)
+	{
+		$slotinfo{$selectlist[$i]} = $fields[$i];
+	}
+	return \%slotinfo;
+}
+
+sub diag_slotinfo
+{
+	my ($info, $msg) = @_;
+	diag "slot " . $info->{slot_name} . " ($msg): " . Dumper($info);
+}
+
+sub wait_for_catchup
+{
+	my ($node_master, $node_replica) = @_;
+
+	my $master_lsn = $node_master->safe_psql('postgres', 'SELECT pg_current_xlog_insert_location()');
+	diag "waiting for " . $node_replica->name . " to catch up to $master_lsn on " . $node_master->name;
+	my $ret = $node_replica->poll_query_until('postgres',
+		"SELECT pg_last_xlog_replay_location() >= '$master_lsn'::pg_lsn;");
+	BAIL_OUT('replica failed to catch up') unless $ret;
+	my $replica_lsn = $node_replica->safe_psql('postgres', 'SELECT pg_last_xlog_replay_location()');
+	diag "Replica is caught up to $replica_lsn, past required LSN $master_lsn";
+}
+
+sub read_slot_updates_from_xlog
+{
+	my ($node, $timeline) = @_;
+	my ($stdout, $stderr) = ('', '');
+	# Look at master xlogs and examine slot advances
+	my $wal_pattern = sprintf("%s/pg_xlog/%08X" . ("?" x 16), $node->data_dir, $timeline);
+	my @wal = glob $wal_pattern;
+	my $firstwal = List::Util::minstr(@wal);
+	my $lastwal = basename(List::Util::maxstr(@wal));
+	diag "decoding xlog on " . $node->name . " from $firstwal to $lastwal";
+	IPC::Run::run ['pg_xlogdump', $firstwal, $lastwal], '>', \$stdout, '2>', \$stderr;
+	like($stderr, qr/invalid record length at [0-9A-F]+\/[0-9A-F]+: wanted 24, got 0/,
+		'pg_xlogdump exits with expected error');
+	my @slots = grep(/ReplicationSlot/, split(/\n/, $stdout));
+
+	# Parse the dumped xlog data
+	my @slot_updates = ();
+	for my $slot (@slots) {
+		if (my @matches = ($slot =~ /lsn: ([[:xdigit:]]{1,8}\/[[:xdigit:]]{1,8}), prev [[:xdigit:]]{1,8}\/[[:xdigit:]]{1,8}, desc: UPDATE of slot (\w+) with restart ([[:xdigit:]]{1,8}\/[[:xdigit:]]{1,8}) and xid ([[:digit:]]+) confirmed to ([[:xdigit:]]{1,8}\/[[:xdigit:]]{1,8})/))
+		{
+			my %slot_update = (
+				action => 'UPDATE',
+				log_lsn => $1, slot_name => $2, restart_lsn => $3,
+				xid => $4, confirm_lsn => $5
+				);
+			diag "Replication slot create/advance: $slot_update{slot_name} advanced to $slot_update{confirm_lsn} with restart $slot_update{restart_lsn} and xid $slot_update{xid} in xlog entry $slot_update{log_lsn}";
+			push @slot_updates, \%slot_update;
+		}
+		elsif ($slot =~ /DELETE/)
+		{
+			diag "Replication slot delete: $slot";
+		}
+		else
+		{
+			die "Slot xlog entry didn't match pattern: $slot";
+		}
+	}
+	return \@slot_updates;
+}
+
+sub check_slot_wal_update
+{
+	my ($entry, $slotname, %params) = @_;
+
+	ok(defined($entry), "xlog entry exists for slot $slotname");
+	SKIP: {
+		skip 'Expected xlog entry was undef' unless defined($entry);
+		my %entry = %{$entry}; undef $entry;
+		diag "Examining decoded slot update xlog entry: " . Dumper(\%entry);
+		is($entry{action}, 'UPDATE', "action is an update");
+		is($entry{slot_name}, $slotname, "action affects slot " . $slotname);
+
+		cmp_ok(lsn_to_bigint($entry{restart_lsn}), "<=",
+		       lsn_to_bigint($entry{log_lsn}),
+		       "restart_lsn is no greater than LSN when logged");
+
+		cmp_ok(lsn_to_bigint($entry{confirm_lsn}), "<=",
+		       lsn_to_bigint($entry{log_lsn}),
+		       "confirm_lsn is no greater than LSN when logged");
+
+		cmp_ok(lsn_to_bigint($entry{confirm_lsn}), ">=",
+			lsn_to_bigint($entry{restart_lsn}),
+			'confirm_lsn equal to or ahead of restart_lsn');
+
+		cmp_ok(lsn_to_bigint($entry{restart_lsn}), "<=",
+			lsn_to_bigint($params{expect_max_restart_lsn}),
+			'restart_lsn is at or before expected')
+			if ($params{expect_max_restart_lsn});
+
+		cmp_ok(lsn_to_bigint($entry{restart_lsn}), ">=",
+			lsn_to_bigint($params{expect_min_restart_lsn}),
+			'restart_lsn is at or after expected')
+			if ($params{expect_min_restart_lsn});
+
+		cmp_ok(lsn_to_bigint($entry{confirm_lsn}), "<=",
+			lsn_to_bigint($params{expect_max_confirm_lsn}),
+			'confirm_lsn is at or before expected')
+			if ($params{expect_max_confirm_lsn});
+
+		cmp_ok(lsn_to_bigint($entry{confirm_lsn}), ">=",
+			lsn_to_bigint($params{expect_min_confirm_lsn}),
+			'confirm_lsn is at or after expected')
+			if ($params{expect_min_confirm_lsn});
+	}
+}
+
+sub test_read_from_slot
+{
+	my ($node, $slot, $expected) = @_;
+	my $slot_quoted = $slot;
+	$slot_quoted =~ s/'/''/g;
+	my ($ret, $stdout, $stderr) = $node->psql('postgres',
+		"SELECT data FROM pg_logical_slot_peek_changes('$slot_quoted', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');"
+	);
+	is($ret, 0, "replaying from slot $slot is successful");
+	is($stderr, '', "replay from slot $slot produces no stderr");
+	if (defined($expected)) {
+		is($stdout, $expected, "slot $slot returned expected output");
+	}
+	return $stderr;
+}
+
+sub wait_for_end_of_recovery
+{
+	my ($node) = @_;
+	$node->poll_query_until('postgres',
+		"SELECT NOT pg_is_in_recovery();");
+}
+
+diag "";
+
+
+
+my ($stdout, $stderr, $ret, $slotinfo);
+
+# Initialize master node
+my $node_master = get_new_node('master');
+$node_master->init(allows_streaming => 1, has_archiving => 1);
+$node_master->append_conf('postgresql.conf', "wal_level = 'logical'\n");
+$node_master->append_conf('postgresql.conf', "max_replication_slots = 2\n");
+$node_master->append_conf('postgresql.conf', "max_wal_senders = 2\n");
+$node_master->append_conf('postgresql.conf', "log_min_messages = 'debug3'\n");
+$node_master->dump_info;
+$node_master->start;
+
+my $master_beforecreate_bb_lsn = $node_master->safe_psql('postgres',
+	"SELECT pg_current_xlog_insert_location()");
+
+diag "master LSN is $master_beforecreate_bb_lsn before creation of bb_failover";
+
+$node_master->safe_psql('postgres',
+"SELECT pg_create_logical_replication_slot('bb_failover', 'test_decoding', true);"
+);
+my $bb_beforeconsume_si = get_slot_info($node_master, 'bb_failover');
+diag_slotinfo $bb_beforeconsume_si, 'bb_beforeconsume';
+
+$node_master->safe_psql('postgres', "CREATE TABLE decoding(blah text);");
+$node_master->safe_psql('postgres',
+	"INSERT INTO decoding(blah) VALUES ('consumed');");
+($ret, $stdout, $stderr) = $node_master->psql('postgres',
+	"SELECT data FROM pg_logical_slot_get_changes('bb_failover', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');");
+is($ret, 0, 'replaying from bb_failover on master is successful');
+is( $stdout, q(BEGIN
+table public.decoding: INSERT: blah[text]:'consumed'
+COMMIT), 'decoded expected data from slot bb_failover on master');
+is($stderr, '', 'replay from slot bb_failover produces no stderr');
+
+my $bb_afterconsume_si = get_slot_info($node_master, 'bb_failover');
+diag_slotinfo $bb_afterconsume_si, 'bb_afterconsume';
+
+($ret, $stdout, $stderr) = $node_master->psql('postgres',
+	"SELECT data FROM pg_logical_slot_get_changes('bb_failover', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');");
+is ($ret, 0, 'no error reading empty slot changes after get');
+is ($stdout, '', 'no new changes to read from slot after get');
+
+$node_master->safe_psql('postgres', 'CHECKPOINT;');
+
+$node_master->safe_psql('postgres',
+	"INSERT INTO decoding(blah) VALUES ('beforebb');");
+$node_master->safe_psql('postgres', 'CHECKPOINT;');
+
+my $backup_name = 'b1';
+$node_master->backup_fs_hot($backup_name);
+
+my $node_replica = get_new_node('replica');
+$node_replica->init_from_backup(
+	$node_master, $backup_name,
+	has_streaming => 1,
+	has_restoring => 1);
+$node_replica->start;
+
+my $master_beforecreate_ab_lsn = $node_master->safe_psql('postgres',
+	"SELECT pg_current_xlog_insert_location()");
+
+diag "master LSN is $master_beforecreate_ab_lsn before creation of ab_failover";
+
+$node_master->safe_psql('postgres',
+"SELECT pg_create_logical_replication_slot('ab_failover', 'test_decoding', true);"
+);
+
+my $ab_beforeconsume_si = get_slot_info($node_master, 'ab_failover');
+diag_slotinfo $ab_beforeconsume_si, 'ab_beforeconsume';
+
+$node_master->safe_psql('postgres',
+	"INSERT INTO decoding(blah) VALUES ('afterbb');");
+
+wait_for_catchup($node_master, $node_replica);
+
+$stdout = $node_master->safe_psql('postgres', 'SELECT slot_name FROM pg_replication_slots ORDER BY slot_name');
+is($stdout, "ab_failover\nbb_failover", 'Both failover slots exist on master');
+
+
+# Verify that both failover slots (pre- and post-base-backup) exist on the replica
+$stdout = $node_replica->safe_psql('postgres', 'SELECT slot_name FROM pg_replication_slots ORDER BY slot_name');
+is($stdout, "ab_failover\nbb_failover", 'Both failover slots exist on replica')
+  or BAIL_OUT('Remaining tests meaningless');
+
+# Shut the master down (a clean fast shutdown, not a crash)
+$node_master->stop('fast');
+
+my @slot_updates = @{ read_slot_updates_from_xlog($node_master, 1) };
+
+#
+# Decode the WAL from the master and make sure the expected entries and only the
+# expected entries are present.
+#
+# We want to see two WAL entries, one for each slot. There won't be another entry
+# for the slot advance because right now we don't write out WAL when a slot's
+# confirmed location advances, only when the restart location or xmin advances.
+# The restart LSN and confirmed flush LSN in each slot's WAL record must not be
+# less than the master's LSN just before the slot was created, and not greater
+# than the position we saw in pg_replication_slots after slot creation.
+#
+
+check_slot_wal_update($slot_updates[0], 'bb_failover',
+	expect_min_restart_lsn => $master_beforecreate_bb_lsn,
+	expect_min_confirm_lsn => $master_beforecreate_bb_lsn,
+	expect_max_restart_lsn => $bb_beforeconsume_si->{restart_lsn},
+	expect_max_confirm_lsn => $bb_beforeconsume_si->{confirmed_flush_lsn});
+
+check_slot_wal_update($slot_updates[1], 'ab_failover',
+	expect_min_restart_lsn => $master_beforecreate_ab_lsn,
+	expect_min_confirm_lsn => $master_beforecreate_ab_lsn,
+	expect_max_restart_lsn => $ab_beforeconsume_si->{restart_lsn},
+	expect_max_confirm_lsn => $ab_beforeconsume_si->{confirmed_flush_lsn});
+
+# Consuming from a slot does not cause the slot to be written out even on
+# CHECKPOINT. Since nothing else would have dirtied the slot, there should
+# be no more WAL entries for failover slots.
+#
+# The client is expected to keep track of the confirmed LSN and skip replaying
+# data it's already seen.
+ok(!defined($slot_updates[2]), 'Third xlog entry does not exist');
+
+$node_replica->promote;
+
+wait_for_end_of_recovery($node_replica);
+
+$node_replica->safe_psql('postgres',
+	"INSERT INTO decoding(blah) VALUES ('after failover');");
+
+my $bb_afterpromote_si = get_slot_info($node_replica, 'bb_failover');
+diag_slotinfo $bb_afterpromote_si, 'bb_afterpromote';
+# Because the confirmed LSN didn't get logged, the replica should have the slot
+# at the position it was created at, not the position after we consumed data.
+is($bb_afterpromote_si->{confirmed_flush_lsn}, $bb_beforeconsume_si->{confirmed_flush_lsn},
+	'slot bb_failover confirmed pos on replica has gone backwards');
+# The restart position won't have advanced either, since we didn't log any new
+# entries for it and we haven't done enough work to trigger a flush.
+is($bb_afterpromote_si->{restart_lsn}, $bb_beforeconsume_si->{restart_lsn},
+	'slot bb_failover restart position is unchanged');
+
+# Same for the after-basebackup slot.
+my $ab_afterpromote_si = get_slot_info($node_replica, 'ab_failover');
+diag_slotinfo $ab_afterpromote_si, 'ab_afterpromote';
+is($ab_afterpromote_si->{confirmed_flush_lsn}, $ab_beforeconsume_si->{confirmed_flush_lsn},
+	'slot ab_failover confirmed pos on replica has gone backwards');
+is($ab_afterpromote_si->{restart_lsn}, $ab_beforeconsume_si->{restart_lsn},
+	'slot ab_failover restart position is unchanged');
+
+
+
+
+# Can replay from slot ab, following the timeline switch
+test_read_from_slot($node_replica, 'ab_failover', q(BEGIN
+table public.decoding: INSERT: blah[text]:'afterbb'
+COMMIT
+BEGIN
+table public.decoding: INSERT: blah[text]:'after failover'
+COMMIT));
+
+# Can replay from slot bb too
+#
+# Note that we expect to see data that we already replayed on the master here
+# because the confirm lsn won't be flushed on the master and will go backwards.
+#
+# See http://www.postgresql.org/message-id/camsr+ygsatrgqpcx9qx4eocizwsa27xjkeipsotaje8ofix...@mail.gmail.com
+#
+# (If Pg is patched to flush all slots on shutdown then this will change, but
+#  it'll still be able to go backwards on an unclean shutdown).
+#
+test_read_from_slot($node_replica, 'bb_failover', q(BEGIN
+table public.decoding: INSERT: blah[text]:'consumed'
+COMMIT
+BEGIN
+table public.decoding: INSERT: blah[text]:'beforebb'
+COMMIT
+BEGIN
+table public.decoding: INSERT: blah[text]:'afterbb'
+COMMIT
+BEGIN
+table public.decoding: INSERT: blah[text]:'after failover'
+COMMIT));
+
+$node_replica->stop('fast');
+
+# We don't need the standby anymore
+$node_replica->teardown_node();
+
+done_testing();
-- 
2.1.0
