Hi,

On 2023-04-07 08:09:50 +0200, Drouvot, Bertrand wrote:
> Hi,
>
> On 4/7/23 7:56 AM, Andres Freund wrote:
> > Hi,
> >
> > On 2023-04-07 07:02:04 +0200, Drouvot, Bertrand wrote:
> > > Done in V63 attached and did change the associated comment a bit.
> >
> > Can you send your changes incrementally, relative to V62? I'm polishing them
> > right now, and that'd make it a lot easier to apply your changes ontop.
> >
>
> Sure, please find them enclosed.

Thanks.


Here's my current working state - I'll go to bed soon.

Changes:

- shared catalog relations weren't handled correctly, because the dboid is
  InvalidOid for them. I wrote a test for that as well.

- ReplicationSlotsComputeRequiredXmin() took invalidated logical slots into
  account (ReplicationSlotsComputeLogicalRestartLSN() too, but it never looks
  at logical slots)

- I don't think the subset of slot xids that were checked when invalidating
  was right. We need to check effective_xmin and effective_catalog_xmin - the
  latter was using catalog_xmin.

- similarly, it wasn't right that specifically those two fields were
  overwritten when invalidated - as that was done, I suspect the changes might
  get lost on a restart...

- As mentioned previously, I did not like all the functions in slot.h, nor
  their naming. Not yet quite finished with that, but a good bit further

- There were a lot of unrelated changes, e.g. removing comments like
 * NB - this runs as part of checkpoint, so avoid raising errors if possible.

- I still don't like the order of the patches, fixing the walsender patches
  after introducing support for logical decoding on standby. Reordered.

- I don't think logical slots being invalidated as checked e.g. in
  pg_logical_replication_slot_advance()

- I didn't like much that InvalidatePossiblyObsoleteSlot() switched between
  kill() and SendProcSignal() based on the "conflict". There very well could
  be reasons to use InvalidatePossiblyObsoleteSlot() with an xid from outside
  of the startup process in the future. Instead I made it differentiate based
  on MyBackendType == B_STARTUP.


I also:

Added new patch that replaces invalidated_at with a new enum, 'invalidated',
listing the reason for the invalidation. I added a check for !invalidated to
ReplicationSlotsComputeRequiredLSN() etc.

Added new patch moving checks for invalid logical slots into
CreateDecodingContext(). Otherwise we end up with 5 or so checks, which makes
no sense. As far as I can tell the old message in
pg_logical_slot_get_changes_guts() was bogus, one couldn't get there having
"never previously reserved WAL"

Split "Handle logical slot conflicts on standby." into two. I'm not sure that
should stay that way, but it made it easier to hack on
InvalidateObsoleteReplicationSlots.


Todo:
- write a test that invalidated logical slots stay invalidated across a restart
- write a test that invalidated logical slots do not lead to retaining WAL
- Further evolve the API of InvalidateObsoleteReplicationSlots()
  - pass in the ReplicationSlotInvalidationCause we're trying to conflict on?
  - rename xid to snapshotConflictHorizon, that'd be more in line with the
    ResolveRecoveryConflictWithSnapshot and easier to understand, I think

- The test could stand a bit of cleanup and consolidation
  - No need to start 4 psql processes to do 4 updates, just do it in one
    safe_psql()
  - the sequence of drop_logical_slots(), create_logical_slots(),
    change_hot_standby_feedback_and_wait_for_xmins(), make_slot_active() is
    repeated quite a few times
  - the stats queries checking for specific conflict counts, including
    preceding tests, is pretty painful. I suggest to reset the stats at the
    end of the test instead (likely also do the drop_logical_slot() there).
  - it's hard to correlate postgres log and the tap test, because the slots
    are named the same across all tests. Perhaps they could have a per-test
    prefix?
  - numbering tests is a PITA, I had to renumber the later ones, when adding a
    test for shared catalog tables


My attached version does include your v62-63 incremental chnages.

Greetings,

Andres Freund
>From 1e5461e0019678a92192b0dd5d9bf3f7105f504d Mon Sep 17 00:00:00 2001
From: Andres Freund <and...@anarazel.de>
Date: Thu, 6 Apr 2023 20:00:07 -0700
Subject: [PATCH va65 1/9] replication slots: replace invalidated_at LSN with
 an enum

---
 src/include/replication/slot.h      | 15 +++++++++++++--
 src/backend/replication/slot.c      | 21 ++++++++++++++++++---
 src/backend/replication/slotfuncs.c |  8 +++-----
 3 files changed, 34 insertions(+), 10 deletions(-)

diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h
index 8872c80cdfe..793f0701b88 100644
--- a/src/include/replication/slot.h
+++ b/src/include/replication/slot.h
@@ -37,6 +37,17 @@ typedef enum ReplicationSlotPersistency
 	RS_TEMPORARY
 } ReplicationSlotPersistency;
 
+/*
+ * Slots can be invalidated, e.g. due to max_slot_wal_keep_size. If so, the
+ * 'invalidated' field is set to a value other than _NONE.
+ */
+typedef enum ReplicationSlotInvalidationCause
+{
+	RS_INVAL_NONE,
+	/* required WAL has been removed */
+	RS_INVAL_WAL,
+} ReplicationSlotInvalidationCause;
+
 /*
  * On-Disk data of a replication slot, preserved across restarts.
  */
@@ -72,8 +83,8 @@ typedef struct ReplicationSlotPersistentData
 	/* oldest LSN that might be required by this replication slot */
 	XLogRecPtr	restart_lsn;
 
-	/* restart_lsn is copied here when the slot is invalidated */
-	XLogRecPtr	invalidated_at;
+	/* RS_INVAL_NONE if valid, or the reason for having been invalidated */
+	ReplicationSlotInvalidationCause	invalidated;
 
 	/*
 	 * Oldest LSN that the client has acked receipt for.  This is used as the
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index 2293c0c6fc3..06ff3559dd1 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -855,8 +855,7 @@ ReplicationSlotsComputeRequiredXmin(bool already_locked)
 		SpinLockAcquire(&s->mutex);
 		effective_xmin = s->effective_xmin;
 		effective_catalog_xmin = s->effective_catalog_xmin;
-		invalidated = (!XLogRecPtrIsInvalid(s->data.invalidated_at) &&
-					   XLogRecPtrIsInvalid(s->data.restart_lsn));
+		invalidated = s->data.invalidated != RS_INVAL_NONE;
 		SpinLockRelease(&s->mutex);
 
 		/* invalidated slots need not apply */
@@ -901,14 +900,20 @@ ReplicationSlotsComputeRequiredLSN(void)
 	{
 		ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
 		XLogRecPtr	restart_lsn;
+		bool		invalidated;
 
 		if (!s->in_use)
 			continue;
 
 		SpinLockAcquire(&s->mutex);
 		restart_lsn = s->data.restart_lsn;
+		invalidated = s->data.invalidated != RS_INVAL_NONE;
 		SpinLockRelease(&s->mutex);
 
+		/* invalidated slots need not apply */
+		if (invalidated)
+			continue;
+
 		if (restart_lsn != InvalidXLogRecPtr &&
 			(min_required == InvalidXLogRecPtr ||
 			 restart_lsn < min_required))
@@ -946,6 +951,7 @@ ReplicationSlotsComputeLogicalRestartLSN(void)
 	{
 		ReplicationSlot *s;
 		XLogRecPtr	restart_lsn;
+		bool		invalidated;
 
 		s = &ReplicationSlotCtl->replication_slots[i];
 
@@ -960,8 +966,13 @@ ReplicationSlotsComputeLogicalRestartLSN(void)
 		/* read once, it's ok if it increases while we're checking */
 		SpinLockAcquire(&s->mutex);
 		restart_lsn = s->data.restart_lsn;
+		invalidated = s->data.invalidated != RS_INVAL_NONE;
 		SpinLockRelease(&s->mutex);
 
+		/* invalidated slots need not apply */
+		if (invalidated)
+			continue;
+
 		if (restart_lsn == InvalidXLogRecPtr)
 			continue;
 
@@ -1012,6 +1023,8 @@ ReplicationSlotsCountDBSlots(Oid dboid, int *nslots, int *nactive)
 		if (s->data.database != dboid)
 			continue;
 
+		/* NB: intentionally counting invalidated slots */
+
 		/* count slots with spinlock held */
 		SpinLockAcquire(&s->mutex);
 		(*nslots)++;
@@ -1069,6 +1082,8 @@ restart:
 		if (s->data.database != dboid)
 			continue;
 
+		/* NB: intentionally including invalidated slots */
+
 		/* acquire slot, so ReplicationSlotDropAcquired can be reused  */
 		SpinLockAcquire(&s->mutex);
 		/* can't change while ReplicationSlotControlLock is held */
@@ -1294,7 +1309,7 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlot *s, XLogRecPtr oldestLSN,
 		{
 			MyReplicationSlot = s;
 			s->active_pid = MyProcPid;
-			s->data.invalidated_at = restart_lsn;
+			s->data.invalidated = RS_INVAL_WAL;
 			s->data.restart_lsn = InvalidXLogRecPtr;
 
 			/* Let caller know */
diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c
index 2f3c9648241..ad3e72be5ee 100644
--- a/src/backend/replication/slotfuncs.c
+++ b/src/backend/replication/slotfuncs.c
@@ -315,12 +315,10 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
 			nulls[i++] = true;
 
 		/*
-		 * If invalidated_at is valid and restart_lsn is invalid, we know for
-		 * certain that the slot has been invalidated.  Otherwise, test
-		 * availability from restart_lsn.
+		 * If the slot has not been invalidated, test availability from
+		 * restart_lsn.
 		 */
-		if (XLogRecPtrIsInvalid(slot_contents.data.restart_lsn) &&
-			!XLogRecPtrIsInvalid(slot_contents.data.invalidated_at))
+		if (slot_contents.data.invalidated != RS_INVAL_NONE)
 			walstate = WALAVAIL_REMOVED;
 		else
 			walstate = GetWALAvailability(slot_contents.data.restart_lsn);
-- 
2.38.0

>From 4ae6d960b209d88120558d65379abb739d53f22d Mon Sep 17 00:00:00 2001
From: Andres Freund <and...@anarazel.de>
Date: Thu, 6 Apr 2023 19:51:06 -0700
Subject: [PATCH va65 2/9] Prevent use of invalidated logical slot in
 CreateDecodingContext()

Previously we had checks for this in multiple places. Support for logical
decoding on standbys will add further checks, making it worth while to change
this.

TODO: Ensure the error message differences across walsender / SQL interface
are the right thing.  As far as I can tell, the "This slot has never
previously reserved WAL" portion in pg_logical_slot_get_changes_guts() was
bogus.

Author:
Reviewed-by:
Discussion: https://postgr.es/m/
Backpatch:
---
 src/backend/replication/logical/logical.c      | 16 ++++++++++++++++
 src/backend/replication/logical/logicalfuncs.c | 13 -------------
 src/backend/replication/walsender.c            |  7 -------
 3 files changed, 16 insertions(+), 20 deletions(-)

diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index c3ec97a0a62..85fc49f655d 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -518,6 +518,22 @@ CreateDecodingContext(XLogRecPtr start_lsn,
 				 errmsg("replication slot \"%s\" was not created in this database",
 						NameStr(slot->data.name))));
 
+	/*
+	 * Check if slot has been invalidated due to max_slot_wal_keep_size. Avoid
+	 * "cannot get changes" wording in this errmsg because that'd be
+	 * confusingly ambiguous about no changes being available when called from
+	 * pg_logical_slot_get_changes_guts().
+	 */
+	if (MyReplicationSlot->data.invalidated == RS_INVAL_WAL)
+		ereport(ERROR,
+				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+				 errmsg("can no longer get changes from replication slot \"%s\"",
+						NameStr(MyReplicationSlot->data.name)),
+				 errdetail("This slot has been invalidated because it exceeded the maximum reserved size.")));
+
+	Assert(MyReplicationSlot->data.invalidated == RS_INVAL_NONE);
+	Assert(MyReplicationSlot->data.restart_lsn != InvalidXLogRecPtr);
+
 	if (start_lsn == InvalidXLogRecPtr)
 	{
 		/* continue from last position */
diff --git a/src/backend/replication/logical/logicalfuncs.c b/src/backend/replication/logical/logicalfuncs.c
index fa1b641a2b0..55a24c02c94 100644
--- a/src/backend/replication/logical/logicalfuncs.c
+++ b/src/backend/replication/logical/logicalfuncs.c
@@ -214,19 +214,6 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin
 									LogicalOutputPrepareWrite,
 									LogicalOutputWrite, NULL);
 
-		/*
-		 * After the sanity checks in CreateDecodingContext, make sure the
-		 * restart_lsn is valid.  Avoid "cannot get changes" wording in this
-		 * errmsg because that'd be confusingly ambiguous about no changes
-		 * being available.
-		 */
-		if (XLogRecPtrIsInvalid(MyReplicationSlot->data.restart_lsn))
-			ereport(ERROR,
-					(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
-					 errmsg("can no longer get changes from replication slot \"%s\"",
-							NameStr(*name)),
-					 errdetail("This slot has never previously reserved WAL, or it has been invalidated.")));
-
 		MemoryContextSwitchTo(oldcontext);
 
 		/*
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 75e8363e248..e40a9b1ba7b 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -1253,13 +1253,6 @@ StartLogicalReplication(StartReplicationCmd *cmd)
 
 	ReplicationSlotAcquire(cmd->slotname, true);
 
-	if (XLogRecPtrIsInvalid(MyReplicationSlot->data.restart_lsn))
-		ereport(ERROR,
-				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
-				 errmsg("cannot read from logical replication slot \"%s\"",
-						cmd->slotname),
-				 errdetail("This slot has been invalidated because it exceeded the maximum reserved size.")));
-
 	/*
 	 * Force a disconnect, so that the decoding code doesn't need to care
 	 * about an eventual switch from running in recovery, to running in a
-- 
2.38.0

>From cc1370fb4be75ce6619d9f29649a81822a16d977 Mon Sep 17 00:00:00 2001
From: Andres Freund <and...@anarazel.de>
Date: Thu, 6 Apr 2023 23:14:39 -0700
Subject: [PATCH va65 3/9] Add support for more causes to
 InvalidateObsoleteReplicationSlots()

This supports invalidating all logical replication slots and invalidating
logical replication slots that conflict with an increased snapshot conflict
horizon.

Author:
Reviewed-by:
Discussion: https://postgr.es/m/
Backpatch:
---
 src/include/replication/slot.h            |   7 +-
 src/include/storage/procsignal.h          |   1 +
 src/backend/access/transam/xlog.c         |   6 +-
 src/backend/replication/logical/logical.c |   7 +
 src/backend/replication/slot.c            | 161 ++++++++++++++++++----
 src/backend/storage/ipc/procsignal.c      |   3 +
 src/backend/storage/ipc/standby.c         |   3 +
 src/backend/tcop/postgres.c               |   9 ++
 8 files changed, 167 insertions(+), 30 deletions(-)

diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h
index 793f0701b88..35cafe94bc5 100644
--- a/src/include/replication/slot.h
+++ b/src/include/replication/slot.h
@@ -46,6 +46,10 @@ typedef enum ReplicationSlotInvalidationCause
 	RS_INVAL_NONE,
 	/* required WAL has been removed */
 	RS_INVAL_WAL,
+	/* required rows have been removed */
+	RS_INVAL_XID,
+	/* wal_level insufficient for slot */
+	RS_INVAL_WAL_LEVEL,
 } ReplicationSlotInvalidationCause;
 
 /*
@@ -226,7 +230,8 @@ extern void ReplicationSlotsComputeRequiredLSN(void);
 extern XLogRecPtr ReplicationSlotsComputeLogicalRestartLSN(void);
 extern bool ReplicationSlotsCountDBSlots(Oid dboid, int *nslots, int *nactive);
 extern void ReplicationSlotsDropDBSlots(Oid dboid);
-extern bool InvalidateObsoleteReplicationSlots(XLogSegNo oldestSegno);
+extern bool InvalidateObsoleteReplicationSlots(XLogSegNo oldestSegno, Oid dboid,
+											   TransactionId xid);
 extern ReplicationSlot *SearchNamedReplicationSlot(const char *name, bool need_lock);
 extern int	ReplicationSlotIndex(ReplicationSlot *slot);
 extern bool ReplicationSlotName(int index, Name name);
diff --git a/src/include/storage/procsignal.h b/src/include/storage/procsignal.h
index 905af2231ba..2f52100b009 100644
--- a/src/include/storage/procsignal.h
+++ b/src/include/storage/procsignal.h
@@ -42,6 +42,7 @@ typedef enum
 	PROCSIG_RECOVERY_CONFLICT_TABLESPACE,
 	PROCSIG_RECOVERY_CONFLICT_LOCK,
 	PROCSIG_RECOVERY_CONFLICT_SNAPSHOT,
+	PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT,
 	PROCSIG_RECOVERY_CONFLICT_BUFFERPIN,
 	PROCSIG_RECOVERY_CONFLICT_STARTUP_DEADLOCK,
 
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 46821ad6056..5e964e2e96b 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -6806,7 +6806,8 @@ CreateCheckPoint(int flags)
 	 */
 	XLByteToSeg(RedoRecPtr, _logSegNo, wal_segment_size);
 	KeepLogSeg(recptr, &_logSegNo);
