Hi all

Here's v2 of the failover slots patch. It replicates a logical slot to a
physical streaming replica downstream, keeping the slots in sync. After the
downstream is promoted a client can replay from the logical slot.

UI to allow creation of non-failover slots is pending.

There's more testing to do to cover all the corners: drop slots, drop and
re-create, name conflicts between downstream !failover slots and upstream
failover slots, etc.

There's also a known bug where WAL isn't correctly retained for a slot
that was created before a basebackup; I'll fix that in a revision
shortly.

I'm interested in ideas on how to better test this.

-- 
 Craig Ringer                   http://www.2ndQuadrant.com/
<http://www.2ndquadrant.com/>
 PostgreSQL Development, 24x7 Support, Training & Services
From c2535eb27c6efc5dddd16a6aa7142fd0f59e85d3 Mon Sep 17 00:00:00 2001
From: Craig Ringer <cr...@2ndquadrant.com>
Date: Wed, 20 Jan 2016 17:16:29 +0800
Subject: [PATCH 1/2] Implement failover slots

Originally replication slots were unique to a single node and weren't
recorded in WAL or replicated. A logical decoding client couldn't follow
a physical standby failover and promotion because the promoted replica
didn't have the original master's slots. The replica may not have
retained all required WAL and there was no way to create a new logical
slot and rewind it back to the point the logical client had replayed to
anyway.

Failover slots lift this limitation by replicating slots consistently to
physical standbys, keeping them up to date and using them in WAL
retention calculations. This allows a logical decoding client to follow
a physical failover and promotion without losing its place in the change
stream.

Simon Riggs and Craig Ringer

WIP. Open items:

* Testing
* Implement !failover slots and UI for marking slots as failover slots
* Fix WAL retention for slots created before a basebackup
---
 src/backend/access/rmgrdesc/Makefile       |   2 +-
 src/backend/access/rmgrdesc/replslotdesc.c |  63 +++++
 src/backend/access/transam/rmgr.c          |   1 +
 src/backend/commands/dbcommands.c          |   3 +
 src/backend/replication/basebackup.c       |  12 -
 src/backend/replication/logical/decode.c   |   1 +
 src/backend/replication/logical/logical.c  |  19 +-
 src/backend/replication/slot.c             | 433 ++++++++++++++++++++++++++++-
 src/backend/replication/slotfuncs.c        |   1 +
 src/bin/pg_xlogdump/replslotdesc.c         |   1 +
 src/bin/pg_xlogdump/rmgrdesc.c             |   1 +
 src/include/access/rmgrlist.h              |   1 +
 src/include/replication/slot.h             |  61 +---
 src/include/replication/slot_xlog.h        | 103 +++++++
 14 files changed, 610 insertions(+), 92 deletions(-)
 create mode 100644 src/backend/access/rmgrdesc/replslotdesc.c
 create mode 120000 src/bin/pg_xlogdump/replslotdesc.c
 create mode 100644 src/include/replication/slot_xlog.h

diff --git a/src/backend/access/rmgrdesc/Makefile b/src/backend/access/rmgrdesc/Makefile
index c72a1f2..600b544 100644
--- a/src/backend/access/rmgrdesc/Makefile
+++ b/src/backend/access/rmgrdesc/Makefile
@@ -10,7 +10,7 @@ include $(top_builddir)/src/Makefile.global
 
 OBJS = brindesc.o clogdesc.o committsdesc.o dbasedesc.o gindesc.o gistdesc.o \
 	   hashdesc.o heapdesc.o mxactdesc.o nbtdesc.o relmapdesc.o \
-	   replorigindesc.o seqdesc.o smgrdesc.o spgdesc.o \
+	   replorigindesc.o replslotdesc.o seqdesc.o smgrdesc.o spgdesc.o \
 	   standbydesc.o tblspcdesc.o xactdesc.o xlogdesc.o
 
 include $(top_srcdir)/src/backend/common.mk
