From a508f1c38ff689ec5a8d9df371fd941d547fa479 Mon Sep 17 00:00:00 2001
From: Amit Khandekar <amit.khandekar@enterprisedb.com>
Date: Wed, 3 Apr 2019 19:26:49 +0530
Subject: [PATCH] Logical decoding on standby.

-Andres Freund.

Besides the above main changes by Andres, following changes done by
Amit Khandekar :

1. Handle slot conflict recovery by dropping the conflicting slots.

2. test/recovery/t/016_logical_decoding_on_replica.pl added.
This test is originally written by Craig Ringer, with some changes
from Amit Khandekar. Still in WIP state. Yet to add scenarios to test
conflict recovery.

Incremental changes in v3 : Added recovery handling scenario.

---
 src/backend/access/gist/gistxlog.c                 |   6 +-
 src/backend/access/hash/hash_xlog.c                |   3 +-
 src/backend/access/hash/hashinsert.c               |   2 +
 src/backend/access/heap/heapam.c                   |  23 +-
 src/backend/access/heap/vacuumlazy.c               |   2 +-
 src/backend/access/heap/visibilitymap.c            |   2 +-
 src/backend/access/nbtree/nbtpage.c                |   3 +
 src/backend/access/nbtree/nbtxlog.c                |   4 +-
 src/backend/access/spgist/spgvacuum.c              |   2 +
 src/backend/access/spgist/spgxlog.c                |   1 +
 src/backend/replication/logical/logical.c          |   2 +
 src/backend/replication/slot.c                     |  79 +++++
 src/backend/storage/ipc/standby.c                  |   7 +-
 src/backend/utils/cache/lsyscache.c                |  16 +
 src/include/access/gistxlog.h                      |   3 +-
 src/include/access/hash_xlog.h                     |   1 +
 src/include/access/heapam_xlog.h                   |   8 +-
 src/include/access/nbtxlog.h                       |   2 +
 src/include/access/spgxlog.h                       |   1 +
 src/include/replication/slot.h                     |   2 +
 src/include/storage/standby.h                      |   2 +-
 src/include/utils/lsyscache.h                      |   1 +
 src/include/utils/rel.h                            |   1 +
 .../recovery/t/016_logical_decoding_on_replica.pl  | 358 +++++++++++++++++++++
 24 files changed, 513 insertions(+), 18 deletions(-)
 create mode 100644 src/test/recovery/t/016_logical_decoding_on_replica.pl

diff --git a/src/backend/access/gist/gistxlog.c b/src/backend/access/gist/gistxlog.c
index 4fb1855..59a7910 100644
--- a/src/backend/access/gist/gistxlog.c
+++ b/src/backend/access/gist/gistxlog.c
@@ -342,7 +342,8 @@ gistRedoDeleteRecord(XLogReaderState *record)
 
 		XLogRecGetBlockTag(record, 0, &rnode, NULL, NULL);
 
-		ResolveRecoveryConflictWithSnapshot(latestRemovedXid, rnode);
+		ResolveRecoveryConflictWithSnapshot(latestRemovedXid,
+											xldata->onCatalogTable, rnode);
 	}
 
 	if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
@@ -544,7 +545,7 @@ gistRedoPageReuse(XLogReaderState *record)
 	if (InHotStandby)
 	{
 		ResolveRecoveryConflictWithSnapshot(xlrec->latestRemovedXid,
-											xlrec->node);
+											xlrec->onCatalogTable, xlrec->node);
 	}
 }
 
@@ -736,6 +737,7 @@ gistXLogPageReuse(Relation rel, BlockNumber blkno, TransactionId latestRemovedXi
 	 */
 
 	/* XLOG stuff */
+	xlrec_reuse.onCatalogTable = RelationIsAccessibleInLogicalDecoding(rel);
 	xlrec_reuse.node = rel->rd_node;
 	xlrec_reuse.block = blkno;
 	xlrec_reuse.latestRemovedXid = latestRemovedXid;
diff --git a/src/backend/access/hash/hash_xlog.c b/src/backend/access/hash/hash_xlog.c
index d7b7098..00c3e0f 100644
--- a/src/backend/access/hash/hash_xlog.c
+++ b/src/backend/access/hash/hash_xlog.c
@@ -1002,7 +1002,8 @@ hash_xlog_vacuum_one_page(XLogReaderState *record)
 		RelFileNode rnode;
 
 		XLogRecGetBlockTag(record, 0, &rnode, NULL, NULL);
-		ResolveRecoveryConflictWithSnapshot(xldata->latestRemovedXid, rnode);
+		ResolveRecoveryConflictWithSnapshot(xldata->latestRemovedXid,
+											xldata->onCatalogTable, rnode);
 	}
 
 	action = XLogReadBufferForRedoExtended(record, 0, RBM_NORMAL, true, &buffer);
diff --git a/src/backend/access/hash/hashinsert.c b/src/backend/access/hash/hashinsert.c
index e17f017..b67e4e6 100644
--- a/src/backend/access/hash/hashinsert.c
+++ b/src/backend/access/hash/hashinsert.c
@@ -17,6 +17,7 @@
 
 #include "access/hash.h"
 #include "access/hash_xlog.h"
+#include "catalog/catalog.h"
 #include "miscadmin.h"
 #include "utils/rel.h"
 #include "storage/lwlock.h"
@@ -398,6 +399,7 @@ _hash_vacuum_one_page(Relation rel, Relation hrel, Buffer metabuf, Buffer buf)
 			xl_hash_vacuum_one_page xlrec;
 			XLogRecPtr	recptr;
 
+			xlrec.onCatalogTable = RelationIsAccessibleInLogicalDecoding(hrel);
 			xlrec.latestRemovedXid = latestRemovedXid;
 			xlrec.ntuples = ndeletable;
 
diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index a05b6a0..bfbb9d3 100644
--- a/src/backend/access/heap/heapam.c
+++ b/src/backend/access/heap/heapam.c
@@ -7100,12 +7100,13 @@ heap_compute_xid_horizon_for_tuples(Relation rel,
  * see comments for vacuum_log_cleanup_info().
  */
 XLogRecPtr
-log_heap_cleanup_info(RelFileNode rnode, TransactionId latestRemovedXid)
+log_heap_cleanup_info(Relation rel, TransactionId latestRemovedXid)
 {
 	xl_heap_cleanup_info xlrec;
 	XLogRecPtr	recptr;
 
-	xlrec.node = rnode;
+	xlrec.onCatalogTable = RelationIsAccessibleInLogicalDecoding(rel);
+	xlrec.node = rel->rd_node;
 	xlrec.latestRemovedXid = latestRemovedXid;
 
 	XLogBeginInsert();
@@ -7141,6 +7142,7 @@ log_heap_clean(Relation reln, Buffer buffer,
 	/* Caller should not call me on a non-WAL-logged relation */
 	Assert(RelationNeedsWAL(reln));
 
+	xlrec.onCatalogTable = RelationIsAccessibleInLogicalDecoding(reln);
 	xlrec.latestRemovedXid = latestRemovedXid;
 	xlrec.nredirected = nredirected;
 	xlrec.ndead = ndead;
@@ -7191,6 +7193,7 @@ log_heap_freeze(Relation reln, Buffer buffer, TransactionId cutoff_xid,
 	/* nor when there are no tuples to freeze */
 	Assert(ntuples > 0);
 
+	xlrec.onCatalogTable = RelationIsAccessibleInLogicalDecoding(reln);
 	xlrec.cutoff_xid = cutoff_xid;
 	xlrec.ntuples = ntuples;
 
@@ -7221,7 +7224,7 @@ log_heap_freeze(Relation reln, Buffer buffer, TransactionId cutoff_xid,
  * heap_buffer, if necessary.
  */
 XLogRecPtr
-log_heap_visible(RelFileNode rnode, Buffer heap_buffer, Buffer vm_buffer,
+log_heap_visible(Relation rel, Buffer heap_buffer, Buffer vm_buffer,
 				 TransactionId cutoff_xid, uint8 vmflags)
 {
 	xl_heap_visible xlrec;
@@ -7231,6 +7234,7 @@ log_heap_visible(RelFileNode rnode, Buffer heap_buffer, Buffer vm_buffer,
 	Assert(BufferIsValid(heap_buffer));
 	Assert(BufferIsValid(vm_buffer));
 
+	xlrec.onCatalogTable = RelationIsAccessibleInLogicalDecoding(rel);
 	xlrec.cutoff_xid = cutoff_xid;
 	xlrec.flags = vmflags;
 	XLogBeginInsert();
@@ -7651,7 +7655,8 @@ heap_xlog_cleanup_info(XLogReaderState *record)
 	xl_heap_cleanup_info *xlrec = (xl_heap_cleanup_info *) XLogRecGetData(record);
 
 	if (InHotStandby)
-		ResolveRecoveryConflictWithSnapshot(xlrec->latestRemovedXid, xlrec->node);
+		ResolveRecoveryConflictWithSnapshot(xlrec->latestRemovedXid,
+											xlrec->onCatalogTable, xlrec->node);
 
 	/*
 	 * Actual operation is a no-op. Record type exists to provide a means for
@@ -7687,7 +7692,8 @@ heap_xlog_clean(XLogReaderState *record)
 	 * latestRemovedXid is invalid, skip conflict processing.
 	 */
 	if (InHotStandby && TransactionIdIsValid(xlrec->latestRemovedXid))
-		ResolveRecoveryConflictWithSnapshot(xlrec->latestRemovedXid, rnode);
+		ResolveRecoveryConflictWithSnapshot(xlrec->latestRemovedXid,
+											xlrec->onCatalogTable, rnode);
 
 	/*
 	 * If we have a full-page image, restore it (using a cleanup lock) and
@@ -7783,7 +7789,9 @@ heap_xlog_visible(XLogReaderState *record)
 	 * rather than killing the transaction outright.
 	 */
 	if (InHotStandby)
-		ResolveRecoveryConflictWithSnapshot(xlrec->cutoff_xid, rnode);
+		ResolveRecoveryConflictWithSnapshot(xlrec->cutoff_xid,
+											xlrec->onCatalogTable,
+											rnode);
 
 	/*
 	 * Read the heap page, if it still exists. If the heap file has dropped or
@@ -7920,7 +7928,8 @@ heap_xlog_freeze_page(XLogReaderState *record)
 		TransactionIdRetreat(latestRemovedXid);
 
 		XLogRecGetBlockTag(record, 0, &rnode, NULL, NULL);
-		ResolveRecoveryConflictWithSnapshot(latestRemovedXid, rnode);
+		ResolveRecoveryConflictWithSnapshot(latestRemovedXid,
+											xlrec->onCatalogTable, rnode);
 	}
 
 	if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
diff --git a/src/backend/access/heap/vacuumlazy.c b/src/backend/access/heap/vacuumlazy.c
index c9d8312..fad08e0 100644
--- a/src/backend/access/heap/vacuumlazy.c
+++ b/src/backend/access/heap/vacuumlazy.c
@@ -475,7 +475,7 @@ vacuum_log_cleanup_info(Relation rel, LVRelStats *vacrelstats)
 	 * No need to write the record at all unless it contains a valid value
 	 */
 	if (TransactionIdIsValid(vacrelstats->latestRemovedXid))
-		(void) log_heap_cleanup_info(rel->rd_node, vacrelstats->latestRemovedXid);
+		(void) log_heap_cleanup_info(rel, vacrelstats->latestRemovedXid);
 }
 
 /*
diff --git a/src/backend/access/heap/visibilitymap.c b/src/backend/access/heap/visibilitymap.c
index 64dfe06..c5fdd64 100644
--- a/src/backend/access/heap/visibilitymap.c
+++ b/src/backend/access/heap/visibilitymap.c
@@ -281,7 +281,7 @@ visibilitymap_set(Relation rel, BlockNumber heapBlk, Buffer heapBuf,
 			if (XLogRecPtrIsInvalid(recptr))
 			{
 				Assert(!InRecovery);
-				recptr = log_heap_visible(rel->rd_node, heapBuf, vmBuf,
+				recptr = log_heap_visible(rel, heapBuf, vmBuf,
 										  cutoff_xid, flags);
 
 				/*
diff --git a/src/backend/access/nbtree/nbtpage.c b/src/backend/access/nbtree/nbtpage.c
index 8ade165..745cbc5 100644
--- a/src/backend/access/nbtree/nbtpage.c
+++ b/src/backend/access/nbtree/nbtpage.c
@@ -31,6 +31,7 @@
 #include "storage/indexfsm.h"
 #include "storage/lmgr.h"
 #include "storage/predicate.h"
+#include "utils/lsyscache.h"
 #include "utils/snapmgr.h"
 
 static void _bt_cachemetadata(Relation rel, BTMetaPageData *input);
@@ -773,6 +774,7 @@ _bt_log_reuse_page(Relation rel, BlockNumber blkno, TransactionId latestRemovedX
 	 */
 
 	/* XLOG stuff */
+	xlrec_reuse.onCatalogTable = get_rel_logical_catalog(rel->rd_index->indrelid);
 	xlrec_reuse.node = rel->rd_node;
 	xlrec_reuse.block = blkno;
 	xlrec_reuse.latestRemovedXid = latestRemovedXid;
@@ -1140,6 +1142,7 @@ _bt_delitems_delete(Relation rel, Buffer buf,
 		XLogRecPtr	recptr;
 		xl_btree_delete xlrec_delete;
 
+		xlrec_delete.onCatalogTable = get_rel_logical_catalog(rel->rd_index->indrelid);
 		xlrec_delete.latestRemovedXid = latestRemovedXid;
 		xlrec_delete.nitems = nitems;
 
diff --git a/src/backend/access/nbtree/nbtxlog.c b/src/backend/access/nbtree/nbtxlog.c
index 0a85d8b..2617d55 100644
--- a/src/backend/access/nbtree/nbtxlog.c
+++ b/src/backend/access/nbtree/nbtxlog.c
@@ -526,7 +526,8 @@ btree_xlog_delete(XLogReaderState *record)
 
 		XLogRecGetBlockTag(record, 0, &rnode, NULL, NULL);
 
-		ResolveRecoveryConflictWithSnapshot(xlrec->latestRemovedXid, rnode);
+		ResolveRecoveryConflictWithSnapshot(xlrec->latestRemovedXid,
+											xlrec->onCatalogTable, rnode);
 	}
 
 	/*
@@ -810,6 +811,7 @@ btree_xlog_reuse_page(XLogReaderState *record)
 	if (InHotStandby)
 	{
 		ResolveRecoveryConflictWithSnapshot(xlrec->latestRemovedXid,
+											xlrec->onCatalogTable,
 											xlrec->node);
 	}
 }
diff --git a/src/backend/access/spgist/spgvacuum.c b/src/backend/access/spgist/spgvacuum.c
index b9311ce..ef4910f 100644
--- a/src/backend/access/spgist/spgvacuum.c
+++ b/src/backend/access/spgist/spgvacuum.c
@@ -27,6 +27,7 @@
 #include "storage/indexfsm.h"
 #include "storage/lmgr.h"
 #include "utils/snapmgr.h"
+#include "utils/lsyscache.h"
 
 
 /* Entry in pending-list of TIDs we need to revisit */
@@ -502,6 +503,7 @@ vacuumRedirectAndPlaceholder(Relation index, Buffer buffer)
 	OffsetNumber itemnos[MaxIndexTuplesPerPage];
 	spgxlogVacuumRedirect xlrec;
 
+	xlrec.onCatalogTable = get_rel_logical_catalog(index->rd_index->indrelid);
 	xlrec.nToPlaceholder = 0;
 	xlrec.newestRedirectXid = InvalidTransactionId;
 
diff --git a/src/backend/access/spgist/spgxlog.c b/src/backend/access/spgist/spgxlog.c
index ebe6ae8..800609c 100644
--- a/src/backend/access/spgist/spgxlog.c
+++ b/src/backend/access/spgist/spgxlog.c
@@ -881,6 +881,7 @@ spgRedoVacuumRedirect(XLogReaderState *record)
 
 			XLogRecGetBlockTag(record, 0, &node, NULL, NULL);
 			ResolveRecoveryConflictWithSnapshot(xldata->newestRedirectXid,
+												xldata->onCatalogTable,
 												node);
 		}
 	}
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 6e5bc12..e8b7af4 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -94,6 +94,7 @@ CheckLogicalDecodingRequirements(void)
 				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
 				 errmsg("logical decoding requires a database connection")));
 
+#ifdef NOT_ANYMORE
 	/* ----
 	 * TODO: We got to change that someday soon...
 	 *
@@ -111,6 +112,7 @@ CheckLogicalDecodingRequirements(void)
 		ereport(ERROR,
 				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
 				 errmsg("logical decoding cannot be used while in recovery")));
+#endif
 }
 
 /*
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index 006446b..5785d2f 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -1064,6 +1064,85 @@ ReplicationSlotReserveWal(void)
 	}
 }
 
+void
+ResolveRecoveryConflictWithSlots(Oid dboid, TransactionId xid)
+{
+	int			i;
+	bool		found_conflict = false;
+
+	if (max_replication_slots <= 0)
+		return;
+
+restart:
+	if (found_conflict)
+	{
+		CHECK_FOR_INTERRUPTS();
+		/*
+		 * Wait awhile for them to die so that we avoid flooding an
+		 * unresponsive backend when system is heavily loaded.
+		 */
+		pg_usleep(100000);
+		found_conflict = false;
+	}
+
+	LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
+	for (i = 0; i < max_replication_slots; i++)
+	{
+		ReplicationSlot *s;
+		NameData	slotname;
+		TransactionId slot_xmin;
+		TransactionId slot_catalog_xmin;
+
+		s = &ReplicationSlotCtl->replication_slots[i];
+
+		/* cannot change while ReplicationSlotCtlLock is held */
+		if (!s->in_use)
+			continue;
+
+		/* not our database, skip */
+		if (s->data.database != InvalidOid && s->data.database != dboid)
+			continue;
+
+		SpinLockAcquire(&s->mutex);
+		slotname = s->data.name;
+		slot_xmin = s->data.xmin;
+		slot_catalog_xmin = s->data.catalog_xmin;
+		SpinLockRelease(&s->mutex);
+
+		if (TransactionIdIsValid(slot_xmin) && TransactionIdPrecedesOrEquals(slot_xmin, xid))
+		{
+			found_conflict = true;
+
+			ereport(WARNING,
+					(errmsg("slot %s w/ xmin %u conflicts with removed xid %u",
+							NameStr(slotname), slot_xmin, xid)));
+		}
+
+		if (TransactionIdIsValid(slot_catalog_xmin) && TransactionIdPrecedesOrEquals(slot_catalog_xmin, xid))
+		{
+			found_conflict = true;
+
+			ereport(WARNING,
+					(errmsg("slot %s w/ catalog xmin %u conflicts with removed xid %u",
+							NameStr(slotname), slot_catalog_xmin, xid)));
+		}
+
+
+		if (found_conflict)
+		{
+			elog(WARNING, "Dropping conflicting slot %s", s->data.name.data);
+			LWLockRelease(ReplicationSlotControlLock);	/* avoid deadlock */
+			ReplicationSlotDropPtr(s);
+
+			/* We released the lock above; so re-scan the slots. */
+			goto restart;
+		}
+	}
+
+	LWLockRelease(ReplicationSlotControlLock);
+}
+
+
 /*
  * Flush all replication slots to disk.
  *
diff --git a/src/backend/storage/ipc/standby.c b/src/backend/storage/ipc/standby.c
index 215f146..75dbdb9 100644
--- a/src/backend/storage/ipc/standby.c
+++ b/src/backend/storage/ipc/standby.c
@@ -23,6 +23,7 @@
 #include "access/xloginsert.h"
 #include "miscadmin.h"
 #include "pgstat.h"
+#include "replication/slot.h"
 #include "storage/bufmgr.h"
 #include "storage/lmgr.h"
 #include "storage/proc.h"
@@ -291,7 +292,8 @@ ResolveRecoveryConflictWithVirtualXIDs(VirtualTransactionId *waitlist,
 }
 
 void
-ResolveRecoveryConflictWithSnapshot(TransactionId latestRemovedXid, RelFileNode node)
+ResolveRecoveryConflictWithSnapshot(TransactionId latestRemovedXid,
+									bool onCatalogTable, RelFileNode node)
 {
 	VirtualTransactionId *backends;
 
@@ -312,6 +314,9 @@ ResolveRecoveryConflictWithSnapshot(TransactionId latestRemovedXid, RelFileNode
 
 	ResolveRecoveryConflictWithVirtualXIDs(backends,
 										   PROCSIG_RECOVERY_CONFLICT_SNAPSHOT);
+
+	if (onCatalogTable)
+		ResolveRecoveryConflictWithSlots(node.dbNode, latestRemovedXid);
 }
 
 void
diff --git a/src/backend/utils/cache/lsyscache.c b/src/backend/utils/cache/lsyscache.c
index 1089556..92a6ed1 100644
--- a/src/backend/utils/cache/lsyscache.c
+++ b/src/backend/utils/cache/lsyscache.c
@@ -18,7 +18,9 @@
 #include "access/hash.h"
 #include "access/htup_details.h"
 #include "access/nbtree.h"
+#include "access/table.h"
 #include "bootstrap/bootstrap.h"
+#include "catalog/catalog.h"
 #include "catalog/namespace.h"
 #include "catalog/pg_am.h"
 #include "catalog/pg_amop.h"
@@ -1896,6 +1898,20 @@ get_rel_persistence(Oid relid)
 	return result;
 }
 
+bool
+get_rel_logical_catalog(Oid relid)
+{
+	bool	res;
+	Relation rel;
+
+	/* assume previously locked */
+	rel = heap_open(relid, NoLock);
+	res = RelationIsAccessibleInLogicalDecoding(rel);
+	heap_close(rel, NoLock);
+
+	return res;
+}
+
 
 /*				---------- TRANSFORM CACHE ----------						 */
 