-	if (InvalidateObsoleteReplicationSlots(_logSegNo))
+	if (InvalidateObsoleteReplicationSlots(_logSegNo, InvalidOid,
+										   InvalidTransactionId))
 	{
 		/*
 		 * Some slots have been invalidated; recalculate the old-segment
@@ -7250,7 +7251,8 @@ CreateRestartPoint(int flags)
 	replayPtr = GetXLogReplayRecPtr(&replayTLI);
 	endptr = (receivePtr < replayPtr) ? replayPtr : receivePtr;
 	KeepLogSeg(endptr, &_logSegNo);
-	if (InvalidateObsoleteReplicationSlots(_logSegNo))
+	if (InvalidateObsoleteReplicationSlots(_logSegNo, InvalidOid,
+										   InvalidTransactionId))
 	{
 		/*
 		 * Some slots have been invalidated; recalculate the old-segment
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 85fc49f655d..27addd58f66 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -531,6 +531,13 @@ CreateDecodingContext(XLogRecPtr start_lsn,
 						NameStr(MyReplicationSlot->data.name)),
 				 errdetail("This slot has been invalidated because it exceeded the maximum reserved size.")));
 
+	if (MyReplicationSlot->data.invalidated != RS_INVAL_NONE)
+		ereport(ERROR,
+				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+				 errmsg("cannot read from logical replication slot \"%s\"",
+						NameStr(MyReplicationSlot->data.name)),
+				 errdetail("This slot has been invalidated because it was conflicting with recovery.")));
+
 	Assert(MyReplicationSlot->data.invalidated == RS_INVAL_NONE);
 	Assert(MyReplicationSlot->data.restart_lsn != InvalidXLogRecPtr);
 
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index 06ff3559dd1..3e8c1b44e85 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -1241,8 +1241,73 @@ ReplicationSlotReserveWal(void)
 }
 
 /*
- * Helper for InvalidateObsoleteReplicationSlots -- acquires the given slot
- * and mark it invalid, if necessary and possible.
+ * Report that replication slot needs to be invalidated
+ */
+static void
+ReportSlotInvalidation(ReplicationSlotInvalidationCause cause,
+					   bool terminating,
+					   int pid,
+					   NameData slotname,
+					   XLogRecPtr restart_lsn,
+					   XLogRecPtr oldestLSN,
+					   TransactionId xid)
+{
+	StringInfoData err_detail;
+	bool		hint = false;
+
+	initStringInfo(&err_detail);
+
+	switch (cause)
+	{
+		case RS_INVAL_WAL:
+			hint = true;
+			appendStringInfo(&err_detail, _("The slot's restart_lsn %X/%X exceeds the limit by %llu bytes."),
+							 LSN_FORMAT_ARGS(restart_lsn),
+							 (unsigned long long) (oldestLSN - restart_lsn));
+			break;
+		case RS_INVAL_XID:
+			appendStringInfo(&err_detail, _("The slot conflicted with xid horizon %u."), xid);
+			break;
+
+		case RS_INVAL_WAL_LEVEL:
+			appendStringInfo(&err_detail, _("Logical decoding on standby requires wal_level to be at least logical on the primary server"));
+			break;
+		case RS_INVAL_NONE:
+			pg_unreachable();
+	}
+
+	ereport(LOG,
+			terminating ? errmsg("terminating process %d to release replication slot \"%s\"", pid, NameStr(slotname)) :
+			errmsg("invalidating obsolete replication slot \"%s\"", NameStr(slotname)),
+			errdetail_internal("%s", err_detail.data),
+			hint ? errhint("You might need to increase max_slot_wal_keep_size.") : 0);
+
+	pfree(err_detail.data);
+}
+
+/* FIXME: This is a too generic name */
+static inline bool
+TransactionIdIsValidPrecedesOrEquals(TransactionId xid1, TransactionId xid2)
+{
+	return (TransactionIdIsValid(xid1) && TransactionIdPrecedesOrEquals(xid1, xid2));
+}
+
+static inline bool
+LogicalReplicationSlotXidsConflict(ReplicationSlot *s, Oid dboid, TransactionId xid)
+{
+	/* an invalid DB oid signals a shared relation, need to conflict */
+	if (dboid != InvalidOid && dboid != s->data.database)
+		return false;
+
+	return
+		TransactionIdIsValidPrecedesOrEquals(s->effective_xmin, xid) ||
+		TransactionIdIsValidPrecedesOrEquals(s->effective_catalog_xmin, xid);
+}
+
+/*
+ * Helper for InvalidateObsoleteReplicationSlots
+ *
+ * Acquires the given slot and mark it invalid, if necessary and possible.
  *
  * Returns whether ReplicationSlotControlLock was released in the interim (and
  * in that case we're not holding the lock at return, otherwise we are).
@@ -1254,16 +1319,20 @@ ReplicationSlotReserveWal(void)
  */
 static bool
 InvalidatePossiblyObsoleteSlot(ReplicationSlot *s, XLogRecPtr oldestLSN,
-							   bool *invalidated)
+							   Oid dboid, TransactionId xid, bool *invalidated)
 {
 	int			last_signaled_pid = 0;
 	bool		released_lock = false;
+	bool		invalidate_all_logical = !TransactionIdIsValid(xid) &&
+		oldestLSN == InvalidXLogRecPtr;
+
 
 	for (;;)
 	{
 		XLogRecPtr	restart_lsn;
 		NameData	slotname;
 		int			active_pid = 0;
+		ReplicationSlotInvalidationCause conflict = RS_INVAL_NONE;
 
 		Assert(LWLockHeldByMeInMode(ReplicationSlotControlLock, LW_SHARED));
 
@@ -1286,10 +1355,23 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlot *s, XLogRecPtr oldestLSN,
 		restart_lsn = s->data.restart_lsn;
 
 		/*
-		 * If the slot is already invalid or is fresh enough, we don't need to
-		 * do anything.
+		 * If the slot is already invalid or is a non conflicting slot, we
+		 * don't need to do anything.
 		 */
-		if (XLogRecPtrIsInvalid(restart_lsn) || restart_lsn >= oldestLSN)
+		if (s->data.invalidated == RS_INVAL_NONE)
+		{
+			if (oldestLSN != InvalidXLogRecPtr && s->data.restart_lsn != InvalidXLogRecPtr &&
+				s->data.restart_lsn < oldestLSN)
+				conflict = RS_INVAL_WAL;
+			if (TransactionIdIsValid(xid) && SlotIsLogical(s) &&
+				LogicalReplicationSlotXidsConflict(s, dboid, xid))
+				conflict = RS_INVAL_XID;
+			else if (invalidate_all_logical && SlotIsLogical(s))
+				conflict = RS_INVAL_WAL_LEVEL;
+		}
+
+		/* if there's no conflict, we're done */
+		if (conflict == RS_INVAL_NONE)
 		{
 			SpinLockRelease(&s->mutex);
 			if (released_lock)
@@ -1309,8 +1391,9 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlot *s, XLogRecPtr oldestLSN,
 		{
 			MyReplicationSlot = s;
 			s->active_pid = MyProcPid;
-			s->data.invalidated = RS_INVAL_WAL;
-			s->data.restart_lsn = InvalidXLogRecPtr;
+			s->data.invalidated = conflict;
+			if (conflict == RS_INVAL_WAL)
+				s->data.restart_lsn = InvalidXLogRecPtr;
 
 			/* Let caller know */
 			*invalidated = true;
@@ -1344,15 +1427,17 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlot *s, XLogRecPtr oldestLSN,
 			 */
 			if (last_signaled_pid != active_pid)
 			{
-				ereport(LOG,
-						errmsg("terminating process %d to release replication slot \"%s\"",
-							   active_pid, NameStr(slotname)),
-						errdetail("The slot's restart_lsn %X/%X exceeds the limit by %llu bytes.",
-								  LSN_FORMAT_ARGS(restart_lsn),
-								  (unsigned long long) (oldestLSN - restart_lsn)),
-						errhint("You might need to increase max_slot_wal_keep_size."));
+				ReportSlotInvalidation(conflict, true, active_pid,
+									   slotname, restart_lsn,
+									   oldestLSN, xid);
+
+				if (MyBackendType == B_STARTUP)
+					(void) SendProcSignal(active_pid,
+										  PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT,
+										  InvalidBackendId);
+				else
+					(void) kill(active_pid, SIGTERM);
 
-				(void) kill(active_pid, SIGTERM);
 				last_signaled_pid = active_pid;
 			}
 
@@ -1385,14 +1470,11 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlot *s, XLogRecPtr oldestLSN,
 			ReplicationSlotMarkDirty();
 			ReplicationSlotSave();
 			ReplicationSlotRelease();
+			pgstat_drop_replslot(s);
 
-			ereport(LOG,
-					errmsg("invalidating obsolete replication slot \"%s\"",
-						   NameStr(slotname)),
-					errdetail("The slot's restart_lsn %X/%X exceeds the limit by %llu bytes.",
-							  LSN_FORMAT_ARGS(restart_lsn),
-							  (unsigned long long) (oldestLSN - restart_lsn)),
-					errhint("You might need to increase max_slot_wal_keep_size."));
+			ReportSlotInvalidation(conflict, false, active_pid,
+								   slotname, restart_lsn,
+								   oldestLSN, xid);
 
 			/* done with this slot for now */
 			break;
@@ -1405,19 +1487,44 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlot *s, XLogRecPtr oldestLSN,
 }
 
 /*
- * Mark any slot that points to an LSN older than the given segment
- * as invalid; it requires WAL that's about to be removed.
+ * Invalidate slots that require resources about to be removed.
  *
  * Returns true when any slot have got invalidated.
  *
+ * FIXME:
+ *
+ * WAL case (aka check_on_xid is false):
+ *
+ *	 Mark any slot that points to an LSN older than the given segment
+ *	 as invalid; it requires WAL that's about to be removed.
+ *	 invalidated is set to true when any slot have got invalidated.
+ *
+ * Xid case (aka check_on_xid is true):
+ *
+ *	 When xid is valid, it means that we are about to remove rows older than xid.
+ *	 Therefore we need to invalidate slots that depend on seeing those rows.
+ *	 When xid is invalid, invalidate all logical slots. This is required when the
+ *	 master wal_level is set back to replica, so existing logical slots need to
+ *	 be invalidated. Note that WaitExceedsMaxStandbyDelay() is not taken into
+ *	 account here (as opposed to ResolveRecoveryConflictWithVirtualXIDs()): XXXX
+ *
+ *
+ * XXX: Should we have the caller pass in a specific
+ * ReplicationSlotInvalidationCause that we should search for? That'd likely
+ * make some things a bit neater.
+ *
  * NB - this runs as part of checkpoint, so avoid raising errors if possible.
  */
 bool
-InvalidateObsoleteReplicationSlots(XLogSegNo oldestSegno)
+InvalidateObsoleteReplicationSlots(XLogSegNo oldestSegno, Oid dboid,
+								   TransactionId xid)
 {
 	XLogRecPtr	oldestLSN;
 	bool		invalidated = false;
 
+	if (max_replication_slots == 0)
+		return invalidated;
+
 	XLogSegNoOffsetToRecPtr(oldestSegno, 0, wal_segment_size, oldestLSN);
 
 restart:
@@ -1429,7 +1536,7 @@ restart:
 		if (!s->in_use)
 			continue;
 
-		if (InvalidatePossiblyObsoleteSlot(s, oldestLSN, &invalidated))
+		if (InvalidatePossiblyObsoleteSlot(s, oldestLSN, dboid, xid, &invalidated))
 		{
 			/* if the lock was released, start from scratch */
 			goto restart;
diff --git a/src/backend/storage/ipc/procsignal.c b/src/backend/storage/ipc/procsignal.c
index 395b2cf6909..c85cb5cc18d 100644
--- a/src/backend/storage/ipc/procsignal.c
+++ b/src/backend/storage/ipc/procsignal.c
@@ -673,6 +673,9 @@ procsignal_sigusr1_handler(SIGNAL_ARGS)
 	if (CheckProcSignal(PROCSIG_RECOVERY_CONFLICT_SNAPSHOT))
 		RecoveryConflictInterrupt(PROCSIG_RECOVERY_CONFLICT_SNAPSHOT);
 
+	if (CheckProcSignal(PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT))
+		RecoveryConflictInterrupt(PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT);
+
 	if (CheckProcSignal(PROCSIG_RECOVERY_CONFLICT_STARTUP_DEADLOCK))
 		RecoveryConflictInterrupt(PROCSIG_RECOVERY_CONFLICT_STARTUP_DEADLOCK);
 
diff --git a/src/backend/storage/ipc/standby.c b/src/backend/storage/ipc/standby.c
index 9f56b4e95cf..fc81e17901c 100644
--- a/src/backend/storage/ipc/standby.c
+++ b/src/backend/storage/ipc/standby.c
@@ -1478,6 +1478,9 @@ get_recovery_conflict_desc(ProcSignalReason reason)
 		case PROCSIG_RECOVERY_CONFLICT_SNAPSHOT:
 			reasonDesc = _("recovery conflict on snapshot");
 			break;
+		case PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT:
+			reasonDesc = _("recovery conflict on replication slot");
+			break;
 		case PROCSIG_RECOVERY_CONFLICT_STARTUP_DEADLOCK:
 			reasonDesc = _("recovery conflict on buffer deadlock");
 			break;
diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c
index a10ecbaf50b..25e0de4e0ff 100644
--- a/src/backend/tcop/postgres.c
+++ b/src/backend/tcop/postgres.c
@@ -2526,6 +2526,9 @@ errdetail_recovery_conflict(void)
 		case PROCSIG_RECOVERY_CONFLICT_SNAPSHOT:
 			errdetail("User query might have needed to see row versions that must be removed.");
 			break;
+		case PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT:
+			errdetail("User was using the logical slot that must be dropped.");
+			break;
 		case PROCSIG_RECOVERY_CONFLICT_STARTUP_DEADLOCK:
 			errdetail("User transaction caused buffer deadlock with recovery.");
 			break;
@@ -3143,6 +3146,12 @@ RecoveryConflictInterrupt(ProcSignalReason reason)
 				InterruptPending = true;
 				break;
 
+			case PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT:
+				RecoveryConflictPending = true;
+				QueryCancelPending = true;
+				InterruptPending = true;
+				break;
+
 			default:
 				elog(FATAL, "unrecognized conflict mode: %d",
 					 (int) reason);
-- 
2.38.0

>From a07a6865e6286e82e010ad4ee55e7c50ab372f62 Mon Sep 17 00:00:00 2001
From: Andres Freund <and...@anarazel.de>
Date: Thu, 6 Apr 2023 20:03:16 -0700
Subject: [PATCH va65 4/9] Handle logical slot conflicts on standby.
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

During WAL replay on standby, when slot conflict is identified,
invalidate such slots. Also do the same thing if wal_level on the primary server
is reduced to below logical and there are existing logical slots
on standby. Introduce a new ProcSignalReason value for slot
conflict recovery.

Author: Andres Freund (in an older version), Amit Khandekar, Bertrand Drouvot
Reviewed-By: Bertrand Drouvot, Andres Freund, Robert Haas, Fabrizio de Royes Mello,
Bharath Rupireddy, Amit Kapila, Álvaro Herrera
---
 src/include/storage/standby.h       |  2 ++
 src/backend/access/gist/gistxlog.c  |  2 ++
 src/backend/access/hash/hash_xlog.c |  1 +
 src/backend/access/heap/heapam.c    |  3 +++
 src/backend/access/nbtree/nbtxlog.c |  2 ++
 src/backend/access/spgist/spgxlog.c |  1 +
 src/backend/access/transam/xlog.c   | 14 ++++++++++++++
 src/backend/storage/ipc/standby.c   | 11 ++++++++++-
 8 files changed, 35 insertions(+), 1 deletion(-)

diff --git a/src/include/storage/standby.h b/src/include/storage/standby.h
index 2effdea126f..41f4dc372e6 100644
--- a/src/include/storage/standby.h
+++ b/src/include/storage/standby.h
@@ -30,8 +30,10 @@ extern void InitRecoveryTransactionEnvironment(void);
 extern void ShutdownRecoveryTransactionEnvironment(void);
 
 extern void ResolveRecoveryConflictWithSnapshot(TransactionId snapshotConflictHorizon,
+												bool isCatalogRel,
 												RelFileLocator locator);
 extern void ResolveRecoveryConflictWithSnapshotFullXid(FullTransactionId snapshotConflictHorizon,
+													   bool isCatalogRel,
 													   RelFileLocator locator);
 extern void ResolveRecoveryConflictWithTablespace(Oid tsid);
 extern void ResolveRecoveryConflictWithDatabase(Oid dbid);
diff --git a/src/backend/access/gist/gistxlog.c b/src/backend/access/gist/gistxlog.c
index b7678f3c144..9a86fb3feff 100644
--- a/src/backend/access/gist/gistxlog.c
+++ b/src/backend/access/gist/gistxlog.c
@@ -197,6 +197,7 @@ gistRedoDeleteRecord(XLogReaderState *record)
 		XLogRecGetBlockTag(record, 0, &rlocator, NULL, NULL);
 
 		ResolveRecoveryConflictWithSnapshot(xldata->snapshotConflictHorizon,
+											xldata->isCatalogRel,
 											rlocator);
 	}
 
@@ -390,6 +391,7 @@ gistRedoPageReuse(XLogReaderState *record)
 	 */
 	if (InHotStandby)
 		ResolveRecoveryConflictWithSnapshotFullXid(xlrec->snapshotConflictHorizon,
+												   xlrec->isCatalogRel,
 												   xlrec->locator);
 }
 
diff --git a/src/backend/access/hash/hash_xlog.c b/src/backend/access/hash/hash_xlog.c
index f2dd9be8d3f..e8e06c62a95 100644
--- a/src/backend/access/hash/hash_xlog.c
+++ b/src/backend/access/hash/hash_xlog.c
@@ -1003,6 +1003,7 @@ hash_xlog_vacuum_one_page(XLogReaderState *record)
 
 		XLogRecGetBlockTag(record, 0, &rlocator, NULL, NULL);
 		ResolveRecoveryConflictWithSnapshot(xldata->snapshotConflictHorizon,
+											xldata->isCatalogRel,
 											rlocator);
 	}
 
diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index 8b13e3f8925..f389ceee1ea 100644
--- a/src/backend/access/heap/heapam.c
+++ b/src/backend/access/heap/heapam.c
@@ -8769,6 +8769,7 @@ heap_xlog_prune(XLogReaderState *record)
 	 */
 	if (InHotStandby)
 		ResolveRecoveryConflictWithSnapshot(xlrec->snapshotConflictHorizon,
+											xlrec->isCatalogRel,
 											rlocator);
 
 	/*
@@ -8940,6 +8941,7 @@ heap_xlog_visible(XLogReaderState *record)
 	 */
 	if (InHotStandby)
 		ResolveRecoveryConflictWithSnapshot(xlrec->snapshotConflictHorizon,
+											xlrec->flags & VISIBILITYMAP_XLOG_CATALOG_REL,
 											rlocator);
 
 	/*
@@ -9061,6 +9063,7 @@ heap_xlog_freeze_page(XLogReaderState *record)
 
 		XLogRecGetBlockTag(record, 0, &rlocator, NULL, NULL);
 		ResolveRecoveryConflictWithSnapshot(xlrec->snapshotConflictHorizon,
+											xlrec->isCatalogRel,
 											rlocator);
 	}
 
diff --git a/src/backend/access/nbtree/nbtxlog.c b/src/backend/access/nbtree/nbtxlog.c
index 414ca4f6deb..c87e46ed66e 100644
--- a/src/backend/access/nbtree/nbtxlog.c
+++ b/src/backend/access/nbtree/nbtxlog.c
@@ -669,6 +669,7 @@ btree_xlog_delete(XLogReaderState *record)
 		XLogRecGetBlockTag(record, 0, &rlocator, NULL, NULL);
 
 		ResolveRecoveryConflictWithSnapshot(xlrec->snapshotConflictHorizon,
+											xlrec->isCatalogRel,
 											rlocator);
 	}
 
@@ -1007,6 +1008,7 @@ btree_xlog_reuse_page(XLogReaderState *record)
 
 	if (InHotStandby)
 		ResolveRecoveryConflictWithSnapshotFullXid(xlrec->snapshotConflictHorizon,
+												   xlrec->isCatalogRel,
 												   xlrec->locator);
 }
 
diff --git a/src/backend/access/spgist/spgxlog.c b/src/backend/access/spgist/spgxlog.c
index b071b59c8ac..459ac929ba5 100644
--- a/src/backend/access/spgist/spgxlog.c
+++ b/src/backend/access/spgist/spgxlog.c
@@ -879,6 +879,7 @@ spgRedoVacuumRedirect(XLogReaderState *record)
 
 		XLogRecGetBlockTag(record, 0, &locator, NULL, NULL);
 		ResolveRecoveryConflictWithSnapshot(xldata->snapshotConflictHorizon,
+											xldata->isCatalogRel,
 											locator);
 	}
 
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 5e964e2e96b..80a7cd8948f 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -7965,6 +7965,20 @@ xlog_redo(XLogReaderState *record)
 		/* Update our copy of the parameters in pg_control */
 		memcpy(&xlrec, XLogRecGetData(record), sizeof(xl_parameter_change));
 
+		/*
+		 * Invalidate logical slots if we are in hot standby and the primary
+		 * does not have a WAL level sufficient for logical decoding. No need
+		 * to search for potentially conflicting logically slots if standby is
+		 * running with wal_level lower than logical, because in that case, we
+		 * would have either disallowed creation of logical slots or
+		 * invalidated existing ones.
+		 */
+		if (InRecovery && InHotStandby &&
+			xlrec.wal_level < WAL_LEVEL_LOGICAL &&
+			wal_level >= WAL_LEVEL_LOGICAL)
+			InvalidateObsoleteReplicationSlots(0, InvalidOid,
+											   InvalidTransactionId);
+
 		LWLockAcquire(ControlFileLock, LW_EXCLUSIVE);
 		ControlFile->MaxConnections = xlrec.MaxConnections;
 		ControlFile->max_worker_processes = xlrec.max_worker_processes;
diff --git a/src/backend/storage/ipc/standby.c b/src/backend/storage/ipc/standby.c
index fc81e17901c..ce5842b0db6 100644
--- a/src/backend/storage/ipc/standby.c
+++ b/src/backend/storage/ipc/standby.c
@@ -24,6 +24,7 @@
 #include "access/xlogutils.h"
 #include "miscadmin.h"
 #include "pgstat.h"
+#include "replication/slot.h"
 #include "storage/bufmgr.h"
 #include "storage/lmgr.h"
 #include "storage/proc.h"
@@ -466,6 +467,7 @@ ResolveRecoveryConflictWithVirtualXIDs(VirtualTransactionId *waitlist,
  */
 void
 ResolveRecoveryConflictWithSnapshot(TransactionId snapshotConflictHorizon,
+									bool isCatalogRel,
 									RelFileLocator locator)
 {
 	VirtualTransactionId *backends;
@@ -491,6 +493,10 @@ ResolveRecoveryConflictWithSnapshot(TransactionId snapshotConflictHorizon,
 										   PROCSIG_RECOVERY_CONFLICT_SNAPSHOT,
 										   WAIT_EVENT_RECOVERY_CONFLICT_SNAPSHOT,
 										   true);
+
+	if (wal_level >= WAL_LEVEL_LOGICAL && isCatalogRel)
+		InvalidateObsoleteReplicationSlots(0, locator.dbOid,
+										   snapshotConflictHorizon);
 }
 
 /*
@@ -499,6 +505,7 @@ ResolveRecoveryConflictWithSnapshot(TransactionId snapshotConflictHorizon,
  */
 void
 ResolveRecoveryConflictWithSnapshotFullXid(FullTransactionId snapshotConflictHorizon,
+										   bool isCatalogRel,
 										   RelFileLocator locator)
 {
 	/*
@@ -517,7 +524,9 @@ ResolveRecoveryConflictWithSnapshotFullXid(FullTransactionId snapshotConflictHor
 		TransactionId truncated;
 
 		truncated = XidFromFullTransactionId(snapshotConflictHorizon);
-		ResolveRecoveryConflictWithSnapshot(truncated, locator);
+		ResolveRecoveryConflictWithSnapshot(truncated,
+											isCatalogRel,
+											locator);
 	}
 }
 
-- 
2.38.0

>From ac180d395b710b6c764d3d528e2a9a39e5f42331 Mon Sep 17 00:00:00 2001
From: Bertrand Drouvot <bertranddrouvot...@gmail.com>
Date: Mon, 3 Apr 2023 11:28:30 +0000
Subject: [PATCH va65 5/9] Arrange for a new pg_stat_database_conflicts and
 pg_replication_slots field

As we handled logical slot conflicts on standby on the previous commit, we
can expose the conflict in pg_stat_database_conflicts and pg_replication_slots.

Adding:

- confl_active_logicalslot in pg_stat_database_conflicts
- conflicting in pg_replication_slots

to do so.
---
 src/include/catalog/pg_proc.dat              | 11 ++++++++---
 src/include/pgstat.h                         |  1 +
 src/backend/catalog/system_views.sql         |  6 ++++--
 src/backend/replication/slotfuncs.c          | 12 +++++++++++-
 src/backend/utils/activity/pgstat_database.c |  4 ++++
 src/backend/utils/adt/pgstatfuncs.c          |  3 +++
 doc/src/sgml/monitoring.sgml                 | 11 +++++++++++
 doc/src/sgml/system-views.sgml               | 10 ++++++++++
 src/test/regress/expected/rules.out          |  8 +++++---
 9 files changed, 57 insertions(+), 9 deletions(-)

diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index f9f26422015..bcbae9036d1 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -5605,6 +5605,11 @@
   proname => 'pg_stat_get_db_conflict_snapshot', provolatile => 's',
   proparallel => 'r', prorettype => 'int8', proargtypes => 'oid',
   prosrc => 'pg_stat_get_db_conflict_snapshot' },
+{ oid => '9901',
+  descr => 'statistics: recovery conflicts in database caused by logical replication slot',
+  proname => 'pg_stat_get_db_conflict_logicalslot', provolatile => 's',
+  proparallel => 'r', prorettype => 'int8', proargtypes => 'oid',
+  prosrc => 'pg_stat_get_db_conflict_logicalslot' },
 { oid => '3068',
   descr => 'statistics: recovery conflicts in database caused by shared buffer pin',
   proname => 'pg_stat_get_db_conflict_bufferpin', provolatile => 's',
@@ -11071,9 +11076,9 @@
   proname => 'pg_get_replication_slots', prorows => '10', proisstrict => 'f',
   proretset => 't', provolatile => 's', prorettype => 'record',
   proargtypes => '',
-  proallargtypes => '{name,name,text,oid,bool,bool,int4,xid,xid,pg_lsn,pg_lsn,text,int8,bool}',
-  proargmodes => '{o,o,o,o,o,o,o,o,o,o,o,o,o,o}',
-  proargnames => '{slot_name,plugin,slot_type,datoid,temporary,active,active_pid,xmin,catalog_xmin,restart_lsn,confirmed_flush_lsn,wal_status,safe_wal_size,two_phase}',
+  proallargtypes => '{name,name,text,oid,bool,bool,int4,xid,xid,pg_lsn,pg_lsn,text,int8,bool,bool}',
+  proargmodes => '{o,o,o,o,o,o,o,o,o,o,o,o,o,o,o}',
+  proargnames => '{slot_name,plugin,slot_type,datoid,temporary,active,active_pid,xmin,catalog_xmin,restart_lsn,confirmed_flush_lsn,wal_status,safe_wal_size,two_phase,conflicting}',
   prosrc => 'pg_get_replication_slots' },
 { oid => '3786', descr => 'set up a logical replication slot',
   proname => 'pg_create_logical_replication_slot', provolatile => 'v',
diff --git a/src/include/pgstat.h b/src/include/pgstat.h
index e79b8a34ebc..5e8b04d21b1 100644
--- a/src/include/pgstat.h
+++ b/src/include/pgstat.h
@@ -331,6 +331,7 @@ typedef struct PgStat_StatDBEntry
 	PgStat_Counter conflict_tablespace;
 	PgStat_Counter conflict_lock;
 	PgStat_Counter conflict_snapshot;
+	PgStat_Counter conflict_logicalslot;
 	PgStat_Counter conflict_bufferpin;
 	PgStat_Counter conflict_startup_deadlock;
 	PgStat_Counter temp_files;
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index 6b098234f8c..c25067d06de 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -999,7 +999,8 @@ CREATE VIEW pg_replication_slots AS
             L.confirmed_flush_lsn,
             L.wal_status,
             L.safe_wal_size,
-            L.two_phase
+            L.two_phase,
+            L.conflicting
     FROM pg_get_replication_slots() AS L
             LEFT JOIN pg_database D ON (L.datoid = D.oid);
 
@@ -1067,7 +1068,8 @@ CREATE VIEW pg_stat_database_conflicts AS
             pg_stat_get_db_conflict_lock(D.oid) AS confl_lock,
             pg_stat_get_db_conflict_snapshot(D.oid) AS confl_snapshot,
             pg_stat_get_db_conflict_bufferpin(D.oid) AS confl_bufferpin,
-            pg_stat_get_db_conflict_startup_deadlock(D.oid) AS confl_deadlock
+            pg_stat_get_db_conflict_startup_deadlock(D.oid) AS confl_deadlock,
+            pg_stat_get_db_conflict_logicalslot(D.oid) AS confl_active_logicalslot
     FROM pg_database D;
 
 CREATE VIEW pg_stat_user_functions AS
diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c
index ad3e72be5ee..6035cf48160 100644
--- a/src/backend/replication/slotfuncs.c
+++ b/src/backend/replication/slotfuncs.c
@@ -232,7 +232,7 @@ pg_drop_replication_slot(PG_FUNCTION_ARGS)
 Datum
 pg_get_replication_slots(PG_FUNCTION_ARGS)
 {
-#define PG_GET_REPLICATION_SLOTS_COLS 14
+#define PG_GET_REPLICATION_SLOTS_COLS 15
 	ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
 	XLogRecPtr	currlsn;
 	int			slotno;
@@ -402,6 +402,16 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
 
 		values[i++] = BoolGetDatum(slot_contents.data.two_phase);
 
+		if (slot_contents.data.database == InvalidOid)
+			nulls[i++] = true;
+		else
+		{
+			if (slot_contents.data.invalidated != RS_INVAL_NONE)
+				values[i++] = BoolGetDatum(true);
+			else
+				values[i++] = BoolGetDatum(false);
+		}
+
 		Assert(i == PG_GET_REPLICATION_SLOTS_COLS);
 
 		tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc,
diff --git a/src/backend/utils/activity/pgstat_database.c b/src/backend/utils/activity/pgstat_database.c
index 6e650ceaade..7149f22f729 100644
--- a/src/backend/utils/activity/pgstat_database.c
+++ b/src/backend/utils/activity/pgstat_database.c
@@ -109,6 +109,9 @@ pgstat_report_recovery_conflict(int reason)
 		case PROCSIG_RECOVERY_CONFLICT_BUFFERPIN:
 			dbentry->conflict_bufferpin++;
 			break;
+		case PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT:
+			dbentry->conflict_logicalslot++;
+			break;
 		case PROCSIG_RECOVERY_CONFLICT_STARTUP_DEADLOCK:
 			dbentry->conflict_startup_deadlock++;
 			break;
@@ -387,6 +390,7 @@ pgstat_database_flush_cb(PgStat_EntryRef *entry_ref, bool nowait)
 	PGSTAT_ACCUM_DBCOUNT(conflict_tablespace);
 	PGSTAT_ACCUM_DBCOUNT(conflict_lock);
 	PGSTAT_ACCUM_DBCOUNT(conflict_snapshot);
+	PGSTAT_ACCUM_DBCOUNT(conflict_logicalslot);
 	PGSTAT_ACCUM_DBCOUNT(conflict_bufferpin);
 	PGSTAT_ACCUM_DBCOUNT(conflict_startup_deadlock);
 
diff --git a/src/backend/utils/adt/pgstatfuncs.c b/src/backend/utils/adt/pgstatfuncs.c
index eec9f3cf9b1..4de60d8aa14 100644
--- a/src/backend/utils/adt/pgstatfuncs.c
+++ b/src/backend/utils/adt/pgstatfuncs.c
@@ -1066,6 +1066,8 @@ PG_STAT_GET_DBENTRY_INT64(xact_commit)
 /* pg_stat_get_db_xact_rollback */
 PG_STAT_GET_DBENTRY_INT64(xact_rollback)
 
+/* pg_stat_get_db_conflict_logicalslot */
+PG_STAT_GET_DBENTRY_INT64(conflict_logicalslot)
 
 Datum
 pg_stat_get_db_stat_reset_time(PG_FUNCTION_ARGS)
@@ -1099,6 +1101,7 @@ pg_stat_get_db_conflict_all(PG_FUNCTION_ARGS)
 		result = (int64) (dbentry->conflict_tablespace +
 						  dbentry->conflict_lock +
 						  dbentry->conflict_snapshot +
+						  dbentry->conflict_logicalslot +
 						  dbentry->conflict_bufferpin +
 						  dbentry->conflict_startup_deadlock);
 
diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml
index bce9ae46615..fa3b0f810cd 100644
--- a/doc/src/sgml/monitoring.sgml
+++ b/doc/src/sgml/monitoring.sgml
@@ -4674,6 +4674,17 @@ SELECT pid, wait_event_type, wait_event FROM pg_stat_activity WHERE wait_event i
        deadlocks
       </para></entry>
      </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>confl_active_logicalslot</structfield> <type>bigint</type>
+      </para>
+      <para>
+       Number of active logical slots in this database that have been
+       invalidated because they conflict with recovery (note that inactive ones
+       are also invalidated but do not increment this counter)
+      </para></entry>
+     </row>
     </tbody>
    </tgroup>
   </table>
diff --git a/doc/src/sgml/system-views.sgml b/doc/src/sgml/system-views.sgml
index bb1a4184508..57b228076e8 100644
--- a/doc/src/sgml/system-views.sgml
+++ b/doc/src/sgml/system-views.sgml
@@ -2517,6 +2517,16 @@ SELECT * FROM pg_locks pl LEFT JOIN pg_prepared_xacts ppx
        false for physical slots.
       </para></entry>
      </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>conflicting</structfield> <type>bool</type>
+      </para>
+      <para>
+       True if this logical slot conflicted with recovery (and so is now
+       invalidated). Always NULL for physical slots.
+      </para></entry>
+     </row>
     </tbody>
    </tgroup>
   </table>
diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out
index ab1aebfde42..06d3f1f5d34 100644
--- a/src/test/regress/expected/rules.out
+++ b/src/test/regress/expected/rules.out
@@ -1472,8 +1472,9 @@ pg_replication_slots| SELECT l.slot_name,
     l.confirmed_flush_lsn,
     l.wal_status,
     l.safe_wal_size,
-    l.two_phase
-   FROM (pg_get_replication_slots() l(slot_name, plugin, slot_type, datoid, temporary, active, active_pid, xmin, catalog_xmin, restart_lsn, confirmed_flush_lsn, wal_status, safe_wal_size, two_phase)
+    l.two_phase,
+    l.conflicting
+   FROM (pg_get_replication_slots() l(slot_name, plugin, slot_type, datoid, temporary, active, active_pid, xmin, catalog_xmin, restart_lsn, confirmed_flush_lsn, wal_status, safe_wal_size, two_phase, conflicting)
      LEFT JOIN pg_database d ON ((l.datoid = d.oid)));
 pg_roles| SELECT pg_authid.rolname,
     pg_authid.rolsuper,
@@ -1869,7 +1870,8 @@ pg_stat_database_conflicts| SELECT oid AS datid,
     pg_stat_get_db_conflict_lock(oid) AS confl_lock,
     pg_stat_get_db_conflict_snapshot(oid) AS confl_snapshot,
     pg_stat_get_db_conflict_bufferpin(oid) AS confl_bufferpin,
-    pg_stat_get_db_conflict_startup_deadlock(oid) AS confl_deadlock
+    pg_stat_get_db_conflict_startup_deadlock(oid) AS confl_deadlock,
+    pg_stat_get_db_conflict_logicalslot(oid) AS confl_active_logicalslot
    FROM pg_database d;
 pg_stat_gssapi| SELECT pid,
     gss_auth AS gss_authenticated,
-- 
2.38.0

>From 9fba4ed4c5fcc686c5b574dd85f918b2e0e1feff Mon Sep 17 00:00:00 2001
From: Bertrand Drouvot <bertranddrouvot...@gmail.com>
Date: Mon, 3 Apr 2023 16:46:09 +0000
Subject: [PATCH va65 6/9] For cascading replication, wake up physical
 walsenders separately from logical walsenders.

Physical walsenders can't send data until it's been flushed; logical
walsenders can't decode and send data until it's been applied. On the
standby, the WAL is flushed first, which will only wake up physical
walsenders; and then applied, which will only wake up logical
walsenders.

Previously, all walsenders were awakened when the WAL was flushed. That
was fine for logical walsenders on the primary; but on the standby the
flushed WAL would have been not applied yet, so logical walsenders were
awakened too early.

Author: Bertrand Drouvot per idea from Jeff Davis and Amit Kapila.
Reviewed-By: Sawada Masahiko, Robert Haas.
---
 src/include/replication/walsender.h         | 22 +++++------
 src/include/replication/walsender_private.h |  3 ++
 src/backend/access/transam/xlog.c           |  6 +--
 src/backend/access/transam/xlogarchive.c    |  2 +-
 src/backend/access/transam/xlogrecovery.c   | 30 +++++++++++---
 src/backend/replication/walreceiver.c       |  2 +-
 src/backend/replication/walsender.c         | 43 +++++++++++++++++----
 7 files changed, 79 insertions(+), 29 deletions(-)

diff --git a/src/include/replication/walsender.h b/src/include/replication/walsender.h
index 52bb3e2aae3..9df7e50f943 100644
--- a/src/include/replication/walsender.h
+++ b/src/include/replication/walsender.h
@@ -42,7 +42,7 @@ extern void WalSndResourceCleanup(bool isCommit);
 extern void WalSndSignals(void);
 extern Size WalSndShmemSize(void);
 extern void WalSndShmemInit(void);
-extern void WalSndWakeup(void);
+extern void WalSndWakeup(bool physical, bool logical);
 extern void WalSndInitStopping(void);
 extern void WalSndWaitStopping(void);
 extern void HandleWalSndInitStopping(void);
@@ -60,15 +60,15 @@ extern void WalSndRqstFileReload(void);
 /*
  * wakeup walsenders if there is work to be done
  */
-#define WalSndWakeupProcessRequests()		\
-	do										\
-	{										\
-		if (wake_wal_senders)				\
-		{									\
-			wake_wal_senders = false;		\
-			if (max_wal_senders > 0)		\
-				WalSndWakeup();				\
-		}									\
-	} while (0)
+static inline void
+WalSndWakeupProcessRequests(bool physical, bool logical)
+{
+	if (wake_wal_senders)
+	{
+		wake_wal_senders = false;
+		if (max_wal_senders > 0)
+			WalSndWakeup(physical, logical);
+	}
+}
 
 #endif							/* _WALSENDER_H */
diff --git a/src/include/replication/walsender_private.h b/src/include/replication/walsender_private.h
index 5310e054c48..ff25aa70a89 100644
--- a/src/include/replication/walsender_private.h
+++ b/src/include/replication/walsender_private.h
@@ -15,6 +15,7 @@
 #include "access/xlog.h"
 #include "lib/ilist.h"
 #include "nodes/nodes.h"
+#include "nodes/replnodes.h"
 #include "replication/syncrep.h"
 #include "storage/latch.h"
 #include "storage/shmem.h"
@@ -79,6 +80,8 @@ typedef struct WalSnd
 	 * Timestamp of the last message received from standby.
 	 */
 	TimestampTz replyTime;
+
+	ReplicationKind kind;
 } WalSnd;
 
 extern PGDLLIMPORT WalSnd *MyWalSnd;
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 80a7cd8948f..d30575d8741 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -2645,7 +2645,7 @@ XLogFlush(XLogRecPtr record)
 	END_CRIT_SECTION();
 
 	/* wake up walsenders now that we've released heavily contended locks */
-	WalSndWakeupProcessRequests();
+	WalSndWakeupProcessRequests(true, !RecoveryInProgress());
 
 	/*
 	 * If we still haven't flushed to the request point then we have a
@@ -2816,7 +2816,7 @@ XLogBackgroundFlush(void)
 	END_CRIT_SECTION();
 
 	/* wake up walsenders now that we've released heavily contended locks */
-	WalSndWakeupProcessRequests();
+	WalSndWakeupProcessRequests(true, !RecoveryInProgress());
 
 	/*
 	 * Great, done. To take some work off the critical path, try to initialize
@@ -5762,7 +5762,7 @@ StartupXLOG(void)
 	 * If there were cascading standby servers connected to us, nudge any wal
 	 * sender processes to notice that we've been promoted.
 	 */
-	WalSndWakeup();
+	WalSndWakeup(true, true);
 
 	/*
 	 * If this was a promotion, request an (online) checkpoint now. This isn't
diff --git a/src/backend/access/transam/xlogarchive.c b/src/backend/access/transam/xlogarchive.c
index a0f5aa24b58..f3fb92c8f96 100644
--- a/src/backend/access/transam/xlogarchive.c
+++ b/src/backend/access/transam/xlogarchive.c
@@ -421,7 +421,7 @@ KeepFileRestoredFromArchive(const char *path, const char *xlogfname)
 	 * if we restored something other than a WAL segment, but it does no harm
 	 * either.
 	 */
-	WalSndWakeup();
+	WalSndWakeup(true, false);
 }
 
 /*
diff --git a/src/backend/access/transam/xlogrecovery.c b/src/backend/access/transam/xlogrecovery.c
index dbe93947627..e6427c54c57 100644
--- a/src/backend/access/transam/xlogrecovery.c
+++ b/src/backend/access/transam/xlogrecovery.c
@@ -1935,6 +1935,30 @@ ApplyWalRecord(XLogReaderState *xlogreader, XLogRecord *record, TimeLineID *repl
 	XLogRecoveryCtl->lastReplayedTLI = *replayTLI;
 	SpinLockRelease(&XLogRecoveryCtl->info_lck);
 
+	/*
+	 * Wakeup walsenders:
+	 *
+	 * On the standby, the WAL is flushed first (which will only wake up
+	 * physical walsenders) and then applied, which will only wake up logical
+	 * walsenders.
+	 *
+	 * Indeed, logical walsenders on standby can't decode and send data until
+	 * it's been applied.
+	 *
+	 * Physical walsenders don't need to be woken up during replay unless
+	 * cascading replication is allowed and time line change occured (so that
+	 * they can notice that they are on a new time line).
+	 *
+	 * That's why the wake up conditions are for:
+	 *
+	 *  - physical walsenders in case of new time line and cascade
+	 *  replication is allowed.
+	 *  - logical walsenders in case cascade replication is allowed (could not
+	 *  be created otherwise).
+	 */
+	if (AllowCascadeReplication())
+		WalSndWakeup(switchedTLI, true);
+
 	/*
 	 * If rm_redo called XLogRequestWalReceiverReply, then we wake up the
 	 * receiver so that it notices the updated lastReplayedEndRecPtr and sends
@@ -1958,12 +1982,6 @@ ApplyWalRecord(XLogReaderState *xlogreader, XLogRecord *record, TimeLineID *repl
 		 */
 		RemoveNonParentXlogFiles(xlogreader->EndRecPtr, *replayTLI);
 
-		/*
-		 * Wake up any walsenders to notice that we are on a new timeline.
-		 */
-		if (AllowCascadeReplication())
-			WalSndWakeup();
-
 		/* Reset the prefetcher. */
 		XLogPrefetchReconfigure();
 	}
diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c
index 685af51d5d3..feff7094351 100644
--- a/src/backend/replication/walreceiver.c
+++ b/src/backend/replication/walreceiver.c
@@ -1010,7 +1010,7 @@ XLogWalRcvFlush(bool dying, TimeLineID tli)
 		/* Signal the startup process and walsender that new WAL has arrived */
 		WakeupRecovery();
 		if (AllowCascadeReplication())
-			WalSndWakeup();
+			WalSndWakeup(true, false);
 
 		/* Report XLOG streaming progress in PS display */
 		if (update_process_title)
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index e40a9b1ba7b..66493b6e896 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -2603,6 +2603,23 @@ InitWalSenderSlot(void)
 			walsnd->sync_standby_priority = 0;
 			walsnd->latch = &MyProc->procLatch;
 			walsnd->replyTime = 0;
+
+			/*
+			 * The kind assignment is done here and not in StartReplication()
+			 * and StartLogicalReplication(). Indeed, the logical walsender
+			 * needs to read WAL records (like snapshot of running
+			 * transactions) during the slot creation. So it needs to be woken
+			 * up based on its kind.
+			 *
+			 * The kind assignment could also be done in StartReplication(),
+			 * StartLogicalReplication() and CREATE_REPLICATION_SLOT but it
+			 * seems better to set it on one place.
+			 */
+			if (MyDatabaseId == InvalidOid)
+				walsnd->kind = REPLICATION_KIND_PHYSICAL;
+			else
+				walsnd->kind = REPLICATION_KIND_LOGICAL;
+
 			SpinLockRelease(&walsnd->mutex);
 			/* don't need the lock anymore */
 			MyWalSnd = (WalSnd *) walsnd;
@@ -3280,30 +3297,42 @@ WalSndShmemInit(void)
 }
 
 /*
- * Wake up all walsenders
+ * Wake up physical, logical or both walsenders kind
+ *
+ * The distinction between physical and logical walsenders is done, because:
+ * - physical walsenders can't send data until it's been flushed
+ * - logical walsenders on standby can't decode and send data until it's been
+ * applied
+ *
+ * For cascading replication we need to wake up physical
+ * walsenders separately from logical walsenders (see the comment before calling
+ * WalSndWakeup() in ApplyWalRecord() for more details).
  *
  * This will be called inside critical sections, so throwing an error is not
  * advisable.
  */
 void
-WalSndWakeup(void)
+WalSndWakeup(bool physical, bool logical)
 {
 	int			i;
 
 	for (i = 0; i < max_wal_senders; i++)
 	{
 		Latch	   *latch;
+		ReplicationKind kind;
 		WalSnd	   *walsnd = &WalSndCtl->walsnds[i];
 
-		/*
-		 * Get latch pointer with spinlock held, for the unlikely case that
-		 * pointer reads aren't atomic (as they're 8 bytes).
-		 */
+		/* get latch pointer and kind with spinlock helds */
 		SpinLockAcquire(&walsnd->mutex);
 		latch = walsnd->latch;
+		kind = walsnd->kind;
 		SpinLockRelease(&walsnd->mutex);
 
-		if (latch != NULL)
+		if (latch == NULL)
+			continue;
+
+		if ((physical && kind == REPLICATION_KIND_PHYSICAL) ||
+			(logical && kind == REPLICATION_KIND_LOGICAL))
 			SetLatch(latch);
 	}
 }
-- 
2.38.0

>From 99dae75e451c8895b6603b693a9db47f877307a2 Mon Sep 17 00:00:00 2001
From: Andres Freund <and...@anarazel.de>
Date: Thu, 6 Apr 2023 23:34:22 -0700
Subject: [PATCH va65 7/9] Allow logical decoding on standby.

Allow a logical slot to be created on standby. Restrict its usage
or its creation if wal_level on primary is less than logical.
During slot creation, it's restart_lsn is set to the last replayed
LSN. Effectively, a logical slot creation on standby waits for an
xl_running_xact record to arrive from primary.

Author: Andres Freund (in an older version), Amit Khandekar, Bertrand Drouvot
Reviewed-By: Bertrand Drouvot, Andres Freund, Robert Haas, Fabrizio de Royes Mello
---
 src/include/access/xlog.h                 |  1 +
 src/backend/access/transam/xlog.c         | 11 +++++
 src/backend/replication/logical/decode.c  | 30 +++++++++++-
 src/backend/replication/logical/logical.c | 36 ++++++++------
 src/backend/replication/slot.c            | 59 ++++++++++++-----------
 src/backend/replication/walsender.c       | 48 ++++++++++++------
 6 files changed, 124 insertions(+), 61 deletions(-)

diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h
index cfe5409738c..48ca8523810 100644
--- a/src/include/access/xlog.h
+++ b/src/include/access/xlog.h
@@ -230,6 +230,7 @@ extern void XLOGShmemInit(void);
 extern void BootStrapXLOG(void);
 extern void InitializeWalConsistencyChecking(void);
 extern void LocalProcessControlFile(bool reset);
+extern WalLevel GetActiveWalLevelOnStandby(void);
 extern void StartupXLOG(void);
 extern void ShutdownXLOG(int code, Datum arg);
 extern void CreateCheckPoint(int flags);
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index d30575d8741..a4153518fdb 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -4466,6 +4466,17 @@ LocalProcessControlFile(bool reset)
 	ReadControlFile();
 }
 