diff --git a/src/backend/access/rmgrdesc/replslotdesc.c b/src/backend/access/rmgrdesc/replslotdesc.c
new file mode 100644
index 0000000..b882846
--- /dev/null
+++ b/src/backend/access/rmgrdesc/replslotdesc.c
@@ -0,0 +1,63 @@
+/*-------------------------------------------------------------------------
+ *
+ * replslotdesc.c
+ *	  rmgr descriptor routines for replication/slot.c
+ *
+ * Portions Copyright (c) 2015, PostgreSQL Global Development Group
+ *
+ *
+ * IDENTIFICATION
+ *	  src/backend/access/rmgrdesc/replslotdesc.c
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include "replication/slot_xlog.h"
+
+/*
+ * Append a human-readable description of a failover slot WAL record to buf.
+ * Record types other than update and drop add nothing.
+ */
+void
+replslot_desc(StringInfo buf, XLogReaderState *record)
+{
+	char	   *rec = XLogRecGetData(record);
+	uint8		info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
+
+	switch (info)
+	{
+		case XLOG_REPLSLOT_UPDATE:
+			{
+				ReplicationSlotInWAL xlrec = (ReplicationSlotInWAL) rec;
+
+				/* Show the LSN as hi/lo hex words, like other LSN output */
+				appendStringInfo(buf, "slot %s to xmin=%u, catmin=%u, restart_lsn=%X/%X@%u",
+						NameStr(xlrec->name), xlrec->xmin, xlrec->catalog_xmin,
+						(uint32) (xlrec->restart_lsn >> 32),
+						(uint32) xlrec->restart_lsn, xlrec->restart_tli);
+				break;
+			}
+		case XLOG_REPLSLOT_DROP:
+			{
+				xl_replslot_drop *xlrec = (xl_replslot_drop *) rec;
+
+				appendStringInfo(buf, "slot %s", NameStr(xlrec->name));
+				break;
+			}
+	}
+}
+
+/* Name the record type held in info's rmgr-owned bits, or NULL if unknown */
+const char *
+replslot_identify(uint8 info)
+{
+	switch (info & ~XLR_INFO_MASK)
+	{
+		case XLOG_REPLSLOT_UPDATE:
+			return "CREATE_OR_UPDATE";
+		case XLOG_REPLSLOT_DROP:
+			return "DROP";
+	}
+	return NULL;
+}
diff --git a/src/backend/access/transam/rmgr.c b/src/backend/access/transam/rmgr.c
index 7c4d773..0bd5796 100644
--- a/src/backend/access/transam/rmgr.c
+++ b/src/backend/access/transam/rmgr.c
@@ -24,6 +24,7 @@
 #include "commands/sequence.h"
 #include "commands/tablespace.h"
 #include "replication/origin.h"
+#include "replication/slot_xlog.h"
 #include "storage/standby.h"
 #include "utils/relmapper.h"
 
diff --git a/src/backend/commands/dbcommands.c b/src/backend/commands/dbcommands.c
index c1c0223..61fc45b 100644
--- a/src/backend/commands/dbcommands.c
+++ b/src/backend/commands/dbcommands.c
@@ -2114,6 +2114,9 @@ dbase_redo(XLogReaderState *record)
 		/* Clean out the xlog relcache too */
 		XLogDropDatabase(xlrec->db_id);
 