diff --git a/src/include/access/gistxlog.h b/src/include/access/gistxlog.h
index 9990d97..887a377 100644
--- a/src/include/access/gistxlog.h
+++ b/src/include/access/gistxlog.h
@@ -47,10 +47,10 @@ typedef struct gistxlogPageUpdate
  */
 typedef struct gistxlogDelete
 {
+	bool		onCatalogTable;
 	RelFileNode hnode;			/* RelFileNode of the heap the index currently
 								 * points at */
 	uint16		ntodelete;		/* number of deleted offsets */
-
 	/*
 	 * In payload of blk 0 : todelete OffsetNumbers
 	 */
@@ -95,6 +95,7 @@ typedef struct gistxlogPageDelete
  */
 typedef struct gistxlogPageReuse
 {
+	bool		onCatalogTable;
 	RelFileNode node;
 	BlockNumber block;
 	TransactionId latestRemovedXid;
diff --git a/src/include/access/hash_xlog.h b/src/include/access/hash_xlog.h
index 53b682c..fd70b55 100644
--- a/src/include/access/hash_xlog.h
+++ b/src/include/access/hash_xlog.h
@@ -263,6 +263,7 @@ typedef struct xl_hash_init_bitmap_page
  */
 typedef struct xl_hash_vacuum_one_page
 {
+	bool		onCatalogTable;
 	TransactionId latestRemovedXid;
 	int			ntuples;
 
diff --git a/src/include/access/heapam_xlog.h b/src/include/access/heapam_xlog.h
index 22cd13c..482c874 100644
--- a/src/include/access/heapam_xlog.h
+++ b/src/include/access/heapam_xlog.h
@@ -237,6 +237,7 @@ typedef struct xl_heap_update
  */
 typedef struct xl_heap_clean
 {
+	bool		onCatalogTable;
 	TransactionId latestRemovedXid;
 	uint16		nredirected;
 	uint16		ndead;
@@ -252,6 +253,7 @@ typedef struct xl_heap_clean
  */
 typedef struct xl_heap_cleanup_info
 {
+	bool		onCatalogTable;
 	RelFileNode node;
 	TransactionId latestRemovedXid;
 } xl_heap_cleanup_info;
@@ -332,6 +334,7 @@ typedef struct xl_heap_freeze_tuple
  */
 typedef struct xl_heap_freeze_page
 {
+	bool		onCatalogTable;
 	TransactionId cutoff_xid;
 	uint16		ntuples;
 } xl_heap_freeze_page;
@@ -346,6 +349,7 @@ typedef struct xl_heap_freeze_page
  */
 typedef struct xl_heap_visible
 {
+	bool		onCatalogTable;
 	TransactionId cutoff_xid;
 	uint8		flags;
 } xl_heap_visible;
@@ -395,7 +399,7 @@ extern void heap2_desc(StringInfo buf, XLogReaderState *record);
 extern const char *heap2_identify(uint8 info);
 extern void heap_xlog_logical_rewrite(XLogReaderState *r);
 
-extern XLogRecPtr log_heap_cleanup_info(RelFileNode rnode,
+extern XLogRecPtr log_heap_cleanup_info(Relation rel,
 					  TransactionId latestRemovedXid);
 extern XLogRecPtr log_heap_clean(Relation reln, Buffer buffer,
 			   OffsetNumber *redirected, int nredirected,
@@ -414,7 +418,7 @@ extern bool heap_prepare_freeze_tuple(HeapTupleHeader tuple,
 						  bool *totally_frozen);
 extern void heap_execute_freeze_tuple(HeapTupleHeader tuple,
 						  xl_heap_freeze_tuple *xlrec_tp);
-extern XLogRecPtr log_heap_visible(RelFileNode rnode, Buffer heap_buffer,
+extern XLogRecPtr log_heap_visible(Relation rel, Buffer heap_buffer,
 				 Buffer vm_buffer, TransactionId cutoff_xid, uint8 flags);
 
 #endif							/* HEAPAM_XLOG_H */
diff --git a/src/include/access/nbtxlog.h b/src/include/access/nbtxlog.h
index 9beccc8..f64a33c 100644
--- a/src/include/access/nbtxlog.h
+++ b/src/include/access/nbtxlog.h
@@ -126,6 +126,7 @@ typedef struct xl_btree_split
  */
 typedef struct xl_btree_delete
 {
+	bool		onCatalogTable;
 	TransactionId latestRemovedXid;
 	int			nitems;
 
@@ -139,6 +140,7 @@ typedef struct xl_btree_delete
  */
 typedef struct xl_btree_reuse_page
 {
+	bool		onCatalogTable;
 	RelFileNode node;
 	BlockNumber block;
 	TransactionId latestRemovedXid;
diff --git a/src/include/access/spgxlog.h b/src/include/access/spgxlog.h
index ee8fc6f..d535441 100644
--- a/src/include/access/spgxlog.h
+++ b/src/include/access/spgxlog.h
@@ -237,6 +237,7 @@ typedef struct spgxlogVacuumRoot
 
 typedef struct spgxlogVacuumRedirect
 {
+	bool		onCatalogTable;
 	uint16		nToPlaceholder; /* number of redirects to make placeholders */
 	OffsetNumber firstPlaceholder;	/* first placeholder tuple to remove */
 	TransactionId newestRedirectXid;	/* newest XID of removed redirects */
diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h
index a8f1d66..4e0776a 100644
--- a/src/include/replication/slot.h
+++ b/src/include/replication/slot.h
@@ -205,4 +205,6 @@ extern void CheckPointReplicationSlots(void);
 
 extern void CheckSlotRequirements(void);
 
+extern void ResolveRecoveryConflictWithSlots(Oid dboid, TransactionId xid);
+
 #endif							/* SLOT_H */
diff --git a/src/include/storage/standby.h b/src/include/storage/standby.h
index 2361243..f276c7e 100644
--- a/src/include/storage/standby.h
+++ b/src/include/storage/standby.h
@@ -28,7 +28,7 @@ extern void InitRecoveryTransactionEnvironment(void);
 extern void ShutdownRecoveryTransactionEnvironment(void);
 
 extern void ResolveRecoveryConflictWithSnapshot(TransactionId latestRemovedXid,
-									RelFileNode node);
+									bool catalogTable, RelFileNode node);
 extern void ResolveRecoveryConflictWithTablespace(Oid tsid);
 extern void ResolveRecoveryConflictWithDatabase(Oid dbid);
 
diff --git a/src/include/utils/lsyscache.h b/src/include/utils/lsyscache.h
index 9606d02..78bc639 100644
--- a/src/include/utils/lsyscache.h
+++ b/src/include/utils/lsyscache.h
@@ -131,6 +131,7 @@ extern char get_rel_relkind(Oid relid);
 extern bool get_rel_relispartition(Oid relid);
 extern Oid	get_rel_tablespace(Oid relid);
 extern char get_rel_persistence(Oid relid);
+extern bool get_rel_logical_catalog(Oid relid);
 extern Oid	get_transform_fromsql(Oid typid, Oid langid, List *trftypes);
 extern Oid	get_transform_tosql(Oid typid, Oid langid, List *trftypes);
 extern bool get_typisdefined(Oid typid);
diff --git a/src/include/utils/rel.h b/src/include/utils/rel.h
index 89a7fbf..c36e228 100644
--- a/src/include/utils/rel.h
+++ b/src/include/utils/rel.h
@@ -16,6 +16,7 @@
 
 #include "access/tupdesc.h"
 #include "access/xlog.h"
+#include "catalog/catalog.h"
 #include "catalog/pg_class.h"
 #include "catalog/pg_index.h"
 #include "catalog/pg_publication.h"
diff --git a/src/test/recovery/t/016_logical_decoding_on_replica.pl b/src/test/recovery/t/016_logical_decoding_on_replica.pl
new file mode 100644
index 0000000..7998d85
--- /dev/null
+++ b/src/test/recovery/t/016_logical_decoding_on_replica.pl
@@ -0,0 +1,386 @@
+# Demonstrate that logical can follow timeline switches.
+#
+# Test logical decoding on a standby.
+#
+use strict;
+use warnings;
+use 5.8.0;
+
+use PostgresNode;
+use TestLib;
+use Test::More tests => 55;
+use RecursiveCopy;
+use File::Copy;
+
+my ($stdin, $stdout, $stderr, $ret, $handle, $return);
+my $backup_name;
+
+# Initialize master node
+my $node_master = get_new_node('master');
+$node_master->init(allows_streaming => 1, has_archiving => 1);
+$node_master->append_conf('postgresql.conf', q{
+wal_level = 'logical'
+max_replication_slots = 4
+max_wal_senders = 4
+log_min_messages = 'debug2'
+log_error_verbosity = verbose
+# send status rapidly so we promptly advance xmin on master
+wal_receiver_status_interval = 1
+# very promptly terminate conflicting backends
+max_standby_streaming_delay = '2s'
+});
+$node_master->dump_info;
+$node_master->start;
+
+$node_master->psql('postgres', q[CREATE DATABASE testdb]);
+
+$node_master->safe_psql('testdb', q[SELECT * FROM pg_create_physical_replication_slot('decoding_standby');]);
+$backup_name = 'b1';
+my $backup_dir = $node_master->backup_dir . "/" . $backup_name;
+TestLib::system_or_bail('pg_basebackup', '-D', $backup_dir, '-d', $node_master->connstr('testdb'), '--slot=decoding_standby');
+
+sub print_phys_xmin
+{
+	my $slot = $node_master->slot('decoding_standby');
+	return ($slot->{'xmin'}, $slot->{'catalog_xmin'});
+}
+
+my ($xmin, $catalog_xmin) = print_phys_xmin();
+# After slot creation, xmins must be null
+is($xmin, '', "xmin null");
+is($catalog_xmin, '', "catalog_xmin null");
+
+my $node_replica = get_new_node('replica');
+$node_replica->init_from_backup(
+	$node_master, $backup_name,
+	has_streaming => 1,
+	has_restoring => 1);
+$node_replica->append_conf('postgresql.conf',
+	q[primary_slot_name = 'decoding_standby']);
+
+$node_replica->start;
+$node_master->wait_for_catchup($node_replica, 'replay', $node_master->lsn('flush'));
+
+# with hot_standby_feedback off, xmin and catalog_xmin must still be null
+($xmin, $catalog_xmin) = print_phys_xmin();
+is($xmin, '', "xmin null after replica join");
+is($catalog_xmin, '', "catalog_xmin null after replica join");
+
+$node_replica->append_conf('postgresql.conf',q[
+hot_standby_feedback = on
+]);
+$node_replica->restart;
+sleep(2); # ensure walreceiver feedback sent
+
+# If no slot on standby exists to hold down catalog_xmin it must follow xmin,
+# (which is nextXid when no xacts are running on the standby).
+($xmin, $catalog_xmin) = print_phys_xmin();
+ok($xmin, "xmin not null");
+is($xmin, $catalog_xmin, "xmin and catalog_xmin equal");
+
+# Create new slots on the replica, ignoring the ones on the master completely.
+#
+# This must succeed since we know we have a catalog_xmin reservation. We
+# might've already sent hot standby feedback to advance our physical slot's
+# catalog_xmin but not received the corresponding xlog for the catalog xmin
+# advance, in which case we'll create a slot that isn't usable. The calling
+# application can prevent this by creating a temporary slot on the master to
+# lock in its catalog_xmin. For a truly race-free solution we'd need
+# master-to-standby hot_standby_feedback replies.
+#
+# In this case it won't race because there's no concurrent activity on the
+# master.
+#
+is($node_replica->psql('testdb', qq[SELECT * FROM pg_create_logical_replication_slot('standby_logical', 'test_decoding')]),
+   0, 'logical slot creation on standby succeeded')
+	or BAIL_OUT('cannot continue if slot creation fails, see logs');
+
+sub print_logical_xmin
+{
+	my $slot = $node_replica->slot('standby_logical');
+	return ($slot->{'xmin'}, $slot->{'catalog_xmin'});
+}
+
+$node_master->wait_for_catchup($node_replica, 'replay', $node_master->lsn('flush'));
+sleep(2); # ensure walreceiver feedback sent
+
+($xmin, $catalog_xmin) = print_phys_xmin();
+isnt($xmin, '', "physical xmin not null");
+isnt($catalog_xmin, '', "physical catalog_xmin not null");
+
+($xmin, $catalog_xmin) = print_logical_xmin();
+is($xmin, '', "logical xmin null");
+isnt($catalog_xmin, '', "logical catalog_xmin not null");
+
+$node_master->safe_psql('testdb', 'CREATE TABLE test_table(id serial primary key, blah text)');
+$node_master->safe_psql('testdb', q[INSERT INTO test_table(blah) values ('itworks')]);
+$node_master->safe_psql('testdb', 'DROP TABLE test_table');
+$node_master->safe_psql('testdb', 'VACUUM');
+
+$node_master->wait_for_catchup($node_replica, 'replay', $node_master->lsn('flush'));
+sleep(2); # ensure walreceiver feedback sent
+
+($xmin, $catalog_xmin) = print_phys_xmin();
+isnt($xmin, '', "physical xmin not null");
+isnt($catalog_xmin, '', "physical catalog_xmin not null");
+
+$node_master->wait_for_catchup($node_replica, 'replay', $node_master->lsn('flush'));
+sleep(2); # ensure walreceiver feedback sent
+
+# Should show the inserts even when the table is dropped on master
+($ret, $stdout, $stderr) = $node_replica->psql('testdb', qq[SELECT data FROM pg_logical_slot_get_changes('standby_logical', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'include-timestamp', '0')]);
+is($stderr, '', 'stderr is empty');
+is($ret, 0, 'replay from slot succeeded')
+	or BAIL_OUT('cannot continue if slot replay fails');
+is($stdout, q{BEGIN
+table public.test_table: INSERT: id[integer]:1 blah[text]:'itworks'
+COMMIT}, 'replay results match');
+
+$node_master->wait_for_catchup($node_replica, 'replay', $node_master->lsn('flush'));
+sleep(2); # ensure walreceiver feedback sent
+
+my ($physical_xmin, $physical_catalog_xmin) = print_phys_xmin();
+isnt($physical_xmin, '', "physical xmin not null");
+isnt($physical_catalog_xmin, '', "physical catalog_xmin not null");
+
+my ($logical_xmin, $logical_catalog_xmin) = print_logical_xmin();
+is($logical_xmin, '', "logical xmin null");
+isnt($logical_catalog_xmin, '', "logical catalog_xmin not null");
+
+# Ok, do a pile of tx's and make sure xmin advances.
+# Ideally we'd just hold catalog_xmin, but since hs_feedback currently uses the slot,
+# we hold down xmin.
+$node_master->safe_psql('testdb', qq[CREATE TABLE catalog_increase_1();]);
+$node_master->safe_psql('testdb', 'CREATE TABLE test_table(id serial primary key, blah text)');
+for my $i (0 .. 2000)
+{
+    $node_master->safe_psql('testdb', qq[INSERT INTO test_table(blah) VALUES ('entry $i')]);
+}
+$node_master->safe_psql('testdb', qq[CREATE TABLE catalog_increase_2();]);
+$node_master->safe_psql('testdb', 'VACUUM');
+
+my ($new_logical_xmin, $new_logical_catalog_xmin) = print_logical_xmin();
+cmp_ok($new_logical_catalog_xmin, "==", $logical_catalog_xmin, "logical slot catalog_xmin hasn't advanced before get_changes");
+
+($ret, $stdout, $stderr) = $node_replica->psql('testdb', qq[SELECT data FROM pg_logical_slot_get_changes('standby_logical', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'include-timestamp', '0')]);
+is($ret, 0, 'replay of big series succeeded');
+isnt($stdout, '', 'replayed some rows');
+
+($new_logical_xmin, $new_logical_catalog_xmin) = print_logical_xmin();
+is($new_logical_xmin, '', "logical xmin null");
+isnt($new_logical_catalog_xmin, '', "logical slot catalog_xmin not null");
+cmp_ok($new_logical_catalog_xmin, ">", $logical_catalog_xmin, "logical slot catalog_xmin advanced after get_changes");
+
+$node_master->wait_for_catchup($node_replica, 'replay', $node_master->lsn('flush'));
+sleep(2); # ensure walreceiver feedback sent
+
+$node_master->wait_for_catchup($node_replica, 'replay', $node_master->lsn('flush'));
+sleep(2); # ensure walreceiver feedback sent
+
+my ($new_physical_xmin, $new_physical_catalog_xmin) = print_phys_xmin();
+isnt($new_physical_xmin, '', "physical xmin not null");
+# hot standby feedback should advance phys catalog_xmin now the standby's slot
+# doesn't hold it down as far.
+isnt($new_physical_catalog_xmin, '', "physical catalog_xmin not null");
+cmp_ok($new_physical_catalog_xmin, ">", $physical_catalog_xmin, "physical catalog_xmin advanced");
+
+cmp_ok($new_physical_catalog_xmin, "<=", $new_logical_catalog_xmin, 'upstream physical slot catalog_xmin not past downstream catalog_xmin with hs_feedback on');
+
+#########################################################
+# Upstream catalog retention
+#########################################################
+
+sub test_catalog_xmin_retention()
+{
+	# First burn some xids on the master in another DB, so we push the master's
+	# nextXid ahead.
+	foreach my $i (1 .. 100)
+	{
+		$node_master->safe_psql('postgres', 'SELECT txid_current()');
+	}
+
+	# Force vacuum freeze on the master and ensure its oldestXmin doesn't advance
+	# past our needed xmin. The only way we have visibility into that is to force
+	# a checkpoint.
+	$node_master->safe_psql('postgres', "UPDATE pg_database SET datallowconn = true WHERE datname = 'template0'");
+	foreach my $dbname ('template1', 'postgres', 'testdb', 'template0')
+	{
+		$node_master->safe_psql($dbname, 'VACUUM FREEZE');
+	}
+	sleep(1);
+	$node_master->safe_psql('postgres', 'CHECKPOINT');
+	IPC::Run::run(['pg_controldata', $node_master->data_dir()], '>', \$stdout)
+		or die "pg_controldata failed with $?";
+	my @checkpoint = split('\n', $stdout);
+	my ($oldestXid, $oldestCatalogXmin, $nextXid) = ('', '', '');
+	foreach my $line (@checkpoint)
+	{
+		if ($line =~ qr/^Latest checkpoint's NextXID:\s+\d+:(\d+)/)
+		{
+			$nextXid = $1;
+		}
+		if ($line =~ qr/^Latest checkpoint's oldestXID:\s+(\d+)/)
+		{
+			$oldestXid = $1;
+		}
+		if ($line =~ qr/^Latest checkpoint's oldestCatalogXmin:\s*(\d+)/)
+		{
+			$oldestCatalogXmin = $1;
+		}
+	}
+	die 'no oldestXID found in checkpoint' unless $oldestXid;
+
+	my ($new_physical_xmin, $new_physical_catalog_xmin) = print_phys_xmin();
+	my ($new_logical_xmin, $new_logical_catalog_xmin) = print_logical_xmin();
+
+	print "upstream oldestXid $oldestXid, oldestCatalogXmin $oldestCatalogXmin, nextXid $nextXid, phys slot catalog_xmin $new_physical_catalog_xmin, downstream catalog_xmin $new_logical_catalog_xmin";
+
+	$node_master->safe_psql('postgres', "UPDATE pg_database SET datallowconn = false WHERE datname = 'template0'");
+
+	return ($oldestXid, $oldestCatalogXmin);
+}
+
+my ($oldestXid, $oldestCatalogXmin) = test_catalog_xmin_retention();
+
+cmp_ok($oldestXid, "<=", $new_logical_catalog_xmin, 'upstream oldestXid not past downstream catalog_xmin with hs_feedback on');
+
+########################################################################
+# Recovery conflict: conflicting replication slot should get dropped
+########################################################################
+#
+#
+# One way to reproduce recovery conflict is to run VACUUM FULL with
+# hot_standby_feedback turned off on slave.
+$node_replica->append_conf('postgresql.conf',q[
+hot_standby_feedback = off
+]);
+$node_replica->restart;
+sleep(2); # ensure walreceiver feedback sent
+$node_master->safe_psql('testdb', 'VACUUM FULL');
+
+$node_master->wait_for_catchup($node_replica, 'replay', $node_master->lsn('flush'));
+sleep(2); # ensure walreceiver feedback sent
+
+($ret, $stdout, $stderr) = $node_replica->psql('testdb', qq[SELECT data FROM pg_logical_slot_get_changes('standby_logical', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'include-timestamp', '0')]);
+isnt($ret, 0, 'usage of slot failed as expected');
+like($stderr, qr/does not exist/, 'slot not found as expected');
+
+# Re-create the slot now that we know it is dropped
+is($node_replica->psql('testdb', qq[SELECT * FROM pg_create_logical_replication_slot('standby_logical', 'test_decoding')]),
+   0, 'logical slot creation on standby succeeded')
+	or BAIL_OUT('cannot continue if slot creation fails, see logs');
+
+# Set hot_standby_feedback back on
+$node_replica->append_conf('postgresql.conf',q[
+hot_standby_feedback = on
+]);
+$node_replica->restart;
+sleep(2); # ensure walreceiver feedback sent
+
+##################################################
+# Drop slot
+##################################################
+#
+is($node_replica->safe_psql('postgres', 'SHOW hot_standby_feedback'), 'on', 'hs_feedback is on');
+
+($xmin, $catalog_xmin) = print_phys_xmin();
+
+# Make sure slots on replicas are droppable, and properly clear the upstream's xmin
+$node_replica->psql('testdb', q[SELECT pg_drop_replication_slot('standby_logical')]);
+
+is($node_replica->slot('standby_logical')->{'slot_type'}, '', 'slot on standby dropped manually');
+
+$node_master->wait_for_catchup($node_replica, 'replay', $node_master->lsn('flush'));
+sleep(2); # ensure walreceiver feedback sent
+
+my ($new_xmin, $new_catalog_xmin) = print_phys_xmin();
+# We're now back to the old behaviour of hot_standby_feedback
+# reporting nextXid for both thresholds
+ok($new_catalog_xmin, "physical catalog_xmin still non-null");
+cmp_ok($new_catalog_xmin, '==', $new_xmin,
+	'xmin and catalog_xmin equal after slot drop');
+
+
+##################################################
+# Recovery: drop database drops idle slots
+##################################################
+
+# Create a couple of slots on the DB to ensure they are dropped when we drop
+# the DB on the upstream if they're on the right DB, or not dropped if on
+# another DB.
+
+$node_replica->command_ok(['pg_recvlogical', '-d', $node_replica->connstr('testdb'), '-P', 'test_decoding', '-S', 'dodropslot', '--create-slot'], 'pg_recvlogical created dodropslot');
+#	or BAIL_OUT('slot creation failed, subsequent results would be meaningless');
+# TODO : Above, it bails out even when pg_recvlogical is successful, commented out BAIL_OUT
+$node_replica->command_ok(['pg_recvlogical', '-v', '-d', $node_replica->connstr('postgres'), '-P', 'test_decoding', '-S', 'otherslot', '--create-slot'], 'pg_recvlogical created otherslot');
+#	or BAIL_OUT('slot creation failed, subsequent results would be meaningless');
+# TODO : Above, it bails out even when pg_recvlogical is successful, commented out BAIL_OUT
+
+is($node_replica->slot('dodropslot')->{'slot_type'}, 'logical', 'slot dodropslot on standby created');
+is($node_replica->slot('otherslot')->{'slot_type'}, 'logical', 'slot otherslot on standby created');
+
+# dropdb on the master to verify slots are dropped on standby
+$node_master->safe_psql('postgres', q[DROP DATABASE testdb]);
+
+$node_master->wait_for_catchup($node_replica, 'replay', $node_master->lsn('flush'));
+
+is($node_replica->safe_psql('postgres', q[SELECT EXISTS(SELECT 1 FROM pg_database WHERE datname = 'testdb')]), 'f',
+  'database dropped on standby');
+
+is($node_replica->slot('dodropslot2')->{'slot_type'}, '', 'slot on standby dropped');
+is($node_replica->slot('otherslot')->{'slot_type'}, 'logical', 'otherslot on standby not dropped');
+
+
+##################################################
+# Recovery: drop database drops in-use slots
+##################################################
+
+# This time, have the slot in-use on the downstream DB when we drop it.
+print "Testing dropdb when downstream slot is in-use";
+$node_master->psql('postgres', q[CREATE DATABASE testdb2]);
+
+print "creating slot dodropslot2";
+$node_replica->command_ok(['pg_recvlogical', '-d', $node_replica->connstr('testdb2'), '-P', 'test_decoding', '-S', 'dodropslot2', '--create-slot'],
+	'pg_recvlogical created slot test_decoding');
+is($node_replica->slot('dodropslot2')->{'slot_type'}, 'logical', 'slot dodropslot2 on standby created');
+
+# make sure the slot is in use
+print "starting pg_recvlogical";
+$handle = IPC::Run::start(['pg_recvlogical', '-d', $node_replica->connstr('testdb2'), '-S', 'dodropslot2', '-f', '-', '--no-loop', '--start'], '>', \$stdout, '2>', \$stderr);
+sleep(1);
+
+is($node_replica->slot('dodropslot2')->{'active'}, 't', 'slot on standby is active')
+  or BAIL_OUT("slot not active on standby, cannot continue. pg_recvlogical exited with '$stdout', '$stderr'");
+
+# Master doesn't know the replica's slot is busy so dropdb should succeed
+$node_master->safe_psql('postgres', q[DROP DATABASE testdb2]);
+ok(1, 'dropdb finished');
+
+while ($node_replica->slot('dodropslot2')->{'active_pid'})
+{
+	sleep(1);
+	print "waiting for walsender to exit";
+}
+
+print "walsender exited, waiting for pg_recvlogical to exit";
+
+# our client should've terminated in response to the walsender error
+eval {
+	$handle->finish;
+};
+$return = $?;
+if ($return) {
+	is($return, 256, "pg_recvlogical terminated by server");
+	like($stderr, qr/terminating connection due to conflict with recovery/, 'recvlogical recovery conflict');
+	like($stderr, qr/User was connected to a database that must be dropped./, 'recvlogical recovery conflict db');
+}
+
+is($node_replica->slot('dodropslot2')->{'active_pid'}, '', 'walsender backend exited');
+
+# The slot should be dropped by recovery now
+$node_master->wait_for_catchup($node_replica, 'replay', $node_master->lsn('flush'));
+
+is($node_replica->safe_psql('postgres', q[SELECT EXISTS(SELECT 1 FROM pg_database WHERE datname = 'testdb2')]), 'f',
+  'database dropped on standby');
+
+is($node_replica->slot('dodropslot2')->{'slot_type'}, '', 'slot on standby dropped');