+/*
+ * Get the wal_level from the control file. For a standby, this value should be
+ * considered as its active wal_level, because it may be different from what
+ * was originally configured on standby.
+ */
+WalLevel
+GetActiveWalLevelOnStandby(void)
+{
+	return ControlFile->wal_level;
+}
+
 /*
  * Initialization of shared memory for XLOG
  */
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index 8fe7bb65f1f..8352dbf5df6 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -152,11 +152,39 @@ xlog_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 			 * can restart from there.
 			 */
 			break;
+		case XLOG_PARAMETER_CHANGE:
+			{
+				xl_parameter_change *xlrec =
+				(xl_parameter_change *) XLogRecGetData(buf->record);
+
+				/*
+				 * If wal_level on primary is reduced to less than logical,
+				 * then we want to prevent existing logical slots from being
+				 * used. Existing logical slots on standby get invalidated
+				 * when this WAL record is replayed; and further, slot
+				 * creation fails when the wal level is not sufficient; but
+				 * all these operations are not synchronized, so a logical
+				 * slot may creep in while the wal_level is being reduced.
+				 * Hence this extra check.
+				 */
+				if (xlrec->wal_level < WAL_LEVEL_LOGICAL)
+				{
+					/*
+					 * This can occur only on a standby, as a primary would
+					 * not allow to restart after changing wal_level < logical
+					 * if there is pre-existing logical slot.
+					 */
+					Assert(RecoveryInProgress());
+					ereport(ERROR,
+							(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+							 errmsg("logical decoding on standby requires wal_level to be at least logical on the primary server")));
+				}
+				break;
+			}
 		case XLOG_NOOP:
 		case XLOG_NEXTOID:
 		case XLOG_SWITCH:
 		case XLOG_BACKUP_END:
-		case XLOG_PARAMETER_CHANGE:
 		case XLOG_RESTORE_POINT:
 		case XLOG_FPW_CHANGE:
 		case XLOG_FPI_FOR_HINT:
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 27addd58f66..659b08cd456 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -124,23 +124,21 @@ CheckLogicalDecodingRequirements(void)
 				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
 				 errmsg("logical decoding requires a database connection")));
 
-	/* ----
-	 * TODO: We got to change that someday soon...
-	 *
-	 * There's basically three things missing to allow this:
-	 * 1) We need to be able to correctly and quickly identify the timeline a
-	 *	  LSN belongs to
-	 * 2) We need to force hot_standby_feedback to be enabled at all times so
-	 *	  the primary cannot remove rows we need.
-	 * 3) support dropping replication slots referring to a database, in
-	 *	  dbase_redo. There can't be any active ones due to HS recovery
-	 *	  conflicts, so that should be relatively easy.
-	 * ----
-	 */
 	if (RecoveryInProgress())
-		ereport(ERROR,
-				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
-				 errmsg("logical decoding cannot be used while in recovery")));
+	{
+		/*
+		 * This check may have race conditions, but whenever
+		 * XLOG_PARAMETER_CHANGE indicates that wal_level has changed, we
+		 * verify that there are no existing logical replication slots. And to
+		 * avoid races around creating a new slot,
+		 * CheckLogicalDecodingRequirements() is called once before creating
+		 * the slot, and once when logical decoding is initially starting up.
+		 */
+		if (GetActiveWalLevelOnStandby() < WAL_LEVEL_LOGICAL)
+			ereport(ERROR,
+					(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+					 errmsg("logical decoding on standby requires wal_level to be at least logical on the primary server")));
+	}
 }
 
 /*
@@ -342,6 +340,12 @@ CreateInitDecodingContext(const char *plugin,
 	LogicalDecodingContext *ctx;
 	MemoryContext old_context;
 
+	/*
+	 * On standby, this check is also required while creating the slot. Check
+	 * the comments in this function.
+	 */
+	CheckLogicalDecodingRequirements();
+
 	/* shorter lines... */
 	slot = MyReplicationSlot;
 
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index 3e8c1b44e85..411f6c01491 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -41,6 +41,7 @@
 
 #include "access/transam.h"
 #include "access/xlog_internal.h"