+		/* Drop any logical failover slots for this database */
+		ReplicationSlotsDropDBSlots(xlrec->db_id);
+
 		/* And remove the physical files */
 		if (!rmtree(dst_path, true))
 			ereport(WARNING,
diff --git a/src/backend/replication/basebackup.c b/src/backend/replication/basebackup.c
index af0fb09..ab1f271 100644
--- a/src/backend/replication/basebackup.c
+++ b/src/backend/replication/basebackup.c
@@ -973,18 +973,6 @@ sendDir(char *path, int basepathlen, bool sizeonly, List *tablespaces,
 		}
 
 		/*
-		 * Skip pg_replslot, not useful to copy. But include it as an empty
-		 * directory anyway, so we get permissions right.
-		 */
-		if (strcmp(de->d_name, "pg_replslot") == 0)
-		{
-			if (!sizeonly)
-				_tarWriteHeader(pathbuf + basepathlen + 1, NULL, &statbuf);
-			size += 512;		/* Size of the header just added */
-			continue;
-		}
-
-		/*
 		 * We can skip pg_xlog, the WAL segments need to be fetched from the
 		 * WAL archive anyway. But include it as an empty directory anyway, so
 		 * we get permissions right.
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index 88c3a49..76fc5c7 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -135,6 +135,7 @@ LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *recor
 		case RM_BRIN_ID:
 		case RM_COMMIT_TS_ID:
 		case RM_REPLORIGIN_ID:
+		case RM_REPLSLOT_ID:
 			break;
 		case RM_NEXT_ID:
 			elog(ERROR, "unexpected RM_NEXT_ID rmgr_id: %u", (RmgrIds) XLogRecGetRmid(buf.record));
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 2e6d3f9..37ffa82 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -85,16 +85,19 @@ CheckLogicalDecodingRequirements(void)
 				 errmsg("logical decoding requires a database connection")));
 
 	/* ----
-	 * TODO: We got to change that someday soon...
+	 * TODO: Allow logical decoding from a standby
 	 *
-	 * There's basically three things missing to allow this:
+	 * There are some things missing to allow this:
 	 * 1) We need to be able to correctly and quickly identify the timeline a
-	 *	  LSN belongs to
-	 * 2) We need to force hot_standby_feedback to be enabled at all times so
-	 *	  the primary cannot remove rows we need.
-	 * 3) support dropping replication slots referring to a database, in
-	 *	  dbase_redo. There can't be any active ones due to HS recovery
-	 *	  conflicts, so that should be relatively easy.
+	 *    LSN belongs to
+	 * 2) To prevent needed rows from being removed we would need
+	 *    to enhance hot_standby_feedback so it sends both xmin and
+	 *    catalog_xmin to the master.  A standby slot can't write WAL, so we
+	 *    wouldn't be able to use it directly for failover, without some very
+	 *    complex state interactions via master.
+	 *
+	 * So this doesn't seem likely to change anytime soon.
+	 *
 	 * ----
 	 */
 	if (RecoveryInProgress())
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index c12e412..ce278dd 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -26,6 +26,17 @@
  * While the server is running, the state data is also cached in memory for
  * efficiency.
  *
+ * Any slot created on a master node generates WAL records that maintain a copy
+ * of the slot on standby nodes. If a standby node is promoted the failover
+ * slot allows access to be restarted just as if the original master node
+ * was being accessed, allowing for the timeline change. The replica considers
+ * slot positions when removing WAL to make sure it can satisfy the needs of
+ * slots after promotion. For logical decoding slots the slot's internal state
+ * is kept up to date so it's ready for use after promotion.
+ *
+ * Since replication slots cannot be created on a standby there's no risk of
+ * name collision from slot creation on the master.
+ *
  * ReplicationSlotAllocationLock must be taken in exclusive mode to allocate
  * or free a slot. ReplicationSlotControlLock must be taken in shared mode
  * to iterate over the slots, and in exclusive mode to change the in_use flag
@@ -44,6 +55,7 @@
 #include "common/string.h"
 #include "miscadmin.h"
 #include "replication/slot.h"
+#include "replication/slot_xlog.h"
 #include "storage/fd.h"
 #include "storage/proc.h"
 #include "storage/procarray.h"
@@ -104,6 +116,10 @@ static void RestoreSlotFromDisk(const char *name);
 static void CreateSlotOnDisk(ReplicationSlot *slot);
 static void SaveSlotToPath(ReplicationSlot *slot, const char *path, int elevel);
 
+/* internal redo functions */
+static void ReplicationSlotRedoCreateOrUpdate(ReplicationSlotInWAL xlrec);
+static void ReplicationSlotRedoDrop(const char * slotname);
+
 /*
  * Report shared-memory space needed by ReplicationSlotShmemInit.
  */
@@ -265,11 +281,21 @@ ReplicationSlotCreate(const char *name, bool db_specific,
 	Assert(!slot->in_use);
 	Assert(slot->active_pid == 0);
 	slot->data.persistency = persistency;
+
+	elog(LOG, "persistency is %i", (int)slot->data.persistency);
+
 	slot->data.xmin = InvalidTransactionId;
 	slot->effective_xmin = InvalidTransactionId;
 	StrNCpy(NameStr(slot->data.name), name, NAMEDATALEN);
 	slot->data.database = db_specific ? MyDatabaseId : InvalidOid;
 	slot->data.restart_lsn = InvalidXLogRecPtr;
+	/*
+	 * TODO: control over whether a slot is a failover slot.
+	 *
+	 * For now make them all failover if created on the master. Which
+	 * is all slots, since you can't make one on a replica.
+	 */
+	slot->data.failover = !RecoveryInProgress();
 
 	/*
 	 * Create the slot on disk.  We haven't actually marked the slot allocated
@@ -305,6 +331,10 @@ ReplicationSlotCreate(const char *name, bool db_specific,
 
 /*
  * Find a previously created slot and mark it as used by this backend.
+ *
+ * Sets active_pid and assigns MyReplicationSlot iff successfully acquired.
+ *
+ * ERRORs on an attempt to acquire a failover slot when in recovery.
  */
 void
 ReplicationSlotAcquire(const char *name)
@@ -327,7 +357,11 @@ ReplicationSlotAcquire(const char *name)
 		{
 			SpinLockAcquire(&s->mutex);
 			active_pid = s->active_pid;
-			if (active_pid == 0)
+			/*
+			 * Claim the slot only if nobody else has and it isn't a failover
+			 * slot on a standby.  Test s: slot isn't assigned until below.
+			 */
+			if (active_pid == 0 && !(RecoveryInProgress() && s->data.failover))
 				s->active_pid = MyProcPid;
 			SpinLockRelease(&s->mutex);
 			slot = s;
@@ -341,12 +375,24 @@ ReplicationSlotAcquire(const char *name)
 		ereport(ERROR,
 				(errcode(ERRCODE_UNDEFINED_OBJECT),
 				 errmsg("replication slot \"%s\" does not exist", name)));
+
 	if (active_pid != 0)
 		ereport(ERROR,
 				(errcode(ERRCODE_OBJECT_IN_USE),
 			   errmsg("replication slot \"%s\" is already active for PID %d",
 					  name, active_pid)));
 
+	/*
+	 * An attempt to use a failover slot from a standby must fail since
+	 * we can't write WAL from a standby and there's no sensible way
+	 * to advance slot position from both replica and master anyway.
+	 */
+	if (RecoveryInProgress() && slot->data.failover)
+		ereport(ERROR,
+				(errcode(ERRCODE_OBJECT_IN_USE),
+				 errmsg("replication slot \"%s\" is reserved for use after failover",
+					  name)));
+
 	/* We made this slot active, so it's ours now. */
 	MyReplicationSlot = slot;
 }
@@ -403,16 +449,23 @@ ReplicationSlotDrop(const char *name)
 /*
  * Permanently drop the currently acquired replication slot which will be
  * released by the point this function returns.
+ *
+ * Callers must NOT hold ReplicationSlotControlLock in SHARED mode.  EXCLUSIVE
+ * is OK, or not held at all.
  */
 static void
-ReplicationSlotDropAcquired(void)
+ReplicationSlotDropAcquired()
 {
 	char		path[MAXPGPATH];
 	char		tmppath[MAXPGPATH];
 	ReplicationSlot *slot = MyReplicationSlot;
+	bool slot_is_failover;
+	bool took_control_lock = false;
 
 	Assert(MyReplicationSlot != NULL);
 
+	slot_is_failover = slot->data.failover;
+
 	/* slot isn't acquired anymore */
 	MyReplicationSlot = NULL;
 
@@ -423,6 +476,18 @@ ReplicationSlotDropAcquired(void)
 	 */
 	LWLockAcquire(ReplicationSlotAllocationLock, LW_EXCLUSIVE);
 
+	/* Record the drop in XLOG if we aren't replaying WAL */
+	if (XLogInsertAllowed() && slot_is_failover)
+	{
+		xl_replslot_drop xlrec;
+
+		/* Copy the whole fixed-size name, not just sizeof(int) bytes of it */
+		memcpy(&xlrec.name, &slot->data.name, sizeof(xlrec.name));
+		XLogBeginInsert();
+		XLogRegisterData((char *) (&xlrec), sizeof(xlrec));
+		(void) XLogInsert(RM_REPLSLOT_ID, XLOG_REPLSLOT_DROP);
+	}
+
 	/* Generate pathnames. */
 	sprintf(path, "pg_replslot/%s", NameStr(slot->data.name));
 	sprintf(tmppath, "pg_replslot/%s.tmp", NameStr(slot->data.name));
@@ -451,7 +516,11 @@ ReplicationSlotDropAcquired(void)
 	}
 	else
 	{
-		bool		fail_softly = slot->data.persistency == RS_EPHEMERAL;
+		bool		fail_softly = false;
+
+		if (RecoveryInProgress() ||
+			slot->data.persistency == RS_EPHEMERAL)
+			fail_softly = true;
 
 		SpinLockAcquire(&slot->mutex);
 		slot->active_pid = 0;
@@ -469,11 +538,20 @@ ReplicationSlotDropAcquired(void)
 	 * grabbing the mutex because nobody else can be scanning the array here,
 	 * and nobody can be attached to this slot and thus access it without
 	 * scanning the array.
+	 *
+	 * You must hold the lock in EXCLUSIVE mode or not at all.
 	 */
-	LWLockAcquire(ReplicationSlotControlLock, LW_EXCLUSIVE);
+	if (!LWLockHeldByMe(ReplicationSlotControlLock))
+	{
+		took_control_lock = true;
+		LWLockAcquire(ReplicationSlotControlLock, LW_EXCLUSIVE);
+	}
+
 	slot->active_pid = 0;
 	slot->in_use = false;
-	LWLockRelease(ReplicationSlotControlLock);
+
+	if (took_control_lock)
+		LWLockRelease(ReplicationSlotControlLock);
 
 	/*
 	 * Slot is dead and doesn't prevent resource removal anymore, recompute
@@ -536,6 +614,9 @@ ReplicationSlotMarkDirty(void)
 /*
  * Convert a slot that's marked as RS_EPHEMERAL to a RS_PERSISTENT slot,
  * guaranteeing it will be there after an eventual crash.
+ *
+ * Failover slots will emit a create xlog record at this time, having
+ * not been previously written to xlog.
  */
 void
 ReplicationSlotPersist(void)
@@ -739,6 +820,45 @@ ReplicationSlotsCountDBSlots(Oid dboid, int *nslots, int *nactive)
 	return false;
 }
 
+/*
+ * Drop every in-use slot belonging to database dboid; called from dbase_redo
+ * while replaying DROP DATABASE.  No slot may be acquired by the caller.
+ */
+void
+ReplicationSlotsDropDBSlots(Oid dboid)
+{
+	int			i;
+
+	Assert(MyReplicationSlot == NULL);
+
+	LWLockAcquire(ReplicationSlotControlLock, LW_EXCLUSIVE);
+	for (i = 0; i < max_replication_slots; i++)
+	{
+		ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
+
+		/* Free array entries keep stale contents, so test in_use first. */
+		if (!s->in_use || s->data.database != dboid)
+			continue;
+
+		/*
+		 * There should be no connections to this dbid, therefore all slots
+		 * for this dbid should be logical, inactive failover slots.
+		 */
+		Assert(s->active_pid == 0);
+		Assert(SlotIsLogical(s));
+
+		/*
+		 * Claim the slot directly; there is no need to mark it active since
+		 * we already hold ReplicationSlotControlLock exclusively, which
+		 * ReplicationSlotDropAcquired() detects and does not re-acquire.
+		 */
+		MyReplicationSlot = s;
+		ReplicationSlotDropAcquired();
+	}
+	LWLockRelease(ReplicationSlotControlLock);
+
+	MyReplicationSlot = NULL;
+}
 
 /*
  * Check whether the server's configuration supports using replication
@@ -988,6 +1108,8 @@ CreateSlotOnDisk(ReplicationSlot *slot)
 
 /*
  * Shared functionality between saving and creating a replication slot.
+ *
+ * For failover slots this is where we emit xlog.
  */
 static void
 SaveSlotToPath(ReplicationSlot *slot, const char *dir, int elevel)
@@ -998,15 +1120,18 @@ SaveSlotToPath(ReplicationSlot *slot, const char *dir, int elevel)
 	ReplicationSlotOnDisk cp;
 	bool		was_dirty;
 
-	/* first check whether there's something to write out */
-	SpinLockAcquire(&slot->mutex);
-	was_dirty = slot->dirty;
-	slot->just_dirtied = false;
-	SpinLockRelease(&slot->mutex);
+	if (!RecoveryInProgress())
+	{
+		/* first check whether there's something to write out */
+		SpinLockAcquire(&slot->mutex);
+		was_dirty = slot->dirty;
+		slot->just_dirtied = false;
+		SpinLockRelease(&slot->mutex);
 
-	/* and don't do anything if there's nothing to write */
-	if (!was_dirty)
-		return;
+		/* and don't do anything if there's nothing to write */
+		if (!was_dirty)
+			return;
+	}
 
 	LWLockAcquire(slot->io_in_progress_lock, LW_EXCLUSIVE);
 
@@ -1039,6 +1164,25 @@ SaveSlotToPath(ReplicationSlot *slot, const char *dir, int elevel)
 
 	SpinLockRelease(&slot->mutex);
 
+	/*
+	 * If needed, record this action in WAL
+	 */
+	if (slot->data.failover &&
+		slot->data.persistency == RS_PERSISTENT &&
+		!RecoveryInProgress())
+	{
+		XLogBeginInsert();
+		XLogRegisterData((char *) (&cp.slotdata), sizeof(ReplicationSlotPersistentData));
+		/*
+		 * Note that slot creation on the downstream is also an "update".
+		 *
+		 * Slots can start off ephemeral and be updated to persistent. We just
+		 * log the update and the downstream creates the new slot if it doesn't
+		 * exist yet.
+		 */
+		(void) XLogInsert(RM_REPLSLOT_ID, XLOG_REPLSLOT_UPDATE);
+	}
+
 	COMP_CRC32C(cp.checksum,
 				(char *) (&cp) + SnapBuildOnDiskNotChecksummedSize,
 				SnapBuildOnDiskChecksummedSize);
@@ -1279,3 +1423,266 @@ RestoreSlotFromDisk(const char *name)
 				(errmsg("too many replication slots active before shutdown"),
 				 errhint("Increase max_replication_slots and try again.")));
 }
+
+/*
+ * This usually just writes new persistent data to the slot state, but an
+ * update record might create a new slot on the downstream if we changed a
+ * previously ephemeral slot to persistent. We have to decide which
+ * by looking for the existing slot.
+ */
+static void
+ReplicationSlotRedoCreateOrUpdate(ReplicationSlotInWAL xlrec)
+{
+	ReplicationSlot *slot;
+	bool	found_available = false;
+	bool	found_duplicate = false;
+	int		use_slotid = 0;
+	int		i;
+
+	/*
+	 * Find the slot if it exists, or the first free entry
+	 * to write it to otherwise. Also handle the case where
+	 * the slot exists on the downstream as a non-failover
+	 * slot with a clashing name.
+	 *
+	 * We're in redo, but someone could still create an ephemeral
+	 * slot and race with us unless we take the allocation lock.
+	 */
+	LWLockAcquire(ReplicationSlotAllocationLock, LW_SHARED);
+	for (i = 0; i < max_replication_slots; i++)
+	{
+		slot = &ReplicationSlotCtl->replication_slots[i];
+
+		/*
+		 * Find first unused position in the slots array, but keep on
+		 * scanning...
+		 */
+		if (!slot->in_use && !found_available)
+		{
+			use_slotid = i;
+			found_available = true;
+		}
+
+		/*
+		 * Keep looking for a live slot with the same name; it is either our
+		 * failover slot to update or a clashing non-failover slot. Freed
+		 * entries keep their old name, so only in_use entries can match.
+		 */
+		if (slot->in_use && strcmp(NameStr(xlrec->name), NameStr(slot->data.name)) == 0)
+		{
+			use_slotid = i;
+			found_available = true;
+			found_duplicate = true;
+			break;
+		}
+	}
+
+	if (found_duplicate && !slot->data.failover)
+	{
+		/*
+		 * TODO.
+		 *
+		 * A name clash with the incoming failover slot may occur when
+		 * a non-failover slot was created locally on a replica or when
+		 * redo failed partway through on a failover slot.
+		 *
+		 * For conflicting local slots we handle this by aborting any
+		 * connection using the slot with a conflict in recovery error then
+		 * removing the local non-failover slot. The replacement slot won't
+		 * allow replay until promotion so if the old client reconnects it
+		 * won't be able to make a mess by advancing the failover slot.
+		 *
+		 * 'conflict with recovery' aborts are only done for regular backends
+		 * so we'll have to send a cancel.
+		 *
+		 * We could race against the client, which might be able to restart
+		 * and re-acquire the slot before we can. Unlikely, but not impossible.
+		 * So we have to cope with the slot still being in_use when we
+		 * look at it after killing its client.
+		 *
+		 * This is all a bit complicated so in the WIP patch just ERROR here,
+		 * letting the user clean up the mess by dropping the conflicting slot.
+		 *
+		 * For ephemeral slots this is a no-brainer, but it's less pretty for
+		 * persistent downstream slots.
+		 */
+
+		LWLockRelease(ReplicationSlotAllocationLock);
+
+		ereport(ERROR,
+				(errcode(ERRCODE_DUPLICATE_OBJECT),
+				 errmsg("A local non-failover slot with the name %s already exists",
+					 NameStr(xlrec->name)),
+				 errdetail("While replaying the creation of a failover slot from the "
+						   "master an existing non-failover slot with the same name "
+						   "was found on the replica. Replay cannot continue until "
+						   "the conflicting slot is dropped on the replica. ")));
+
+		/* Not reached: ereport(ERROR) does not return.  If this path is ever
+		 * changed to continue, ReplicationSlotAllocationLock must be
+		 * re-acquired (LW_SHARED) here. */
+	}
+
+	/*
+	 * This is either an empty slot control position to make a new slot or it's
+	 * an existing entry for this failover slot that we need to update. Most of
+	 * the logic is the same.
+	 */
+	if (found_available)
+	{
+		LWLockAcquire(ReplicationSlotControlLock, LW_EXCLUSIVE);
+
+		slot = &ReplicationSlotCtl->replication_slots[use_slotid];
+
+		/* restore the entire set of persistent data */
+		memcpy(&slot->data, xlrec,
+			   sizeof(ReplicationSlotPersistentData));
+
+		Assert(strcmp(NameStr(xlrec->name), NameStr(slot->data.name)) == 0);
+		Assert(slot->data.failover && slot->data.persistency == RS_PERSISTENT);
+
+		/* Update the non-persistent in-memory state */
+		slot->effective_xmin = xlrec->xmin;
+		slot->effective_catalog_xmin = xlrec->catalog_xmin;
+
+		if (found_duplicate)
+		{
+			char		path[MAXPGPATH];
+
+			elog(DEBUG1, "Updating existing slot %s", NameStr(slot->data.name));
+
+			/* Write an existing slot to disk */
+			Assert(slot->in_use);
+			Assert(slot->active_pid == 0); /* can't be replaying from failover slot */
+
+			sprintf(path, "pg_replslot/%s", NameStr(slot->data.name));
+			slot->dirty = true;
+			SaveSlotToPath(slot, path, ERROR);
+		}
+		else
+		{
+			elog(DEBUG1, "Creating slot %s", NameStr(slot->data.name));
+
+			/* In-memory state that's only set on create, not update */
+			slot->active_pid = 0;
+			slot->in_use = true;
+			slot->candidate_catalog_xmin = InvalidTransactionId;
+			slot->candidate_xmin_lsn = InvalidXLogRecPtr;
+			slot->candidate_restart_lsn = InvalidXLogRecPtr;
+			slot->candidate_restart_valid = InvalidXLogRecPtr;
+
+			CreateSlotOnDisk(slot);
+		}
+
+		LWLockRelease(ReplicationSlotControlLock);
+	}
+
+	LWLockRelease(ReplicationSlotAllocationLock);
+
+	if (!found_available)
+	{
+		/*
+		 * Because the standby should have the same or greater max_replication_slots
+		 * as the master this shouldn't happen, but just in case...
+		 */
+		ereport(ERROR,
+				(errmsg("max_replication_slots exceeded, cannot replay failover slot creation"),
+				 errhint("Increase max_replication_slots")));
+	}
+}
+
+/*
+ * Redo a slot drop of a failover slot. This might be a redo during crash
+ * recovery on the master or it may be replay on a standby.
+ */
+static void
+ReplicationSlotRedoDrop(const char *slotname)
+{
+	/*
+	 * Acquire the failover slot that's to be dropped.
+	 *
+	 * We can't ReplicationSlotAcquire here because we want to acquire
+	 * a replication slot during replay, which isn't usually allowed.
+	 * Also, because we might crash midway through a drop we can't
+	 * assume we'll actually find the slot so it's not an error for
+	 * the slot to be missing.
+	 */
+	int			i;
+
+	Assert(MyReplicationSlot == NULL);
+
+	ReplicationSlotValidateName(slotname, ERROR);
+
+	/*
+	 * Search for the named failover slot and mark it active if we
+	 * find it.
+	 */
+	LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
+	for (i = 0; i < max_replication_slots; i++)
+	{
+		ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
+
+		if (s->in_use && strcmp(slotname, NameStr(s->data.name)) == 0)
+		{
+			if (s->data.persistency != RS_PERSISTENT)
+			{
+				/* shouldn't happen: only persistent slots are WAL-logged */
+				elog(WARNING, "BUG: found conflicting non-persistent slot during failover slot drop");
+				break;
+			}
+
+			if (!s->data.failover)
+			{
+				/* shouldn't happen */
+				elog(WARNING, "BUG: found non-failover slot during redo of slot drop");
+				break;
+			}
+
+			/* A failover slot can't be active during recovery */
+			Assert(s->active_pid == 0);
+
+			/* Claim the slot */
+			s->active_pid = MyProcPid;
+			MyReplicationSlot = s;
+
+			break;
+		}
+	}
+	LWLockRelease(ReplicationSlotControlLock);
+
+	if (MyReplicationSlot != NULL)
+		ReplicationSlotDropAcquired();
+}
+
+/*
+ * Redo entry point for the RM_REPLSLOT_ID resource manager.
+ *
+ * Hands the record payload to the failover slot replay routine selected by
+ * the rmgr-specific bits of the info byte; an unknown op code is a PANIC.
+ */
+void
+replslot_redo(XLogReaderState *record)
+{
+	char	   *payload = XLogRecGetData(record);
+	uint8		opcode = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
+
+	if (opcode == XLOG_REPLSLOT_UPDATE)
+	{
+		/*
+		 * Refresh an existing failover slot or, the first time a slot is
+		 * logged as persistent, create it on the downstream.
+		 */
+		ReplicationSlotRedoCreateOrUpdate((ReplicationSlotInWAL) payload);
+	}
+	else if (opcode == XLOG_REPLSLOT_DROP)
+	{
+		xl_replslot_drop *dropped = (xl_replslot_drop *) payload;
+
+		/* Remove the named failover slot, if it is still present. */
+		ReplicationSlotRedoDrop(NameStr(dropped->name));
+	}
+	else
+	{
+		elog(PANIC, "replslot_redo: unknown op code %u", opcode);
+	}
+}
diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c
index 9cc24ea..e90d079 100644
--- a/src/backend/replication/slotfuncs.c
+++ b/src/backend/replication/slotfuncs.c
@@ -18,6 +18,7 @@
 
 #include "access/htup_details.h"
 #include "replication/slot.h"