+#include "access/xlogrecovery.h"
 #include "common/file_utils.h"
 #include "common/string.h"
 #include "miscadmin.h"
@@ -1192,37 +1193,28 @@ ReplicationSlotReserveWal(void)
 		/*
 		 * For logical slots log a standby snapshot and start logical decoding
 		 * at exactly that position. That allows the slot to start up more
-		 * quickly.
+		 * quickly. But on a standby we cannot do WAL writes, so just use the
+		 * replay pointer; effectively, an attempt to create a logical slot on
+		 * standby will cause it to wait for an xl_running_xact record to be
+		 * logged independently on the primary, so that a snapshot can be
+		 * built using the record.
 		 *
-		 * That's not needed (or indeed helpful) for physical slots as they'll
-		 * start replay at the last logged checkpoint anyway. Instead return
-		 * the location of the last redo LSN. While that slightly increases
-		 * the chance that we have to retry, it's where a base backup has to
-		 * start replay at.
+		 * None of this is needed (or indeed helpful) for physical slots as
+		 * they'll start replay at the last logged checkpoint anyway. Instead
+		 * return the location of the last redo LSN. While that slightly
+		 * increases the chance that we have to retry, it's where a base
+		 * backup has to start replay at.
 		 */
-		if (!RecoveryInProgress() && SlotIsLogical(slot))
-		{
-			XLogRecPtr	flushptr;
-
-			/* start at current insert position */
-			restart_lsn = GetXLogInsertRecPtr();
-			SpinLockAcquire(&slot->mutex);
-			slot->data.restart_lsn = restart_lsn;
-			SpinLockRelease(&slot->mutex);
-
-			/* make sure we have enough information to start */
-			flushptr = LogStandbySnapshot();
-
-			/* and make sure it's fsynced to disk */
-			XLogFlush(flushptr);
-		}
-		else
-		{
+		if (SlotIsPhysical(slot))
 			restart_lsn = GetRedoRecPtr();
-			SpinLockAcquire(&slot->mutex);
-			slot->data.restart_lsn = restart_lsn;
-			SpinLockRelease(&slot->mutex);
-		}
+		else if (RecoveryInProgress())
+			restart_lsn = GetXLogReplayRecPtr(NULL);
+		else
+			restart_lsn = GetXLogInsertRecPtr();
+
+		SpinLockAcquire(&slot->mutex);
+		slot->data.restart_lsn = restart_lsn;
+		SpinLockRelease(&slot->mutex);
 
 		/* prevent WAL removal as fast as possible */
 		ReplicationSlotsComputeRequiredLSN();
@@ -1238,6 +1230,17 @@ ReplicationSlotReserveWal(void)
 		if (XLogGetLastRemovedSegno() < segno)
 			break;
 	}
+
+	if (!RecoveryInProgress() && SlotIsLogical(slot))
+	{
+		XLogRecPtr	flushptr;
+
+		/* make sure we have enough information to start */
+		flushptr = LogStandbySnapshot();
+
+		/* and make sure it's fsynced to disk */
+		XLogFlush(flushptr);
+	}
 }
 
 /*
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 66493b6e896..743e338b1b6 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -906,23 +906,34 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req
 	int			count;
 	WALReadError errinfo;
 	XLogSegNo	segno;
-	TimeLineID	currTLI = GetWALInsertionTimeLine();
+	TimeLineID	currTLI;
 
 	/*
-	 * Since logical decoding is only permitted on a primary server, we know
-	 * that the current timeline ID can't be changing any more. If we did this
-	 * on a standby, we'd have to worry about the values we compute here
-	 * becoming invalid due to a promotion or timeline change.
+	 * Make sure we have enough WAL available before retrieving the current
+	 * timeline. This is needed to determine am_cascading_walsender accurately
+	 * which is needed to determine the current timeline.
 	 */