+#include "replication/slot_xlog.h"
 #include "replication/logical.h"
 #include "replication/logicalfuncs.h"
 #include "utils/builtins.h"
diff --git a/src/bin/pg_xlogdump/replslotdesc.c b/src/bin/pg_xlogdump/replslotdesc.c
new file mode 120000
index 0000000..2e088d2
--- /dev/null
+++ b/src/bin/pg_xlogdump/replslotdesc.c
@@ -0,0 +1 @@
+../../../src/backend/access/rmgrdesc/replslotdesc.c
\ No newline at end of file
diff --git a/src/bin/pg_xlogdump/rmgrdesc.c b/src/bin/pg_xlogdump/rmgrdesc.c
index f9cd395..73ed7d4 100644
--- a/src/bin/pg_xlogdump/rmgrdesc.c
+++ b/src/bin/pg_xlogdump/rmgrdesc.c
@@ -26,6 +26,7 @@
 #include "commands/sequence.h"
 #include "commands/tablespace.h"
 #include "replication/origin.h"
+#include "replication/slot_xlog.h"
 #include "rmgrdesc.h"
 #include "storage/standbydefs.h"
 #include "utils/relmapper.h"
diff --git a/src/include/access/rmgrlist.h b/src/include/access/rmgrlist.h
index fab912d..124b7e5 100644
--- a/src/include/access/rmgrlist.h
+++ b/src/include/access/rmgrlist.h
@@ -45,3 +45,4 @@ PG_RMGR(RM_SPGIST_ID, "SPGist", spg_redo, spg_desc, spg_identify, spg_xlog_start
 PG_RMGR(RM_BRIN_ID, "BRIN", brin_redo, brin_desc, brin_identify, NULL, NULL)
 PG_RMGR(RM_COMMIT_TS_ID, "CommitTs", commit_ts_redo, commit_ts_desc, commit_ts_identify, NULL, NULL)
 PG_RMGR(RM_REPLORIGIN_ID, "ReplicationOrigin", replorigin_redo, replorigin_desc, replorigin_identify, NULL, NULL)
+PG_RMGR(RM_REPLSLOT_ID, "ReplicationSlot", replslot_redo, replslot_desc, replslot_identify, NULL, NULL)
diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h
index 80ad02a..cb35181 100644
--- a/src/include/replication/slot.h
+++ b/src/include/replication/slot.h
@@ -4,6 +4,7 @@
  *
  * Copyright (c) 2012-2016, PostgreSQL Global Development Group
  *
+ * src/include/replication/slot.h
  *-------------------------------------------------------------------------
  */
 #ifndef SLOT_H
@@ -11,69 +12,12 @@
 
 #include "fmgr.h"
 #include "access/xlog.h"
-#include "access/xlogreader.h"
+#include "replication/slot_xlog.h"
 #include "storage/lwlock.h"
 #include "storage/shmem.h"
 #include "storage/spin.h"
 
 /*
- * Behaviour of replication slots, upon release or crash.
- *
- * Slots marked as PERSISTENT are crashsafe and will not be dropped when
- * released. Slots marked as EPHEMERAL will be dropped when released or after
- * restarts.
- *
- * EPHEMERAL slots can be made PERSISTENT by calling ReplicationSlotPersist().
- */
-typedef enum ReplicationSlotPersistency
-{
-	RS_PERSISTENT,
-	RS_EPHEMERAL
-} ReplicationSlotPersistency;
-
-/*
- * On-Disk data of a replication slot, preserved across restarts.
- */
-typedef struct ReplicationSlotPersistentData
-{
-	/* The slot's identifier */
-	NameData	name;
-
-	/* database the slot is active on */
-	Oid			database;
-
-	/*
-	 * The slot's behaviour when being dropped (or restored after a crash).
-	 */
-	ReplicationSlotPersistency persistency;
-
-	/*
-	 * xmin horizon for data
-	 *
-	 * NB: This may represent a value that hasn't been written to disk yet;
-	 * see notes for effective_xmin, below.
-	 */
-	TransactionId xmin;
-
-	/*
-	 * xmin horizon for catalog tuples
-	 *
-	 * NB: This may represent a value that hasn't been written to disk yet;
-	 * see notes for effective_xmin, below.
-	 */
-	TransactionId catalog_xmin;
-
-	/* oldest LSN that might be required by this replication slot */
-	XLogRecPtr	restart_lsn;
-
-	/* oldest LSN that the client has acked receipt for */
-	XLogRecPtr	confirmed_flush;
-
-	/* plugin name */
-	NameData	plugin;
-} ReplicationSlotPersistentData;
-
-/*
  * Shared memory state of a single replication slot.
  */
 typedef struct ReplicationSlot
@@ -171,6 +115,7 @@ extern void ReplicationSlotsComputeRequiredXmin(bool already_locked);
 extern void ReplicationSlotsComputeRequiredLSN(void);
 extern XLogRecPtr ReplicationSlotsComputeLogicalRestartLSN(void);
 extern bool ReplicationSlotsCountDBSlots(Oid dboid, int *nslots, int *nactive);
+extern void ReplicationSlotsDropDBSlots(Oid dboid);
 
 extern void StartupReplicationSlots(void);
 extern void CheckPointReplicationSlots(void);
diff --git a/src/include/replication/slot_xlog.h b/src/include/replication/slot_xlog.h
new file mode 100644
index 0000000..7caf009
--- /dev/null
+++ b/src/include/replication/slot_xlog.h
@@ -0,0 +1,103 @@
+/*-------------------------------------------------------------------------
+ * slot_xlog.h
+ *	   Replication slot management.
+ *
+ * Copyright (c) 2012-2015, PostgreSQL Global Development Group
+ *
+ * src/include/replication/slot_xlog.h
+ *-------------------------------------------------------------------------
+ */
+#ifndef SLOT_XLOG_H
+#define SLOT_XLOG_H
+
+#include "fmgr.h"
+#include "access/xlog.h"
+#include "access/xlogdefs.h"
+#include "access/xlogreader.h"
+
+/*
+ * Behaviour of replication slots, upon release or crash.
+ *
+ * Slots marked as PERSISTENT are crashsafe and will not be dropped when
+ * released. Slots marked as EPHEMERAL will be dropped when released or after
+ * restarts.
+ *
+ * EPHEMERAL slots can be made PERSISTENT by calling ReplicationSlotPersist().
+ */
+typedef enum ReplicationSlotPersistency
+{
+	RS_PERSISTENT,
+	RS_EPHEMERAL
+} ReplicationSlotPersistency;
+
+/*
+ * On-disk slot data, kept across restarts; also the XLOG_REPLSLOT_UPDATE payload.
+ */
+typedef struct ReplicationSlotPersistentData
+{
+	/* The slot's identifier */
+	NameData	name;
+
+	/* database the slot is active on */
+	Oid			database;
+
+	/*
+	 * The slot's behaviour when being dropped (or restored after a crash).
+	 */
+	ReplicationSlotPersistency persistency;
+
+	/*
+	 * Slots created on master become failover-slots and are maintained
+	 * on all standbys, but are only assignable after failover.
+	 */
+	bool		failover;
+
+	/*
+	 * xmin horizon for data
+	 *
+	 * NB: This may represent a value that hasn't been written to disk yet;
+	 * see notes for effective_xmin in ReplicationSlot (replication/slot.h).
+	 */
+	TransactionId xmin;
+
+	/*
+	 * xmin horizon for catalog tuples
+	 *
+	 * NB: This may represent a value that hasn't been written to disk yet;
+	 * see notes for effective_xmin in ReplicationSlot (replication/slot.h).
+	 */
+	TransactionId catalog_xmin;
+
+	/* oldest LSN that might be required by this replication slot */
+	XLogRecPtr	restart_lsn;
+	TimeLineID	restart_tli;	/* presumably timeline of restart_lsn -- TODO confirm */
+
+	/* oldest LSN that the client has acked receipt for */
+	XLogRecPtr	confirmed_flush;
+
+	/* plugin name */
+	NameData	plugin;
+} ReplicationSlotPersistentData;
+
+typedef ReplicationSlotPersistentData *ReplicationSlotInWAL;
+
+/*
+ * WAL records for failover slots
+ *
+ * Note that the low 4 bits are reserved by the system. The high 4 bits are for
+ * rmgr use.
+ */
+#define XLOG_REPLSLOT_UPDATE	0x10
+#define XLOG_REPLSLOT_DROP		0x20
+
+typedef struct xl_replslot_drop
+{
+	NameData	name;
+} xl_replslot_drop;
+
+/* WAL logging */
+extern void replslot_redo(XLogReaderState *record);
+extern void replslot_desc(StringInfo buf, XLogReaderState *record);
+extern const char *replslot_identify(uint8 info);
+
+#endif   /* SLOT_XLOG_H */
-- 
2.1.0

-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to