+	flushptr = WalSndWaitForWal(targetPagePtr + reqLen);
+
+	/*
+	 * Since logical decoding is also permitted on a standby server, we need
+	 * to check if the server is in recovery to decide how to get the current
+	 * timeline ID (so that it also cover the promotion or timeline change
+	 * cases).
+	 */
+	am_cascading_walsender = RecoveryInProgress();
+
+	if (am_cascading_walsender)
+		GetXLogReplayRecPtr(&currTLI);
+	else
+		currTLI = GetWALInsertionTimeLine();
+
 	XLogReadDetermineTimeline(state, targetPagePtr, reqLen, currTLI);
 	sendTimeLineIsHistoric = (state->currTLI != currTLI);
 	sendTimeLine = state->currTLI;
 	sendTimeLineValidUpto = state->currTLIValidUntil;
 	sendTimeLineNextTLI = state->nextTLI;
 
-	/* make sure we have enough WAL available */
-	flushptr = WalSndWaitForWal(targetPagePtr + reqLen);
-
 	/* fail if not (implies we are going to shut down) */
 	if (flushptr < targetPagePtr + reqLen)
 		return -1;
@@ -937,9 +948,9 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req
 				 cur_page,
 				 targetPagePtr,
 				 XLOG_BLCKSZ,
-				 state->seg.ws_tli, /* Pass the current TLI because only
-									 * WalSndSegmentOpen controls whether new
-									 * TLI is needed. */
+				 currTLI,		/* Pass the current TLI because only
+								 * WalSndSegmentOpen controls whether new TLI
+								 * is needed. */
 				 &errinfo))
 		WALReadRaiseError(&errinfo);
 
@@ -3076,10 +3087,14 @@ XLogSendLogical(void)
 	 * If first time through in this session, initialize flushPtr.  Otherwise,
 	 * we only need to update flushPtr if EndRecPtr is past it.
 	 */
-	if (flushPtr == InvalidXLogRecPtr)
-		flushPtr = GetFlushRecPtr(NULL);
-	else if (logical_decoding_ctx->reader->EndRecPtr >= flushPtr)
-		flushPtr = GetFlushRecPtr(NULL);
+	if (flushPtr == InvalidXLogRecPtr ||
+		logical_decoding_ctx->reader->EndRecPtr >= flushPtr)
+	{
+		if (am_cascading_walsender)
+			flushPtr = GetStandbyFlushRecPtr(NULL);
+		else
+			flushPtr = GetFlushRecPtr(NULL);
+	}
 
 	/* If EndRecPtr is still past our flushPtr, it means we caught up. */
 	if (logical_decoding_ctx->reader->EndRecPtr >= flushPtr)
@@ -3170,7 +3185,8 @@ GetStandbyFlushRecPtr(TimeLineID *tli)
 	receivePtr = GetWalRcvFlushRecPtr(NULL, &receiveTLI);
 	replayPtr = GetXLogReplayRecPtr(&replayTLI);
 
-	*tli = replayTLI;
+	if (tli)
+		*tli = replayTLI;
 
 	result = replayPtr;
 	if (receiveTLI == replayTLI && receivePtr > replayPtr)
-- 
2.38.0

>From 829a3358dc13f480d98b40fff921f3ba26e06a2c Mon Sep 17 00:00:00 2001
From: bdrouvotAWS <bdrou...@amazon.com>
Date: Tue, 7 Feb 2023 09:04:12 +0000
Subject: [PATCH va65 8/9] New TAP test for logical decoding on standby.

In addition to the new TAP test, this commit introduces a new pg_log_standby_snapshot()
function.

The idea is to be able to take a snapshot of running transactions and write this
to WAL without requesting for a (costly) checkpoint.

Author: Craig Ringer (in an older version), Amit Khandekar, Bertrand Drouvot
Reviewed-By: Bertrand Drouvot, Andres Freund, Robert Haas, Fabrizio de Royes Mello
---
 src/include/catalog/pg_proc.dat               |   3 +
 src/backend/access/transam/xlogfuncs.c        |  32 +
 src/backend/catalog/system_functions.sql      |   2 +
 doc/src/sgml/func.sgml                        |  15 +
 src/test/perl/PostgreSQL/Test/Cluster.pm      |  37 +
 src/test/recovery/meson.build                 |   1 +
 .../t/035_standby_logical_decoding.pl         | 764 ++++++++++++++++++
 7 files changed, 854 insertions(+)
 create mode 100644 src/test/recovery/t/035_standby_logical_decoding.pl

diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index bcbae9036d1..284138727e2 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -6426,6 +6426,9 @@
 { oid => '2848', descr => 'switch to new wal file',
   proname => 'pg_switch_wal', provolatile => 'v', prorettype => 'pg_lsn',
   proargtypes => '', prosrc => 'pg_switch_wal' },
+{ oid => '9658', descr => 'log details of the current snapshot to WAL',
+  proname => 'pg_log_standby_snapshot', provolatile => 'v', prorettype => 'pg_lsn',
+  proargtypes => '', prosrc => 'pg_log_standby_snapshot' },
 { oid => '3098', descr => 'create a named restore point',
   proname => 'pg_create_restore_point', provolatile => 'v',
   prorettype => 'pg_lsn', proargtypes => 'text',
diff --git a/src/backend/access/transam/xlogfuncs.c b/src/backend/access/transam/xlogfuncs.c
index c07daa874f9..36a309b54cc 100644
--- a/src/backend/access/transam/xlogfuncs.c
+++ b/src/backend/access/transam/xlogfuncs.c
@@ -31,6 +31,7 @@
 #include "storage/fd.h"
 #include "storage/ipc.h"
 #include "storage/smgr.h"
+#include "storage/standby.h"
 #include "utils/builtins.h"
 #include "utils/guc.h"
 #include "utils/memutils.h"
@@ -196,6 +197,37 @@ pg_switch_wal(PG_FUNCTION_ARGS)
 	PG_RETURN_LSN(switchpoint);
 }
 
+/*
+ * pg_log_standby_snapshot: call LogStandbySnapshot()
+ *
+ * Permission checking for this function is managed through the normal
+ * GRANT system.
+ */
+Datum
+pg_log_standby_snapshot(PG_FUNCTION_ARGS)
+{
+	XLogRecPtr	recptr;
+
+	if (RecoveryInProgress())
+		ereport(ERROR,
+				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+				 errmsg("recovery is in progress"),
+				 errhint("pg_log_standby_snapshot() cannot be executed during recovery.")));
+
+	if (!XLogStandbyInfoActive())
+		ereport(ERROR,
+				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+				 errmsg("wal_level is not in desired state"),
+				 errhint("wal_level has to be >= WAL_LEVEL_REPLICA.")));
+
+	recptr = LogStandbySnapshot();
+
+	/*
+	 * As a convenience, return the WAL location of the last inserted record
+	 */
+	PG_RETURN_LSN(recptr);
+}
+
 /*
  * pg_create_restore_point: a named point for restore
  *
diff --git a/src/backend/catalog/system_functions.sql b/src/backend/catalog/system_functions.sql
index 83ca8934440..b7c65ea37d7 100644
--- a/src/backend/catalog/system_functions.sql
+++ b/src/backend/catalog/system_functions.sql
@@ -644,6 +644,8 @@ REVOKE EXECUTE ON FUNCTION pg_create_restore_point(text) FROM public;
 
 REVOKE EXECUTE ON FUNCTION pg_switch_wal() FROM public;
 
+REVOKE EXECUTE ON FUNCTION pg_log_standby_snapshot() FROM public;
+
 REVOKE EXECUTE ON FUNCTION pg_wal_replay_pause() FROM public;
 
 REVOKE EXECUTE ON FUNCTION pg_wal_replay_resume() FROM public;
diff --git a/doc/src/sgml/func.sgml b/doc/src/sgml/func.sgml
index dc44a74eb25..9253cd1c188 100644
--- a/doc/src/sgml/func.sgml
+++ b/doc/src/sgml/func.sgml
@@ -27032,6 +27032,21 @@ postgres=# SELECT '0/0'::pg_lsn + pd.segment_number * ps.setting::int + :offset
         prepared with <xref linkend="sql-prepare-transaction"/>.
        </para></entry>
       </row>
+      <row>
+       <entry role="func_table_entry"><para role="func_signature">
+        <indexterm>
+         <primary>pg_log_standby_snapshot</primary>
+        </indexterm>
+        <function>pg_log_standby_snapshot</function> ()
+        <returnvalue>pg_lsn</returnvalue>
+       </para>
+       <para>
+        Take a snapshot of running transactions and write this to WAL without
+        having to wait bgwriter or checkpointer to log one. This one is useful for
+        logical decoding on standby for which logical slot creation is hanging
+        until such a record is replayed on the standby.
+       </para></entry>
+      </row>
      </tbody>
     </tgroup>
    </table>
diff --git a/src/test/perl/PostgreSQL/Test/Cluster.pm b/src/test/perl/PostgreSQL/Test/Cluster.pm
index a3aef8b5e91..62376de602b 100644
--- a/src/test/perl/PostgreSQL/Test/Cluster.pm
+++ b/src/test/perl/PostgreSQL/Test/Cluster.pm
@@ -3076,6 +3076,43 @@ $SIG{TERM} = $SIG{INT} = sub {
 
 =pod
 
+=item $node->create_logical_slot_on_standby(self, primary, slot_name, dbname)
+
+Create logical replication slot on given standby
+
+=cut
+
+sub create_logical_slot_on_standby
+{
+	my ($self, $primary, $slot_name, $dbname) = @_;
+	my ($stdout, $stderr);
+
+	my $handle;
+
+	$handle = IPC::Run::start(['pg_recvlogical', '-d', $self->connstr($dbname), '-P', 'test_decoding', '-S', $slot_name, '--create-slot'], '>', \$stdout, '2>', \$stderr);
+
+	# Once slot restart_lsn is created, the standby looks for xl_running_xacts
+	# WAL record from the restart_lsn onwards. So firstly, wait until the slot
+	# restart_lsn is evaluated.
+
+	$self->poll_query_until(
+		'postgres', qq[
+		SELECT restart_lsn IS NOT NULL
+		FROM pg_catalog.pg_replication_slots WHERE slot_name = '$slot_name'
+	]) or die "timed out waiting for logical slot to calculate its restart_lsn";
+
+	# Now arrange for the xl_running_xacts record for which pg_recvlogical
+	# is waiting.
+	$primary->safe_psql('postgres', 'SELECT pg_log_standby_snapshot()');
+
+	$handle->finish();
+
+	is($self->slot($slot_name)->{'slot_type'}, 'logical', $slot_name . ' on standby created')
+		or die "could not create slot" . $slot_name;
+}
+
+=pod
+
 =back
 
 =cut
diff --git a/src/test/recovery/meson.build b/src/test/recovery/meson.build
index 59465b97f3f..e834ad5e0dc 100644
--- a/src/test/recovery/meson.build
+++ b/src/test/recovery/meson.build
@@ -40,6 +40,7 @@ tests += {
       't/032_relfilenode_reuse.pl',
       't/033_replay_tsp_drops.pl',
       't/034_create_database.pl',
+      't/035_standby_logical_decoding.pl',
     ],
   },
 }
diff --git a/src/test/recovery/t/035_standby_logical_decoding.pl b/src/test/recovery/t/035_standby_logical_decoding.pl
new file mode 100644
index 00000000000..e2c46a6bf6f
--- /dev/null
+++ b/src/test/recovery/t/035_standby_logical_decoding.pl
@@ -0,0 +1,764 @@
+# logical decoding on standby : test logical decoding,
+# recovery conflict and standby promotion.
+
+use strict;
+use warnings;
+
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+my ($stdin, $stdout, $stderr, $cascading_stdout, $cascading_stderr, $ret, $handle, $slot);
+
+my $node_primary = PostgreSQL::Test::Cluster->new('primary');
+my $node_standby = PostgreSQL::Test::Cluster->new('standby');
+my $node_cascading_standby = PostgreSQL::Test::Cluster->new('cascading_standby');
+my $default_timeout = $PostgreSQL::Test::Utils::timeout_default;
+my $res;
+
+# Name for the physical slot on primary
+my $primary_slotname = 'primary_physical';
+my $standby_physical_slotname = 'standby_physical';
+
+# find $pat in logfile of $node after $off-th byte
+sub find_in_log
+{
+	my ($node, $pat, $off) = @_;
+
+	$off = 0 unless defined $off;
+	my $log = PostgreSQL::Test::Utils::slurp_file($node->logfile);
+	return 0 if (length($log) <= $off);
+
+	$log = substr($log, $off);
+
+	return $log =~ m/$pat/;
+}
+
+# Fetch xmin columns from slot's pg_replication_slots row, after waiting for
+# given boolean condition to be true to ensure we've reached a quiescent state.
+sub wait_for_xmins
+{
+	my ($node, $slotname, $check_expr) = @_;
+
+	$node->poll_query_until(
+		'postgres', qq[
+		SELECT $check_expr
+		FROM pg_catalog.pg_replication_slots
+		WHERE slot_name = '$slotname';
+	]) or die "Timed out waiting for slot xmins to advance";
+}
+
+# Create the required logical slots on standby.
+sub create_logical_slots
+{
+	my ($node) = @_;
+	$node->create_logical_slot_on_standby($node_primary, 'inactiveslot', 'testdb');
+	$node->create_logical_slot_on_standby($node_primary, 'activeslot', 'testdb');
+}
+
+# Drop the logical slots on standby.
+sub drop_logical_slots
+{
+	$node_standby->psql('postgres', q[SELECT pg_drop_replication_slot('inactiveslot')]);
+	$node_standby->psql('postgres', q[SELECT pg_drop_replication_slot('activeslot')]);
+}
+
+# Acquire one of the standby logical slots created by create_logical_slots().
+# In case wait is true we are waiting for an active pid on the 'activeslot' slot.
+# If wait is not true it means we are testing a known failure scenario.
+sub make_slot_active
+{
+	my ($node, $wait, $to_stdout, $to_stderr) = @_;
+	my $slot_user_handle;
+
+	$slot_user_handle = IPC::Run::start(['pg_recvlogical', '-d', $node->connstr('testdb'), '-S', 'activeslot', '-o', 'include-xids=0', '-o', 'skip-empty-xacts=1', '--no-loop', '--start', '-f', '-'], '>', $to_stdout, '2>', $to_stderr);
+
+	if ($wait)
+	{
+		# make sure activeslot is in use
+		$node->poll_query_until('testdb',
+			"SELECT EXISTS (SELECT 1 FROM pg_replication_slots WHERE slot_name = 'activeslot' AND active_pid IS NOT NULL)"
+		) or die "slot never became active";
+	}
+	return $slot_user_handle;
+}
+
+# Check pg_recvlogical stderr
+sub check_pg_recvlogical_stderr
+{
+	my ($slot_user_handle, $check_stderr) = @_;
+	my $return;
+
+	# our client should've terminated in response to the walsender error
+	$slot_user_handle->finish;
+	$return = $?;
+	cmp_ok($return, "!=", 0, "pg_recvlogical exited non-zero");
+	if ($return) {
+		like($stderr, qr/$check_stderr/, 'slot has been invalidated');
+	}
+
+	return 0;
+}
+
+# Check if all the slots on standby are dropped. These include the 'activeslot'
+# that was acquired by make_slot_active(), and the non-active 'inactiveslot'.
+sub check_slots_dropped
+{
+	my ($slot_user_handle) = @_;
+
+	is($node_standby->slot('inactiveslot')->{'slot_type'}, '', 'inactiveslot on standby dropped');
+	is($node_standby->slot('activeslot')->{'slot_type'}, '', 'activeslot on standby dropped');
+
+	check_pg_recvlogical_stderr($slot_user_handle, "conflict with recovery");
+}
+
+# Change hot_standby_feedback and check xmin and catalog_xmin values.
+sub change_hot_standby_feedback_and_wait_for_xmins
+{
+	my ($hsf, $invalidated) = @_;
+
+	$node_standby->append_conf('postgresql.conf',qq[
+	hot_standby_feedback = $hsf
+	]);
+
+	$node_standby->reload;
+
+	if ($hsf && $invalidated)
+	{
+		# With hot_standby_feedback on, xmin should advance,
+		# but catalog_xmin should still remain NULL since there is no logical slot.
+		wait_for_xmins($node_primary, $primary_slotname,
+			   "xmin IS NOT NULL AND catalog_xmin IS NULL");
+	}
+	elsif ($hsf)
+	{
+		# With hot_standby_feedback on, xmin and catalog_xmin should advance.
+		wait_for_xmins($node_primary, $primary_slotname,
+			   "xmin IS NOT NULL AND catalog_xmin IS NOT NULL");
+	}
+	else
+	{
+		# Both should be NULL since hs_feedback is off
+		wait_for_xmins($node_primary, $primary_slotname,
+			   "xmin IS NULL AND catalog_xmin IS NULL");
+
+	}
+}
+
+# Check conflicting status in pg_replication_slots.
+sub check_slots_conflicting_status
+{
+	my ($conflicting) = @_;
+
+	if ($conflicting)
+	{
+		$res = $node_standby->safe_psql(
+				'postgres', qq(
+				 select bool_and(conflicting) from pg_replication_slots;));
+
+		is($res, 't',
+			"Logical slots are reported as conflicting");
+	}
+	else
+	{
+		$res = $node_standby->safe_psql(
+				'postgres', qq(
+				select bool_or(conflicting) from pg_replication_slots;));
+
+		is($res, 'f',
+			"Logical slots are reported as non conflicting");
+	}
+}
+
+########################
+# Initialize primary node
+########################
+
+$node_primary->init(allows_streaming => 1, has_archiving => 1);
+$node_primary->append_conf('postgresql.conf', q{
+wal_level = 'logical'
+max_replication_slots = 4
+max_wal_senders = 4
+log_min_messages = 'debug2'
+log_error_verbosity = verbose
+});
+$node_primary->dump_info;
+$node_primary->start;
+
+$node_primary->psql('postgres', q[CREATE DATABASE testdb]);
+
+$node_primary->safe_psql('testdb', qq[SELECT * FROM pg_create_physical_replication_slot('$primary_slotname');]);
+
+# Check conflicting is NULL for physical slot
+$res = $node_primary->safe_psql(
+		'postgres', qq[
+		 SELECT conflicting is null FROM pg_replication_slots where slot_name = '$primary_slotname';]);
+
+is($res, 't',
+	"Physical slot reports conflicting as NULL");
+
+my $backup_name = 'b1';
+$node_primary->backup($backup_name);
+
+#######################
+# Initialize standby node
+#######################
+
+$node_standby->init_from_backup(
+	$node_primary, $backup_name,
+	has_streaming => 1,
+	has_restoring => 1);
+$node_standby->append_conf('postgresql.conf',
+	qq[primary_slot_name = '$primary_slotname']);
+$node_standby->start;
+$node_primary->wait_for_replay_catchup($node_standby);
+$node_standby->safe_psql('testdb', qq[SELECT * FROM pg_create_physical_replication_slot('$standby_physical_slotname');]);
+
+#######################
+# Initialize cascading standby node
+#######################
+$node_standby->backup($backup_name);
+$node_cascading_standby->init_from_backup(
+	$node_standby, $backup_name,
+	has_streaming => 1,
+	has_restoring => 1);
+$node_cascading_standby->append_conf('postgresql.conf',
+	qq[primary_slot_name = '$standby_physical_slotname']);
+$node_cascading_standby->start;
+$node_standby->wait_for_replay_catchup($node_cascading_standby, $node_primary);
+
+##################################################
+# Test that logical decoding on the standby
+# behaves correctly.
+##################################################
+
+# create the logical slots
+create_logical_slots($node_standby);
+
+$node_primary->safe_psql('testdb', qq[CREATE TABLE decoding_test(x integer, y text);]);
+$node_primary->safe_psql('testdb', qq[INSERT INTO decoding_test(x,y) SELECT s, s::text FROM generate_series(1,10) s;]);
+
+$node_primary->wait_for_replay_catchup($node_standby);
+
+my $result = $node_standby->safe_psql('testdb',
+	qq[SELECT pg_logical_slot_get_changes('activeslot', NULL, NULL);]);
+
+# test if basic decoding works
+is(scalar(my @foobar = split /^/m, $result),
+	14, 'Decoding produced 14 rows (2 BEGIN/COMMIT and 10 rows)');
+
+# Insert some rows and verify that we get the same results from pg_recvlogical
+# and the SQL interface.
+$node_primary->safe_psql('testdb',
+	qq[INSERT INTO decoding_test(x,y) SELECT s, s::text FROM generate_series(1,4) s;]
+);
+
+my $expected = q{BEGIN
+table public.decoding_test: INSERT: x[integer]:1 y[text]:'1'
+table public.decoding_test: INSERT: x[integer]:2 y[text]:'2'
+table public.decoding_test: INSERT: x[integer]:3 y[text]:'3'
+table public.decoding_test: INSERT: x[integer]:4 y[text]:'4'
+COMMIT};
+
+$node_primary->wait_for_replay_catchup($node_standby);
+
+my $stdout_sql = $node_standby->safe_psql('testdb',
+	qq[SELECT data FROM pg_logical_slot_peek_changes('activeslot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');]
+);
+
+is($stdout_sql, $expected, 'got expected output from SQL decoding session');
+
+my $endpos = $node_standby->safe_psql('testdb',
+	"SELECT lsn FROM pg_logical_slot_peek_changes('activeslot', NULL, NULL) ORDER BY lsn DESC LIMIT 1;"
+);
+
+# Insert some rows after $endpos, which we won't read.
+$node_primary->safe_psql('testdb',
+	qq[INSERT INTO decoding_test(x,y) SELECT s, s::text FROM generate_series(5,50) s;]
+);
+
+$node_primary->wait_for_catchup($node_standby);
+
+my $stdout_recv = $node_standby->pg_recvlogical_upto(
+    'testdb', 'activeslot', $endpos, $default_timeout,
+    'include-xids'     => '0',
+    'skip-empty-xacts' => '1');
+chomp($stdout_recv);
+is($stdout_recv, $expected,
+    'got same expected output from pg_recvlogical decoding session');
+
+$node_standby->poll_query_until('testdb',
+	"SELECT EXISTS (SELECT 1 FROM pg_replication_slots WHERE slot_name = 'activeslot' AND active_pid IS NULL)"
+) or die "slot never became inactive";
+
+$stdout_recv = $node_standby->pg_recvlogical_upto(
+    'testdb', 'activeslot', $endpos, $default_timeout,
+    'include-xids'     => '0',
+    'skip-empty-xacts' => '1');
+chomp($stdout_recv);
+is($stdout_recv, '', 'pg_recvlogical acknowledged changes');
+
+$node_primary->safe_psql('postgres', 'CREATE DATABASE otherdb');
+
+is( $node_primary->psql(
+        'otherdb',
+        "SELECT lsn FROM pg_logical_slot_peek_changes('activeslot', NULL, NULL) ORDER BY lsn DESC LIMIT 1;"
+    ),
+    3,
+    'replaying logical slot from another database fails');
+
+# drop the logical slots
+drop_logical_slots();
+
+##################################################
+# Recovery conflict: Invalidate conflicting slots, including in-use slots
+# Scenario 1: hot_standby_feedback off and vacuum FULL
+##################################################
+
+# create the logical slots
+create_logical_slots($node_standby);
+
+# One way to produce recovery conflict is to create/drop a relation and
+# launch a vacuum full on pg_class with hot_standby_feedback turned off on
+# the standby.
+change_hot_standby_feedback_and_wait_for_xmins(0,1);
+
+$handle = make_slot_active($node_standby, 1, \$stdout, \$stderr);
+
+# This should trigger the conflict
+$node_primary->safe_psql('testdb', qq[CREATE TABLE conflict_test(x integer, y text);]);
+$node_primary->safe_psql('testdb', qq[DROP TABLE conflict_test;]);
+$node_primary->safe_psql('testdb', 'VACUUM full pg_class;');
+
+$node_primary->wait_for_replay_catchup($node_standby);
+
+# message should be issued
+ok( find_in_log(
+   $node_standby,
+  "invalidating obsolete replication slot \"inactiveslot\""),
+  'inactiveslot slot invalidation is logged with vacuum FULL on pg_class');
+
+ok( find_in_log(
+   $node_standby,
+  "invalidating obsolete replication slot \"activeslot\""),
+  'activeslot slot invalidation is logged with vacuum FULL on pg_class');
+
+# Verify that pg_stat_database_conflicts.confl_active_logicalslot has been updated
+ok( $node_standby->poll_query_until(
+	'postgres',
+	"select (confl_active_logicalslot = 1) from pg_stat_database_conflicts where datname = 'testdb'", 't'),
+	'confl_active_logicalslot updated') or die "Timed out waiting confl_active_logicalslot to be updated";
+
+# Verify slots are reported as conflicting in pg_replication_slots
+check_slots_conflicting_status(1);
+
+$handle = make_slot_active($node_standby, 0, \$stdout, \$stderr);
+
+# We are not able to read from the slot as it has been invalidated
+check_pg_recvlogical_stderr($handle, "cannot read from logical replication slot \"activeslot\"");
+
+# Turn hot_standby_feedback back on
+change_hot_standby_feedback_and_wait_for_xmins(1,1);
+
+##################################################
+# Recovery conflict: Invalidate conflicting slots, including in-use slots
+# Scenario 2: conflict due to row removal with hot_standby_feedback off.
+##################################################
+
+# get the position to search from in the standby logfile
+my $logstart = -s $node_standby->logfile;
+
+# drop the logical slots
+drop_logical_slots();
+
+# create the logical slots
+create_logical_slots($node_standby);
+
+# One way to produce recovery conflict is to create/drop a relation and
+# launch a vacuum on pg_class with hot_standby_feedback turned off on the standby.
+change_hot_standby_feedback_and_wait_for_xmins(0,1);
+
+$handle = make_slot_active($node_standby, 1, \$stdout, \$stderr);
+
+# This should trigger the conflict
+$node_primary->safe_psql('testdb', qq[CREATE TABLE conflict_test(x integer, y text);]);
+$node_primary->safe_psql('testdb', qq[DROP TABLE conflict_test;]);
+$node_primary->safe_psql('testdb', 'VACUUM pg_class;');
+
+$node_primary->wait_for_replay_catchup($node_standby);
+
+# message should be issued
+ok( find_in_log(
+   $node_standby,
+  "invalidating obsolete replication slot \"inactiveslot\"", $logstart),
+  'inactiveslot slot invalidation is logged with vacuum on pg_class');
+
+ok( find_in_log(
+   $node_standby,
+  "invalidating obsolete replication slot \"activeslot\"", $logstart),
+  'activeslot slot invalidation is logged with vacuum on pg_class');
+
+# Verify that pg_stat_database_conflicts.confl_active_logicalslot has been updated
+# we now expect 2 conflicts reported as the counter persist across reloads
+ok( $node_standby->poll_query_until(
+	'postgres',
+	"select (confl_active_logicalslot = 2) from pg_stat_database_conflicts where datname = 'testdb'", 't'),
+	'confl_active_logicalslot updated') or die "Timed out waiting confl_active_logicalslot to be updated";
+
+# Verify slots are reported as conflicting in pg_replication_slots
+check_slots_conflicting_status(1);
+
+$handle = make_slot_active($node_standby, 0, \$stdout, \$stderr);
+
+# We are not able to read from the slot as it has been invalidated
+check_pg_recvlogical_stderr($handle, "cannot read from logical replication slot \"activeslot\"");
+
+##################################################
+# Recovery conflict: Same as Scenario 2 but on a shared catalog table
+# Scenario 3: conflict due to row removal with hot_standby_feedback off.
+##################################################
+
+# get the position to search from in the standby logfile
+$logstart = -s $node_standby->logfile;
+
+# drop the logical slots
+drop_logical_slots();
+
+# create the logical slots
+create_logical_slots($node_standby);
+
+# One way to produce recovery conflict is to create/drop a relation and
+# launch a vacuum on pg_class with hot_standby_feedback turned off on the standby.
+change_hot_standby_feedback_and_wait_for_xmins(0,1);
+
+$handle = make_slot_active($node_standby, 1, \$stdout, \$stderr);
+
+# Trigger the conflict. The txid_current() is to ensure there's some WAL
+# record associated with the database, otherwise the wait below does not work
+# reliably.
+$node_primary->safe_psql('testdb', qq[
+  CREATE ROLE create_trash;
+  DROP ROLE create_trash;
+  SELECT txid_current();
+]);
+$node_primary->safe_psql('testdb', 'VACUUM pg_authid;');
+
+$node_primary->wait_for_replay_catchup($node_standby);
+
+# message should be issued
+ok( find_in_log(
+   $node_standby,
+  "invalidating obsolete replication slot \"inactiveslot\"", $logstart),
+  'inactiveslot slot invalidation is logged with vacuum on pg_authid');
+
+ok( find_in_log(
+   $node_standby,
+  "invalidating obsolete replication slot \"activeslot\"", $logstart),
+  'activeslot slot invalidation is logged with vacuum on pg_authid');
+
+# Verify that pg_stat_database_conflicts.confl_active_logicalslot has been updated
+# we now expect 2 conflicts reported as the counter persist across reloads
+ok( $node_standby->poll_query_until(
+	'postgres',
+	"select (confl_active_logicalslot = 3) from pg_stat_database_conflicts where datname = 'testdb'", 't'),
+	'confl_active_logicalslot updated') or die "Timed out waiting confl_active_logicalslot to be updated";
+
+# Verify slots are reported as conflicting in pg_replication_slots
+check_slots_conflicting_status(1);
+
+$handle = make_slot_active($node_standby, 0, \$stdout, \$stderr);
+
+# We are not able to read from the slot as it has been invalidated
+check_pg_recvlogical_stderr($handle, "cannot read from logical replication slot \"activeslot\"");
+
+##################################################
+# Recovery conflict: Same as Scenario 2 but on a non catalog table
+# Scenario 4: No conflict expected.
+##################################################
+
+# get the position to search from in the standby logfile
+$logstart = -s $node_standby->logfile;
+
+# drop the logical slots
+drop_logical_slots();
+
+# create the logical slots
+create_logical_slots($node_standby);
+
+# put hot standby feedback to off
+change_hot_standby_feedback_and_wait_for_xmins(0,1);
+
+$handle = make_slot_active($node_standby, 1, \$stdout, \$stderr);
+
+# This should not trigger a conflict
+$node_primary->safe_psql('testdb', qq[CREATE TABLE conflict_test(x integer, y text);]);
+$node_primary->safe_psql('testdb', qq[INSERT INTO conflict_test(x,y) SELECT s, s::text FROM generate_series(1,4) s;]);
+$node_primary->safe_psql('testdb', qq[UPDATE conflict_test set x=1, y=1;]);
+$node_primary->safe_psql('testdb', 'VACUUM conflict_test;');
+
+$node_primary->wait_for_replay_catchup($node_standby);
+
+# message should not be issued
+ok( !find_in_log(
+   $node_standby,
+  "invalidating obsolete slot \"inactiveslot\"", $logstart),
+  'inactiveslot slot invalidation is not logged with vacuum on conflict_test');
+
+ok( !find_in_log(
+   $node_standby,
+  "invalidating obsolete slot \"activeslot\"", $logstart),
+  'activeslot slot invalidation is not logged with vacuum on conflict_test');
+
+# Verify that pg_stat_database_conflicts.confl_active_logicalslot has not been updated
+# we now still expect 2 conflicts reported as the counter persist across reloads
+ok( $node_standby->poll_query_until(
+	'postgres',
+	"select (confl_active_logicalslot = 3) from pg_stat_database_conflicts where datname = 'testdb'", 't'),
+	'confl_active_logicalslot not updated') or die "Timed out waiting confl_active_logicalslot to be updated";
+
+# Verify slots are reported as non conflicting in pg_replication_slots
+check_slots_conflicting_status(0);
+
+# Turn hot_standby_feedback back on
+change_hot_standby_feedback_and_wait_for_xmins(1, 0);
+
+##################################################
+# Recovery conflict: Invalidate conflicting slots, including in-use slots
+# Scenario 4: conflict due to on-access pruning.
+##################################################
+
+# get the position to search from in the standby logfile
+$logstart = -s $node_standby->logfile;
+
+# drop the logical slots
+drop_logical_slots();
+
+# create the logical slots
+create_logical_slots($node_standby);
+
+# One way to produce recovery conflict is to trigger an on-access pruning
+# on a relation marked as user_catalog_table.
+change_hot_standby_feedback_and_wait_for_xmins(0,0);
+
+$handle = make_slot_active($node_standby, 1, \$stdout, \$stderr);
+
+# This should trigger the conflict
+$node_primary->safe_psql('testdb', qq[CREATE TABLE prun(id integer, s char(2000)) WITH (fillfactor = 75, user_catalog_table = true);]);
+$node_primary->safe_psql('testdb', qq[INSERT INTO prun VALUES (1, 'A');]);
+$node_primary->safe_psql('testdb', qq[UPDATE prun SET s = 'B';]);
+$node_primary->safe_psql('testdb', qq[UPDATE prun SET s = 'C';]);
+$node_primary->safe_psql('testdb', qq[UPDATE prun SET s = 'D';]);
+$node_primary->safe_psql('testdb', qq[UPDATE prun SET s = 'E';]);
+
+$node_primary->wait_for_replay_catchup($node_standby);
+
+# message should be issued
+ok( find_in_log(
+   $node_standby,
+  "invalidating obsolete replication slot \"inactiveslot\"", $logstart),
+  'inactiveslot slot invalidation is logged with on-access pruning');
+
+ok( find_in_log(
+   $node_standby,
+  "invalidating obsolete replication slot \"activeslot\"", $logstart),
+  'activeslot slot invalidation is logged with on-access pruning');
+
+# Verify that pg_stat_database_conflicts.confl_active_logicalslot has been updated
+# we now expect 3 conflicts reported as the counter persist across reloads
+ok( $node_standby->poll_query_until(
+	'postgres',
+	"select (confl_active_logicalslot = 4) from pg_stat_database_conflicts where datname = 'testdb'", 't'),
+	'confl_active_logicalslot updated') or die "Timed out waiting confl_active_logicalslot to be updated";
+
+# Verify slots are reported as conflicting in pg_replication_slots
+check_slots_conflicting_status(1);
+
+$handle = make_slot_active($node_standby, 0, \$stdout, \$stderr);
+
+# We are not able to read from the slot as it has been invalidated
+check_pg_recvlogical_stderr($handle, "cannot read from logical replication slot \"activeslot\"");
+
+# Turn hot_standby_feedback back on
+change_hot_standby_feedback_and_wait_for_xmins(1, 1);
+
+##################################################
+# Recovery conflict: Invalidate conflicting slots, including in-use slots
+# Scenario 5: incorrect wal_level on primary.
+##################################################
+
+# get the position to search from in the standby logfile
+$logstart = -s $node_standby->logfile;
+
+# drop the logical slots
+drop_logical_slots();
+
+# create the logical slots
+create_logical_slots($node_standby);
+
+$handle = make_slot_active($node_standby, 1, \$stdout, \$stderr);
+
+# Make primary wal_level replica. This will trigger slot conflict.
+$node_primary->append_conf('postgresql.conf',q[
+wal_level = 'replica'
+]);
+$node_primary->restart;
+
+$node_primary->wait_for_replay_catchup($node_standby);
+
+# message should be issued
+ok( find_in_log(
+   $node_standby,
+  "invalidating obsolete replication slot \"inactiveslot\"", $logstart),
+  'inactiveslot slot invalidation is logged due to wal_level');
+
+ok( find_in_log(
+   $node_standby,
+  "invalidating obsolete replication slot \"activeslot\"", $logstart),
+  'activeslot slot invalidation is logged due to wal_level');
+
+# Verify that pg_stat_database_conflicts.confl_active_logicalslot has been updated
+# we now expect 5 conflicts reported as the counter persist across reloads
+ok( $node_standby->poll_query_until(
+	'postgres',
+	"select (confl_active_logicalslot = 5) from pg_stat_database_conflicts where datname = 'testdb'", 't'),
+	'confl_active_logicalslot updated') or die "Timed out waiting confl_active_logicalslot to be updated";
+
+# Verify slots are reported as conflicting in pg_replication_slots
+check_slots_conflicting_status(1);
+
+$handle = make_slot_active($node_standby, 0, \$stdout, \$stderr);
+# We are not able to read from the slot as it requires wal_level at least logical on the primary server
+check_pg_recvlogical_stderr($handle, "logical decoding on standby requires wal_level to be at least logical on the primary server");
+
+# Restore primary wal_level
+$node_primary->append_conf('postgresql.conf',q[
+wal_level = 'logical'
+]);
+$node_primary->restart;
+$node_primary->wait_for_replay_catchup($node_standby);
+
+$handle = make_slot_active($node_standby, 0, \$stdout, \$stderr);
+# as the slot has been invalidated we should not be able to read
+check_pg_recvlogical_stderr($handle, "cannot read from logical replication slot \"activeslot\"");
+
+##################################################
+# DROP DATABASE should drops it's slots, including active slots.
+##################################################
+
+# drop the logical slots
+drop_logical_slots();
+
+# create the logical slots
+create_logical_slots($node_standby);
+
+$handle = make_slot_active($node_standby, 1, \$stdout, \$stderr);
+# Create a slot on a database that would not be dropped. This slot should not
+# get dropped.
+$node_standby->create_logical_slot_on_standby($node_primary, 'otherslot', 'postgres');
+
+# dropdb on the primary to verify slots are dropped on standby
+$node_primary->safe_psql('postgres', q[DROP DATABASE testdb]);
+
+$node_primary->wait_for_replay_catchup($node_standby);
+
+is($node_standby->safe_psql('postgres',
+	q[SELECT EXISTS(SELECT 1 FROM pg_database WHERE datname = 'testdb')]), 'f',
+	'database dropped on standby');
+
+check_slots_dropped($handle);
+
+is($node_standby->slot('otherslot')->{'slot_type'}, 'logical',
+	'otherslot on standby not dropped');
+
+# Cleanup : manually drop the slot that was not dropped.
+$node_standby->psql('postgres', q[SELECT pg_drop_replication_slot('otherslot')]);
+
+##################################################
+# Test standby promotion and logical decoding behavior
+# after the standby gets promoted.
+##################################################
+
+$node_standby->reload;
+
+$node_primary->psql('postgres', q[CREATE DATABASE testdb]);
+$node_primary->safe_psql('testdb', qq[CREATE TABLE decoding_test(x integer, y text);]);
+
+# create the logical slots
+create_logical_slots($node_standby);
+
+# create the logical slots on the cascading standby too
+create_logical_slots($node_cascading_standby);
+
+# Make slots actives
+$handle = make_slot_active($node_standby, 1, \$stdout, \$stderr);
+my $cascading_handle = make_slot_active($node_cascading_standby, 1, \$cascading_stdout, \$cascading_stderr);
+
+# Insert some rows before the promotion
+$node_primary->safe_psql('testdb',
+	qq[INSERT INTO decoding_test(x,y) SELECT s, s::text FROM generate_series(1,4) s;]
+);
+
+# Wait for both standbys to catchup
+$node_primary->wait_for_replay_catchup($node_standby);
+$node_standby->wait_for_replay_catchup($node_cascading_standby, $node_primary);
+
+# promote
+$node_standby->promote;
+
+# insert some rows on promoted standby
+$node_standby->safe_psql('testdb',
+	qq[INSERT INTO decoding_test(x,y) SELECT s, s::text FROM generate_series(5,7) s;]
+);
+
+# Wait for the cascading standby to catchup
+$node_standby->wait_for_replay_catchup($node_cascading_standby);
+
+$expected = q{BEGIN
+table public.decoding_test: INSERT: x[integer]:1 y[text]:'1'
+table public.decoding_test: INSERT: x[integer]:2 y[text]:'2'
+table public.decoding_test: INSERT: x[integer]:3 y[text]:'3'
+table public.decoding_test: INSERT: x[integer]:4 y[text]:'4'
+COMMIT
+BEGIN
+table public.decoding_test: INSERT: x[integer]:5 y[text]:'5'
+table public.decoding_test: INSERT: x[integer]:6 y[text]:'6'
+table public.decoding_test: INSERT: x[integer]:7 y[text]:'7'
+COMMIT};
+
+# check that we are decoding pre and post promotion inserted rows
+$stdout_sql = $node_standby->safe_psql('testdb',
+	qq[SELECT data FROM pg_logical_slot_peek_changes('inactiveslot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');]
+);
+
+is($stdout_sql, $expected, 'got expected output from SQL decoding session on promoted standby');
+
+# check that we are decoding pre and post promotion inserted rows
+# with pg_recvlogical that has started before the promotion
+my $pump_timeout = IPC::Run::timer($PostgreSQL::Test::Utils::timeout_default);
+
+ok( pump_until(
+        $handle, $pump_timeout, \$stdout, qr/^.*COMMIT.*COMMIT$/s),
+    'got 2 COMMIT from pg_recvlogical output');
+
+chomp($stdout);
+is($stdout, $expected,
+    'got same expected output from pg_recvlogical decoding session');
+
+# check that we are decoding pre and post promotion inserted rows on the cascading standby
+$stdout_sql = $node_cascading_standby->safe_psql('testdb',
+	qq[SELECT data FROM pg_logical_slot_peek_changes('inactiveslot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');]
+);
+
+is($stdout_sql, $expected, 'got expected output from SQL decoding session on cascading standby');
+
+# check that we are decoding pre and post promotion inserted rows
+# with pg_recvlogical that has started before the promotion on the cascading standby
+ok( pump_until(
+        $cascading_handle, $pump_timeout, \$cascading_stdout, qr/^.*COMMIT.*COMMIT$/s),
+    'got 2 COMMIT from pg_recvlogical output');
+
+chomp($cascading_stdout);
+is($cascading_stdout, $expected,
+    'got same expected output from pg_recvlogical decoding session on cascading standby');
+
+done_testing();
-- 
2.38.0

>From de57f70bb40af0dfceee6cc19963cb260e80ae63 Mon Sep 17 00:00:00 2001
From: bdrouvotAWS <bdrou...@amazon.com>
Date: Tue, 7 Feb 2023 14:08:11 +0000
Subject: [PATCH va65 9/9] Doc changes describing details about logical
 decoding.

Author: Andres Freund (in an older version), Amit Khandekar, Bertrand Drouvot
Reviewed-By: Bertrand Drouvot, Andres Freund, Robert Haas, Fabrizio de Royes Mello
---
 doc/src/sgml/logicaldecoding.sgml | 27 +++++++++++++++++++++++++++
 1 file changed, 27 insertions(+)

diff --git a/doc/src/sgml/logicaldecoding.sgml b/doc/src/sgml/logicaldecoding.sgml
index 4e912b4bd48..8651024b8a6 100644
--- a/doc/src/sgml/logicaldecoding.sgml
+++ b/doc/src/sgml/logicaldecoding.sgml
@@ -316,6 +316,33 @@ postgres=# select * from pg_logical_slot_get_changes('regression_slot', NULL, NU
      may consume changes from a slot at any given time.
     </para>
 
+    <para>
+     A logical replication slot can also be created on a hot standby. To prevent
+     <command>VACUUM</command> from removing required rows from the system
+     catalogs, <varname>hot_standby_feedback</varname> should be set on the
+     standby. In spite of that, if any required rows get removed, the slot gets
+     invalidated. It's highly recommended to use a physical slot between the primary
+     and the standby. Otherwise, hot_standby_feedback will work, but only while the
+     connection is alive (for example a node restart would break it). Then, the
+     primary may delete system catalog rows that could be needed by the logical
+     decoding on the standby (as it does not know about the catalog_xmin on the
+     standby). Existing logical slots on standby also get invalidated if wal_level
+     on primary is reduced to less than 'logical'. This is done as soon as the
+     standby detects such a change in the WAL stream. It means, that for walsenders
+     that are lagging (if any), some WAL records up to the wal_level parameter change
+     on the primary won't be decoded.
+    </para>
+
+    <para>
+     For a logical slot to be created, it builds a historic snapshot, for which
+     information of all the currently running transactions is essential. On
+     primary, this information is available, but on standby, this information
+     has to be obtained from primary. So, slot creation may wait for some
+     activity to happen on the primary. If the primary is idle, creating a
+     logical slot on standby may take a noticeable time. One option to speed it
+     is to call the <function>pg_log_standby_snapshot</function> on the primary.
+    </para>
+
     <caution>
      <para>
       Replication slots persist across crashes and know nothing about the state
-- 
2.38.0

Reply via email to