Hi,

On 2023-04-07 08:47:57 -0700, Andres Freund wrote:
> Integrated all of these.

Here's my current version. Changes:
- Integrated Bertrand's changes
- polished commit messages of 0001-0003
- edited code comments for 0003, including
  InvalidateObsoleteReplicationSlots()'s header
- added a bump of SLOT_VERSION to 0001
- moved addition of pg_log_standby_snapshot() to 0007
- added a catversion bump for pg_log_standby_snapshot()
- moved all the bits dealing with procsignals from 0003 to 0004, now the split
  makes sense IMO
- combined a few more sucessive ->safe_psql() calls

I see occasional failures in the tests, particularly in the new test using
pg_authid, but not solely. cfbot also seems to have seen these:
https://cirrus-ci.com/github/postgresql-cfbot/postgresql/commitfest%2F42%2F3740

I made a bogus attempt at a workaround for the pg_authid case last night. But
that didn't actually fix anything, it just changed the timing.

I think the issue is that VACUUM does not force WAL to be flushed at the end
(since it does not assign an xid). wait_for_replay_catchup() uses
$node->lsn('flush'), which, due to VACUUM not flushing, can be an LSN from
before VACUUM completed.

The problem can be made more likely by adding pg_usleep(1000000); before
walwriter.c's call to XLogBackgroundFlush().

We probably should introduce some infrastructure in Cluster.pm for this, but
for now I just added a 'flush_wal' table that we insert into after a
VACUUM. That guarantees a WAL flush.


I think some of the patches might have more reviewers than really applicable,
and might also miss some. I'd appreciate if you could go over that...

Greetings,

Andres Freund
>From 0e038eb5dfddec500fbf4625775d1fa508a208f6 Mon Sep 17 00:00:00 2001
From: Andres Freund <and...@anarazel.de>
Date: Thu, 6 Apr 2023 20:00:07 -0700
Subject: [PATCH va67 1/9] Replace a replication slot's invalidated_at LSN with
 an enum

This is mainly useful because the upcoming logical-decoding-on-standby feature
adds further reasons for invalidating slots, and we don't want to end up with
multiple invalidated_* fields, or check different attributes.

Eventually we should consider not resetting restart_lsn when invalidating a
slot due to max_slot_wal_keep_size. But that's a user visible change, so left
for later.

Increases SLOT_VERSION, due to the changed field (with a different alignment,
no less).

Reviewed-by: "Drouvot, Bertrand" <bertranddrouvot...@gmail.com>
Discussion: https://postgr.es/m/20230407075009.igg7be27ha2ht...@awork3.anarazel.de
---
 src/include/replication/slot.h      | 15 +++++++++++++--
 src/backend/replication/slot.c      | 28 ++++++++++++++++++++++++----
 src/backend/replication/slotfuncs.c |  8 +++-----
 src/tools/pgindent/typedefs.list    |  1 +
 4 files changed, 41 insertions(+), 11 deletions(-)

diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h
index 8872c80cdfe..ebcb637baed 100644
--- a/src/include/replication/slot.h
+++ b/src/include/replication/slot.h
@@ -37,6 +37,17 @@ typedef enum ReplicationSlotPersistency
 	RS_TEMPORARY
 } ReplicationSlotPersistency;
 
+/*
+ * Slots can be invalidated, e.g. due to max_slot_wal_keep_size. If so, the
+ * 'invalidated' field is set to a value other than _NONE.
+ */
+typedef enum ReplicationSlotInvalidationCause
+{
+	RS_INVAL_NONE,
+	/* required WAL has been removed */
+	RS_INVAL_WAL,
+} ReplicationSlotInvalidationCause;
+
 /*
  * On-Disk data of a replication slot, preserved across restarts.
  */
@@ -72,8 +83,8 @@ typedef struct ReplicationSlotPersistentData
 	/* oldest LSN that might be required by this replication slot */
 	XLogRecPtr	restart_lsn;
 
-	/* restart_lsn is copied here when the slot is invalidated */
-	XLogRecPtr	invalidated_at;
+	/* RS_INVAL_NONE if valid, or the reason for having been invalidated */
+	ReplicationSlotInvalidationCause invalidated;
 
 	/*
 	 * Oldest LSN that the client has acked receipt for.  This is used as the
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index 2293c0c6fc3..df23b7ed31e 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -89,7 +89,7 @@ typedef struct ReplicationSlotOnDisk
 	sizeof(ReplicationSlotOnDisk) - ReplicationSlotOnDiskConstantSize
 
 #define SLOT_MAGIC		0x1051CA1	/* format identifier */
-#define SLOT_VERSION	2		/* version for new files */
+#define SLOT_VERSION	3		/* version for new files */
 
 /* Control array for replication slot management */
 ReplicationSlotCtlData *ReplicationSlotCtl = NULL;
@@ -855,8 +855,7 @@ ReplicationSlotsComputeRequiredXmin(bool already_locked)
 		SpinLockAcquire(&s->mutex);
 		effective_xmin = s->effective_xmin;
 		effective_catalog_xmin = s->effective_catalog_xmin;
-		invalidated = (!XLogRecPtrIsInvalid(s->data.invalidated_at) &&
-					   XLogRecPtrIsInvalid(s->data.restart_lsn));
+		invalidated = s->data.invalidated != RS_INVAL_NONE;
 		SpinLockRelease(&s->mutex);
 
 		/* invalidated slots need not apply */
@@ -901,14 +900,20 @@ ReplicationSlotsComputeRequiredLSN(void)
 	{
 		ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
 		XLogRecPtr	restart_lsn;
+		bool		invalidated;
 
 		if (!s->in_use)
 			continue;
 
 		SpinLockAcquire(&s->mutex);
 		restart_lsn = s->data.restart_lsn;
+		invalidated = s->data.invalidated != RS_INVAL_NONE;
 		SpinLockRelease(&s->mutex);
 
+		/* invalidated slots need not apply */
+		if (invalidated)
+			continue;
+
 		if (restart_lsn != InvalidXLogRecPtr &&
 			(min_required == InvalidXLogRecPtr ||
 			 restart_lsn < min_required))
@@ -946,6 +951,7 @@ ReplicationSlotsComputeLogicalRestartLSN(void)
 	{
 		ReplicationSlot *s;
 		XLogRecPtr	restart_lsn;
+		bool		invalidated;
 
 		s = &ReplicationSlotCtl->replication_slots[i];
 
@@ -960,8 +966,13 @@ ReplicationSlotsComputeLogicalRestartLSN(void)
 		/* read once, it's ok if it increases while we're checking */
 		SpinLockAcquire(&s->mutex);
 		restart_lsn = s->data.restart_lsn;
+		invalidated = s->data.invalidated != RS_INVAL_NONE;
 		SpinLockRelease(&s->mutex);
 
+		/* invalidated slots need not apply */
+		if (invalidated)
+			continue;
+
 		if (restart_lsn == InvalidXLogRecPtr)
 			continue;
 
@@ -1012,6 +1023,8 @@ ReplicationSlotsCountDBSlots(Oid dboid, int *nslots, int *nactive)
 		if (s->data.database != dboid)
 			continue;
 
+		/* NB: intentionally counting invalidated slots */
+
 		/* count slots with spinlock held */
 		SpinLockAcquire(&s->mutex);
 		(*nslots)++;
@@ -1069,6 +1082,8 @@ restart:
 		if (s->data.database != dboid)
 			continue;
 
+		/* NB: intentionally including invalidated slots */
+
 		/* acquire slot, so ReplicationSlotDropAcquired can be reused  */
 		SpinLockAcquire(&s->mutex);
 		/* can't change while ReplicationSlotControlLock is held */
@@ -1294,7 +1309,12 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlot *s, XLogRecPtr oldestLSN,
 		{
 			MyReplicationSlot = s;
 			s->active_pid = MyProcPid;
-			s->data.invalidated_at = restart_lsn;
+			s->data.invalidated = RS_INVAL_WAL;
+
+			/*
+			 * XXX: We should consider not overwriting restart_lsn and instead
+			 * just rely on .invalidated.
+			 */
 			s->data.restart_lsn = InvalidXLogRecPtr;
 
 			/* Let caller know */
diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c
index 2f3c9648241..ad3e72be5ee 100644
--- a/src/backend/replication/slotfuncs.c
+++ b/src/backend/replication/slotfuncs.c
@@ -315,12 +315,10 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
 			nulls[i++] = true;
 
 		/*
-		 * If invalidated_at is valid and restart_lsn is invalid, we know for
-		 * certain that the slot has been invalidated.  Otherwise, test
-		 * availability from restart_lsn.
+		 * If the slot has not been invalidated, test availability from
+		 * restart_lsn.
 		 */
-		if (XLogRecPtrIsInvalid(slot_contents.data.restart_lsn) &&
-			!XLogRecPtrIsInvalid(slot_contents.data.invalidated_at))
+		if (slot_contents.data.invalidated != RS_INVAL_NONE)
 			walstate = WALAVAIL_REMOVED;
 		else
 			walstate = GetWALAvailability(slot_contents.data.restart_lsn);
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index 3219ea5f059..e401b3fe7d7 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -2337,6 +2337,7 @@ ReplicaIdentityStmt
 ReplicationKind
 ReplicationSlot
 ReplicationSlotCtlData
+ReplicationSlotInvalidationCause
 ReplicationSlotOnDisk
 ReplicationSlotPersistency
 ReplicationSlotPersistentData
-- 
2.38.0

>From 103f493b26f7af3559f1583fe199623ccc7952f7 Mon Sep 17 00:00:00 2001
From: Andres Freund <and...@anarazel.de>
Date: Thu, 6 Apr 2023 19:51:06 -0700
Subject: [PATCH va67 2/9] Prevent use of invalidated logical slot in
 CreateDecodingContext()

Previously we had checks for this in multiple places. Support for logical
decoding on standbys will add other forms of invalidation, making it worth
while to centralize the checks.

This slightly changes the error message for both the walsender and SQL
interface. Particularly the SQL interface error was inaccurate, as the "This
slot has never previously reserved WAL" portion was unreachable.

Reviewed-by: "Drouvot, Bertrand" <bertranddrouvot...@gmail.com>
Discussion: https://postgr.es/m/20230407075009.igg7be27ha2ht...@awork3.anarazel.de
---
 src/backend/replication/logical/logical.c      | 16 ++++++++++++++++
 src/backend/replication/logical/logicalfuncs.c | 13 -------------
 src/backend/replication/walsender.c            |  7 -------
 3 files changed, 16 insertions(+), 20 deletions(-)

diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index c3ec97a0a62..85fc49f655d 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -518,6 +518,22 @@ CreateDecodingContext(XLogRecPtr start_lsn,
 				 errmsg("replication slot \"%s\" was not created in this database",
 						NameStr(slot->data.name))));
 
+	/*
+	 * Check if slot has been invalidated due to max_slot_wal_keep_size. Avoid
+	 * "cannot get changes" wording in this errmsg because that'd be
+	 * confusingly ambiguous about no changes being available when called from
+	 * pg_logical_slot_get_changes_guts().
+	 */
+	if (MyReplicationSlot->data.invalidated == RS_INVAL_WAL)
+		ereport(ERROR,
+				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+				 errmsg("can no longer get changes from replication slot \"%s\"",
+						NameStr(MyReplicationSlot->data.name)),
+				 errdetail("This slot has been invalidated because it exceeded the maximum reserved size.")));
+
+	Assert(MyReplicationSlot->data.invalidated == RS_INVAL_NONE);
+	Assert(MyReplicationSlot->data.restart_lsn != InvalidXLogRecPtr);
+
 	if (start_lsn == InvalidXLogRecPtr)
 	{
 		/* continue from last position */
diff --git a/src/backend/replication/logical/logicalfuncs.c b/src/backend/replication/logical/logicalfuncs.c
index fa1b641a2b0..55a24c02c94 100644
--- a/src/backend/replication/logical/logicalfuncs.c
+++ b/src/backend/replication/logical/logicalfuncs.c
@@ -214,19 +214,6 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin
 									LogicalOutputPrepareWrite,
 									LogicalOutputWrite, NULL);
 
-		/*
-		 * After the sanity checks in CreateDecodingContext, make sure the
-		 * restart_lsn is valid.  Avoid "cannot get changes" wording in this
-		 * errmsg because that'd be confusingly ambiguous about no changes
-		 * being available.
-		 */
-		if (XLogRecPtrIsInvalid(MyReplicationSlot->data.restart_lsn))
-			ereport(ERROR,
-					(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
-					 errmsg("can no longer get changes from replication slot \"%s\"",
-							NameStr(*name)),
-					 errdetail("This slot has never previously reserved WAL, or it has been invalidated.")));
-
 		MemoryContextSwitchTo(oldcontext);
 
 		/*
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 75e8363e248..e40a9b1ba7b 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -1253,13 +1253,6 @@ StartLogicalReplication(StartReplicationCmd *cmd)
 
 	ReplicationSlotAcquire(cmd->slotname, true);
 
-	if (XLogRecPtrIsInvalid(MyReplicationSlot->data.restart_lsn))
-		ereport(ERROR,
-				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
-				 errmsg("cannot read from logical replication slot \"%s\"",
-						cmd->slotname),
-				 errdetail("This slot has been invalidated because it exceeded the maximum reserved size.")));
-
 	/*
 	 * Force a disconnect, so that the decoding code doesn't need to care
 	 * about an eventual switch from running in recovery, to running in a
-- 
2.38.0

>From 52c25cc15abc4470d19e305d245b9362e6b8d6a3 Mon Sep 17 00:00:00 2001
From: Andres Freund <and...@anarazel.de>
Date: Fri, 7 Apr 2023 09:32:48 -0700
Subject: [PATCH va67 3/9] Support invalidating replication slots due to
 horizon and wal_level
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Needed for supporting logical decoding on a standby. The new invalidation
methods will be used in a subsequent commit.

Author: "Drouvot, Bertrand" <bertranddrouvot...@gmail.com>
Author: Andres Freund <and...@anarazel.de>
Author: Amit Khandekar <amitdkhan...@gmail.com> (in an older version)
Reviewed-by: "Drouvot, Bertrand" <bertranddrouvot...@gmail.com>
Reviewed-by: Andres Freund <and...@anarazel.de>
Reviewed-by: Robert Haas <robertmh...@gmail.com>
Reviewed-by: Fabrízio de Royes Mello <fabriziome...@gmail.com>
Reviewed-by: Bharath Rupireddy <bharath.rupireddyforpostg...@gmail.com>
Reviewed-by: Amit Kapila <amit.kapil...@gmail.com>
Discussion: https://postgr.es/m/20230407075009.igg7be27ha2ht...@awork3.anarazel.de
---
 src/include/replication/slot.h            |   9 +-
 src/backend/access/transam/xlog.c         |   6 +-
 src/backend/replication/logical/logical.c |   7 +
 src/backend/replication/slot.c            | 151 ++++++++++++++++++----
 4 files changed, 144 insertions(+), 29 deletions(-)

diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h
index ebcb637baed..bfc84193a7a 100644
--- a/src/include/replication/slot.h
+++ b/src/include/replication/slot.h
@@ -46,6 +46,10 @@ typedef enum ReplicationSlotInvalidationCause
 	RS_INVAL_NONE,
 	/* required WAL has been removed */
 	RS_INVAL_WAL,
+	/* required rows have been removed */
+	RS_INVAL_HORIZON,
+	/* wal_level insufficient for slot */
+	RS_INVAL_WAL_LEVEL,
 } ReplicationSlotInvalidationCause;
 
 /*
@@ -226,7 +230,10 @@ extern void ReplicationSlotsComputeRequiredLSN(void);
 extern XLogRecPtr ReplicationSlotsComputeLogicalRestartLSN(void);
 extern bool ReplicationSlotsCountDBSlots(Oid dboid, int *nslots, int *nactive);
 extern void ReplicationSlotsDropDBSlots(Oid dboid);
-extern bool InvalidateObsoleteReplicationSlots(XLogSegNo oldestSegno);
+extern bool InvalidateObsoleteReplicationSlots(ReplicationSlotInvalidationCause cause,
+											   XLogSegNo oldestSegno,
+											   Oid dboid,
+											   TransactionId snapshotConflictHorizon);
 extern ReplicationSlot *SearchNamedReplicationSlot(const char *name, bool need_lock);
 extern int	ReplicationSlotIndex(ReplicationSlot *slot);
 extern bool ReplicationSlotName(int index, Name name);
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 46821ad6056..1485e8f9ca9 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -6806,7 +6806,8 @@ CreateCheckPoint(int flags)
 	 */
 	XLByteToSeg(RedoRecPtr, _logSegNo, wal_segment_size);
 	KeepLogSeg(recptr, &_logSegNo);
-	if (InvalidateObsoleteReplicationSlots(_logSegNo))
+	if (InvalidateObsoleteReplicationSlots(RS_INVAL_WAL, _logSegNo, InvalidOid,
+										   InvalidTransactionId))
 	{
 		/*
 		 * Some slots have been invalidated; recalculate the old-segment
@@ -7250,7 +7251,8 @@ CreateRestartPoint(int flags)
 	replayPtr = GetXLogReplayRecPtr(&replayTLI);
 	endptr = (receivePtr < replayPtr) ? replayPtr : receivePtr;
 	KeepLogSeg(endptr, &_logSegNo);
-	if (InvalidateObsoleteReplicationSlots(_logSegNo))
+	if (InvalidateObsoleteReplicationSlots(RS_INVAL_WAL, _logSegNo, InvalidOid,
+										   InvalidTransactionId))
 	{
 		/*
 		 * Some slots have been invalidated; recalculate the old-segment
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 85fc49f655d..27addd58f66 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -531,6 +531,13 @@ CreateDecodingContext(XLogRecPtr start_lsn,
 						NameStr(MyReplicationSlot->data.name)),
 				 errdetail("This slot has been invalidated because it exceeded the maximum reserved size.")));
 
+	if (MyReplicationSlot->data.invalidated != RS_INVAL_NONE)
+		ereport(ERROR,
+				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+				 errmsg("cannot read from logical replication slot \"%s\"",
+						NameStr(MyReplicationSlot->data.name)),
+				 errdetail("This slot has been invalidated because it was conflicting with recovery.")));
+
 	Assert(MyReplicationSlot->data.invalidated == RS_INVAL_NONE);
 	Assert(MyReplicationSlot->data.restart_lsn != InvalidXLogRecPtr);
 
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index df23b7ed31e..c2a9accebf6 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -1241,8 +1241,58 @@ ReplicationSlotReserveWal(void)
 }
 
 /*
- * Helper for InvalidateObsoleteReplicationSlots -- acquires the given slot
- * and mark it invalid, if necessary and possible.
+ * Report that replication slot needs to be invalidated
+ */
+static void
+ReportSlotInvalidation(ReplicationSlotInvalidationCause cause,
+					   bool terminating,
+					   int pid,
+					   NameData slotname,
+					   XLogRecPtr restart_lsn,
+					   XLogRecPtr oldestLSN,
+					   TransactionId snapshotConflictHorizon)
+{
+	StringInfoData err_detail;
+	bool		hint = false;
+
+	initStringInfo(&err_detail);
+
+	switch (cause)
+	{
+		case RS_INVAL_WAL:
+			hint = true;
+			appendStringInfo(&err_detail, _("The slot's restart_lsn %X/%X exceeds the limit by %llu bytes."),
+							 LSN_FORMAT_ARGS(restart_lsn),
+							 (unsigned long long) (oldestLSN - restart_lsn));
+			break;
+		case RS_INVAL_HORIZON:
+			appendStringInfo(&err_detail, _("The slot conflicted with xid horizon %u."),
+							 snapshotConflictHorizon);
+			break;
+
+		case RS_INVAL_WAL_LEVEL:
+			appendStringInfo(&err_detail, _("Logical decoding on standby requires wal_level to be at least logical on the primary server"));
+			break;
+		case RS_INVAL_NONE:
+			pg_unreachable();
+	}
+
+	ereport(LOG,
+			terminating ?
+			errmsg("terminating process %d to release replication slot \"%s\"",
+				   pid, NameStr(slotname)) :
+			errmsg("invalidating obsolete replication slot \"%s\"",
+				   NameStr(slotname)),
+			errdetail_internal("%s", err_detail.data),
+			hint ? errhint("You might need to increase max_slot_wal_keep_size.") : 0);
+
+	pfree(err_detail.data);
+}
+
+/*
+ * Helper for InvalidateObsoleteReplicationSlots
+ *
+ * Acquires the given slot and mark it invalid, if necessary and possible.
  *
  * Returns whether ReplicationSlotControlLock was released in the interim (and
  * in that case we're not holding the lock at return, otherwise we are).
@@ -1253,7 +1303,10 @@ ReplicationSlotReserveWal(void)
  * for syscalls, so caller must restart if we return true.
  */
 static bool
-InvalidatePossiblyObsoleteSlot(ReplicationSlot *s, XLogRecPtr oldestLSN,
+InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCause cause,
+							   ReplicationSlot *s,
+							   XLogRecPtr oldestLSN,
+							   Oid dboid, TransactionId snapshotConflictHorizon,
 							   bool *invalidated)
 {
 	int			last_signaled_pid = 0;
@@ -1264,6 +1317,7 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlot *s, XLogRecPtr oldestLSN,
 		XLogRecPtr	restart_lsn;
 		NameData	slotname;
 		int			active_pid = 0;
+		ReplicationSlotInvalidationCause conflict = RS_INVAL_NONE;
 
 		Assert(LWLockHeldByMeInMode(ReplicationSlotControlLock, LW_SHARED));
 
@@ -1286,10 +1340,45 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlot *s, XLogRecPtr oldestLSN,
 		restart_lsn = s->data.restart_lsn;
 
 		/*
-		 * If the slot is already invalid or is fresh enough, we don't need to
-		 * do anything.
+		 * If the slot is already invalid or is a non conflicting slot, we
+		 * don't need to do anything.
 		 */
-		if (XLogRecPtrIsInvalid(restart_lsn) || restart_lsn >= oldestLSN)
+		if (s->data.invalidated == RS_INVAL_NONE)
+		{
+			switch (cause)
+			{
+				case RS_INVAL_WAL:
+					if (s->data.restart_lsn != InvalidXLogRecPtr &&
+						s->data.restart_lsn < oldestLSN)
+						conflict = cause;
+					break;
+				case RS_INVAL_HORIZON:
+					if (!SlotIsLogical(s))
+						break;
+					/* invalid DB oid signals a shared relation */
+					if (dboid != InvalidOid && dboid != s->data.database)
+						break;
+					if (TransactionIdIsValid(s->effective_xmin) &&
+						TransactionIdPrecedesOrEquals(s->effective_xmin,
+													  snapshotConflictHorizon))
+						conflict = cause;
+					else if (TransactionIdIsValid(s->effective_catalog_xmin) &&
+							 TransactionIdPrecedesOrEquals(s->effective_catalog_xmin,
+														   snapshotConflictHorizon))
+						conflict = cause;
+					break;
+				case RS_INVAL_WAL_LEVEL:
+					if (SlotIsLogical(s))
+						conflict = cause;
+					break;
+				default:
+					pg_unreachable();
+					break;
+			}
+		}
+
+		/* if there's no conflict, we're done */
+		if (conflict == RS_INVAL_NONE)
 		{
 			SpinLockRelease(&s->mutex);
 			if (released_lock)
@@ -1309,13 +1398,14 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlot *s, XLogRecPtr oldestLSN,
 		{
 			MyReplicationSlot = s;
 			s->active_pid = MyProcPid;
-			s->data.invalidated = RS_INVAL_WAL;
+			s->data.invalidated = conflict;
 
 			/*
 			 * XXX: We should consider not overwriting restart_lsn and instead
 			 * just rely on .invalidated.
 			 */
-			s->data.restart_lsn = InvalidXLogRecPtr;
+			if (conflict == RS_INVAL_WAL)
+				s->data.restart_lsn = InvalidXLogRecPtr;
 
 			/* Let caller know */
 			*invalidated = true;
@@ -1349,13 +1439,9 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlot *s, XLogRecPtr oldestLSN,
 			 */
 			if (last_signaled_pid != active_pid)
 			{
-				ereport(LOG,
-						errmsg("terminating process %d to release replication slot \"%s\"",
-							   active_pid, NameStr(slotname)),
-						errdetail("The slot's restart_lsn %X/%X exceeds the limit by %llu bytes.",
-								  LSN_FORMAT_ARGS(restart_lsn),
-								  (unsigned long long) (oldestLSN - restart_lsn)),
-						errhint("You might need to increase max_slot_wal_keep_size."));
+				ReportSlotInvalidation(conflict, true, active_pid,
+									   slotname, restart_lsn,
+									   oldestLSN, snapshotConflictHorizon);
 
 				(void) kill(active_pid, SIGTERM);
 				last_signaled_pid = active_pid;
@@ -1390,14 +1476,11 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlot *s, XLogRecPtr oldestLSN,
 			ReplicationSlotMarkDirty();
 			ReplicationSlotSave();
 			ReplicationSlotRelease();
+			pgstat_drop_replslot(s);
 
-			ereport(LOG,
-					errmsg("invalidating obsolete replication slot \"%s\"",
-						   NameStr(slotname)),
-					errdetail("The slot's restart_lsn %X/%X exceeds the limit by %llu bytes.",
-							  LSN_FORMAT_ARGS(restart_lsn),
-							  (unsigned long long) (oldestLSN - restart_lsn)),
-					errhint("You might need to increase max_slot_wal_keep_size."));
+			ReportSlotInvalidation(conflict, false, active_pid,
+								   slotname, restart_lsn,
+								   oldestLSN, snapshotConflictHorizon);
 
 			/* done with this slot for now */
 			break;
@@ -1410,19 +1493,33 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlot *s, XLogRecPtr oldestLSN,
 }
 
 /*
- * Mark any slot that points to an LSN older than the given segment
- * as invalid; it requires WAL that's about to be removed.
+ * Invalidate slots that require resources about to be removed.
  *
  * Returns true when any slot have got invalidated.
  *
+ * Whether a slot needs to be invalidated depends on the cause. A slot is
+ * removed if it:
+ * - RS_INVAL_WAL: requires a LSN older than the given segment
+ * - RS_INVAL_HORIZON: requires a snapshot <= the given horizon, in the given db
+     dboid may be InvalidOid for shared relations
+ * - RS_INVAL_WAL_LEVEL: is logical
+ *
  * NB - this runs as part of checkpoint, so avoid raising errors if possible.
  */
 bool
-InvalidateObsoleteReplicationSlots(XLogSegNo oldestSegno)
+InvalidateObsoleteReplicationSlots(ReplicationSlotInvalidationCause cause,
+								   XLogSegNo oldestSegno, Oid dboid,
+								   TransactionId snapshotConflictHorizon)
 {
 	XLogRecPtr	oldestLSN;
 	bool		invalidated = false;
 
+	Assert(cause != RS_INVAL_HORIZON || TransactionIdIsValid(snapshotConflictHorizon));
+	Assert(cause != RS_INVAL_WAL || oldestSegno > 0);
+
+	if (max_replication_slots == 0)
+		return invalidated;
+
 	XLogSegNoOffsetToRecPtr(oldestSegno, 0, wal_segment_size, oldestLSN);
 
 restart:
@@ -1434,7 +1531,9 @@ restart:
 		if (!s->in_use)
 			continue;
 
-		if (InvalidatePossiblyObsoleteSlot(s, oldestLSN, &invalidated))
+		if (InvalidatePossiblyObsoleteSlot(cause, s, oldestLSN, dboid,
+										   snapshotConflictHorizon,
+										   &invalidated))
 		{
 			/* if the lock was released, start from scratch */
 			goto restart;
-- 
2.38.0

>From 311a1d8f9c2d1acf0c22e091d53f7a533073c8b7 Mon Sep 17 00:00:00 2001
From: Andres Freund <and...@anarazel.de>
Date: Fri, 7 Apr 2023 09:56:02 -0700
Subject: [PATCH va67 4/9] Handle logical slot conflicts on standby
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

During WAL replay on standby, when slot conflict is identified, invalidate
such slots. Also do the same thing if wal_level on the primary server is
reduced to below logical and there are existing logical slots on
standby. Introduce a new ProcSignalReason value for slot conflict recovery.

Author: "Drouvot, Bertrand" <bertranddrouvot...@gmail.com>
Author: Andres Freund <and...@anarazel.de>
Author: Amit Khandekar <amitdkhan...@gmail.com> (in an older version)
Reviewed-by: "Drouvot, Bertrand" <bertranddrouvot...@gmail.com>
Reviewed-by: Andres Freund <and...@anarazel.de>
Reviewed-by: Robert Haas <robertmh...@gmail.com>
Reviewed-by: Fabrízio de Royes Mello <fabriziome...@gmail.com>
Reviewed-by: Bharath Rupireddy <bharath.rupireddyforpostg...@gmail.com>
Reviewed-by: Amit Kapila <amit.kapil...@gmail.com>
Reviewed-by: Alvaro Herrera <alvhe...@alvh.no-ip.org>
Discussion: https://postgr.es/m/20230407075009.igg7be27ha2ht...@awork3.anarazel.de
---
 src/include/storage/procsignal.h     |  1 +
 src/include/storage/standby.h        |  2 ++
 src/backend/access/gist/gistxlog.c   |  2 ++
 src/backend/access/hash/hash_xlog.c  |  1 +
 src/backend/access/heap/heapam.c     |  3 +++
 src/backend/access/nbtree/nbtxlog.c  |  2 ++
 src/backend/access/spgist/spgxlog.c  |  1 +
 src/backend/access/transam/xlog.c    | 15 +++++++++++++++
 src/backend/replication/slot.c       |  8 +++++++-
 src/backend/storage/ipc/procsignal.c |  3 +++
 src/backend/storage/ipc/standby.c    | 20 +++++++++++++++++++-
 src/backend/tcop/postgres.c          |  9 +++++++++
 12 files changed, 65 insertions(+), 2 deletions(-)

diff --git a/src/include/storage/procsignal.h b/src/include/storage/procsignal.h
index 905af2231ba..2f52100b009 100644
--- a/src/include/storage/procsignal.h
+++ b/src/include/storage/procsignal.h
@@ -42,6 +42,7 @@ typedef enum
 	PROCSIG_RECOVERY_CONFLICT_TABLESPACE,
 	PROCSIG_RECOVERY_CONFLICT_LOCK,
 	PROCSIG_RECOVERY_CONFLICT_SNAPSHOT,
+	PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT,
 	PROCSIG_RECOVERY_CONFLICT_BUFFERPIN,
 	PROCSIG_RECOVERY_CONFLICT_STARTUP_DEADLOCK,
 
diff --git a/src/include/storage/standby.h b/src/include/storage/standby.h
index 2effdea126f..41f4dc372e6 100644
--- a/src/include/storage/standby.h
+++ b/src/include/storage/standby.h
@@ -30,8 +30,10 @@ extern void InitRecoveryTransactionEnvironment(void);
 extern void ShutdownRecoveryTransactionEnvironment(void);
 
 extern void ResolveRecoveryConflictWithSnapshot(TransactionId snapshotConflictHorizon,
+												bool isCatalogRel,
 												RelFileLocator locator);
 extern void ResolveRecoveryConflictWithSnapshotFullXid(FullTransactionId snapshotConflictHorizon,
+													   bool isCatalogRel,
 													   RelFileLocator locator);
 extern void ResolveRecoveryConflictWithTablespace(Oid tsid);
 extern void ResolveRecoveryConflictWithDatabase(Oid dbid);
diff --git a/src/backend/access/gist/gistxlog.c b/src/backend/access/gist/gistxlog.c
index b7678f3c144..9a86fb3feff 100644
--- a/src/backend/access/gist/gistxlog.c
+++ b/src/backend/access/gist/gistxlog.c
@@ -197,6 +197,7 @@ gistRedoDeleteRecord(XLogReaderState *record)
 		XLogRecGetBlockTag(record, 0, &rlocator, NULL, NULL);
 
 		ResolveRecoveryConflictWithSnapshot(xldata->snapshotConflictHorizon,
+											xldata->isCatalogRel,
 											rlocator);
 	}
 
@@ -390,6 +391,7 @@ gistRedoPageReuse(XLogReaderState *record)
 	 */
 	if (InHotStandby)
 		ResolveRecoveryConflictWithSnapshotFullXid(xlrec->snapshotConflictHorizon,
+												   xlrec->isCatalogRel,
 												   xlrec->locator);
 }
 
diff --git a/src/backend/access/hash/hash_xlog.c b/src/backend/access/hash/hash_xlog.c
index f2dd9be8d3f..e8e06c62a95 100644
--- a/src/backend/access/hash/hash_xlog.c
+++ b/src/backend/access/hash/hash_xlog.c
@@ -1003,6 +1003,7 @@ hash_xlog_vacuum_one_page(XLogReaderState *record)
 
 		XLogRecGetBlockTag(record, 0, &rlocator, NULL, NULL);
 		ResolveRecoveryConflictWithSnapshot(xldata->snapshotConflictHorizon,
+											xldata->isCatalogRel,
 											rlocator);
 	}
 
diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index 8b13e3f8925..f389ceee1ea 100644
--- a/src/backend/access/heap/heapam.c
+++ b/src/backend/access/heap/heapam.c
@@ -8769,6 +8769,7 @@ heap_xlog_prune(XLogReaderState *record)
 	 */
 	if (InHotStandby)
 		ResolveRecoveryConflictWithSnapshot(xlrec->snapshotConflictHorizon,
+											xlrec->isCatalogRel,
 											rlocator);
 
 	/*
@@ -8940,6 +8941,7 @@ heap_xlog_visible(XLogReaderState *record)
 	 */
 	if (InHotStandby)
 		ResolveRecoveryConflictWithSnapshot(xlrec->snapshotConflictHorizon,
+											xlrec->flags & VISIBILITYMAP_XLOG_CATALOG_REL,
 											rlocator);
 
 	/*
@@ -9061,6 +9063,7 @@ heap_xlog_freeze_page(XLogReaderState *record)
 
 		XLogRecGetBlockTag(record, 0, &rlocator, NULL, NULL);
 		ResolveRecoveryConflictWithSnapshot(xlrec->snapshotConflictHorizon,
+											xlrec->isCatalogRel,
 											rlocator);
 	}
 
diff --git a/src/backend/access/nbtree/nbtxlog.c b/src/backend/access/nbtree/nbtxlog.c
index 414ca4f6deb..c87e46ed66e 100644
--- a/src/backend/access/nbtree/nbtxlog.c
+++ b/src/backend/access/nbtree/nbtxlog.c
@@ -669,6 +669,7 @@ btree_xlog_delete(XLogReaderState *record)
 		XLogRecGetBlockTag(record, 0, &rlocator, NULL, NULL);
 
 		ResolveRecoveryConflictWithSnapshot(xlrec->snapshotConflictHorizon,
+											xlrec->isCatalogRel,
 											rlocator);
 	}
 
@@ -1007,6 +1008,7 @@ btree_xlog_reuse_page(XLogReaderState *record)
 
 	if (InHotStandby)
 		ResolveRecoveryConflictWithSnapshotFullXid(xlrec->snapshotConflictHorizon,
+												   xlrec->isCatalogRel,
 												   xlrec->locator);
 }
 
diff --git a/src/backend/access/spgist/spgxlog.c b/src/backend/access/spgist/spgxlog.c
index b071b59c8ac..459ac929ba5 100644
--- a/src/backend/access/spgist/spgxlog.c
+++ b/src/backend/access/spgist/spgxlog.c
@@ -879,6 +879,7 @@ spgRedoVacuumRedirect(XLogReaderState *record)
 
 		XLogRecGetBlockTag(record, 0, &locator, NULL, NULL);
 		ResolveRecoveryConflictWithSnapshot(xldata->snapshotConflictHorizon,
+											xldata->isCatalogRel,
 											locator);
 	}
 
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 1485e8f9ca9..5227fc675c8 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -7965,6 +7965,21 @@ xlog_redo(XLogReaderState *record)
 		/* Update our copy of the parameters in pg_control */
 		memcpy(&xlrec, XLogRecGetData(record), sizeof(xl_parameter_change));
 
+		/*
+		 * Invalidate logical slots if we are in hot standby and the primary
+		 * does not have a WAL level sufficient for logical decoding. No need
+		 * to search for potentially conflicting logically slots if standby is
+		 * running with wal_level lower than logical, because in that case, we
+		 * would have either disallowed creation of logical slots or
+		 * invalidated existing ones.
+		 */
+		if (InRecovery && InHotStandby &&
+			xlrec.wal_level < WAL_LEVEL_LOGICAL &&
+			wal_level >= WAL_LEVEL_LOGICAL)
+			InvalidateObsoleteReplicationSlots(RS_INVAL_WAL_LEVEL,
+											   0, InvalidOid,
+											   InvalidTransactionId);
+
 		LWLockAcquire(ControlFileLock, LW_EXCLUSIVE);
 		ControlFile->MaxConnections = xlrec.MaxConnections;
 		ControlFile->max_worker_processes = xlrec.max_worker_processes;
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index c2a9accebf6..1b1b51e21ed 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -1443,7 +1443,13 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCause cause,
 									   slotname, restart_lsn,
 									   oldestLSN, snapshotConflictHorizon);
 
-				(void) kill(active_pid, SIGTERM);
+				if (MyBackendType == B_STARTUP)
+					(void) SendProcSignal(active_pid,
+										  PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT,
+										  InvalidBackendId);
+				else
+					(void) kill(active_pid, SIGTERM);
+
 				last_signaled_pid = active_pid;
 			}
 
diff --git a/src/backend/storage/ipc/procsignal.c b/src/backend/storage/ipc/procsignal.c
index 395b2cf6909..c85cb5cc18d 100644
--- a/src/backend/storage/ipc/procsignal.c
+++ b/src/backend/storage/ipc/procsignal.c
@@ -673,6 +673,9 @@ procsignal_sigusr1_handler(SIGNAL_ARGS)
 	if (CheckProcSignal(PROCSIG_RECOVERY_CONFLICT_SNAPSHOT))
 		RecoveryConflictInterrupt(PROCSIG_RECOVERY_CONFLICT_SNAPSHOT);
 
+	if (CheckProcSignal(PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT))
+		RecoveryConflictInterrupt(PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT);
+
 	if (CheckProcSignal(PROCSIG_RECOVERY_CONFLICT_STARTUP_DEADLOCK))
 		RecoveryConflictInterrupt(PROCSIG_RECOVERY_CONFLICT_STARTUP_DEADLOCK);
 
diff --git a/src/backend/storage/ipc/standby.c b/src/backend/storage/ipc/standby.c
index 9f56b4e95cf..3b5d654347e 100644
--- a/src/backend/storage/ipc/standby.c
+++ b/src/backend/storage/ipc/standby.c
@@ -24,6 +24,7 @@
 #include "access/xlogutils.h"
 #include "miscadmin.h"
 #include "pgstat.h"
+#include "replication/slot.h"
 #include "storage/bufmgr.h"
 #include "storage/lmgr.h"
 #include "storage/proc.h"
@@ -466,6 +467,7 @@ ResolveRecoveryConflictWithVirtualXIDs(VirtualTransactionId *waitlist,
  */
 void
 ResolveRecoveryConflictWithSnapshot(TransactionId snapshotConflictHorizon,
+									bool isCatalogRel,
 									RelFileLocator locator)
 {
 	VirtualTransactionId *backends;
@@ -491,6 +493,16 @@ ResolveRecoveryConflictWithSnapshot(TransactionId snapshotConflictHorizon,
 										   PROCSIG_RECOVERY_CONFLICT_SNAPSHOT,
 										   WAIT_EVENT_RECOVERY_CONFLICT_SNAPSHOT,
 										   true);
+
+	/*
+	 * Note that WaitExceedsMaxStandbyDelay() is not taken into account here
+	 * (as opposed to ResolveRecoveryConflictWithVirtualXIDs() above). That
+	 * seems OK, given that this kind of conflict should not normally be
+	 * reached, e.g. by using a physical replication slot.
+	 */
+	if (wal_level >= WAL_LEVEL_LOGICAL && isCatalogRel)
+		InvalidateObsoleteReplicationSlots(RS_INVAL_HORIZON, 0, locator.dbOid,
+										   snapshotConflictHorizon);
 }
 
 /*
@@ -499,6 +511,7 @@ ResolveRecoveryConflictWithSnapshot(TransactionId snapshotConflictHorizon,
  */
 void
 ResolveRecoveryConflictWithSnapshotFullXid(FullTransactionId snapshotConflictHorizon,
+										   bool isCatalogRel,
 										   RelFileLocator locator)
 {
 	/*
@@ -517,7 +530,9 @@ ResolveRecoveryConflictWithSnapshotFullXid(FullTransactionId snapshotConflictHor
 		TransactionId truncated;
 
 		truncated = XidFromFullTransactionId(snapshotConflictHorizon);
-		ResolveRecoveryConflictWithSnapshot(truncated, locator);
+		ResolveRecoveryConflictWithSnapshot(truncated,
+											isCatalogRel,
+											locator);
 	}
 }
 
@@ -1478,6 +1493,9 @@ get_recovery_conflict_desc(ProcSignalReason reason)
 		case PROCSIG_RECOVERY_CONFLICT_SNAPSHOT:
 			reasonDesc = _("recovery conflict on snapshot");
 			break;
+		case PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT:
+			reasonDesc = _("recovery conflict on replication slot");
+			break;
 		case PROCSIG_RECOVERY_CONFLICT_STARTUP_DEADLOCK:
 			reasonDesc = _("recovery conflict on buffer deadlock");
 			break;
diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c
index a10ecbaf50b..25e0de4e0ff 100644
--- a/src/backend/tcop/postgres.c
+++ b/src/backend/tcop/postgres.c
@@ -2526,6 +2526,9 @@ errdetail_recovery_conflict(void)
 		case PROCSIG_RECOVERY_CONFLICT_SNAPSHOT:
 			errdetail("User query might have needed to see row versions that must be removed.");
 			break;
+		case PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT:
+			errdetail("User was using the logical slot that must be dropped.");
+			break;
 		case PROCSIG_RECOVERY_CONFLICT_STARTUP_DEADLOCK:
 			errdetail("User transaction caused buffer deadlock with recovery.");
 			break;
@@ -3143,6 +3146,12 @@ RecoveryConflictInterrupt(ProcSignalReason reason)
 				InterruptPending = true;
 				break;
 
+			case PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT:
+				RecoveryConflictPending = true;
+				QueryCancelPending = true;
+				InterruptPending = true;
+				break;
+
 			default:
 				elog(FATAL, "unrecognized conflict mode: %d",
 					 (int) reason);
-- 
2.38.0

>From c2e78e0ae42aae550a07d45da07461b4c8a26684 Mon Sep 17 00:00:00 2001
From: Bertrand Drouvot <bertranddrouvot...@gmail.com>
Date: Mon, 3 Apr 2023 11:28:30 +0000
Subject: [PATCH va67 5/9] Arrange for a new pg_stat_database_conflicts and
 pg_replication_slots field

As we handled logical slot conflicts on standby on the previous commit, we
can expose the conflict in pg_stat_database_conflicts and pg_replication_slots.

Adding:

- confl_active_logicalslot in pg_stat_database_conflicts
- conflicting in pg_replication_slots

to do so.
---
 src/include/catalog/pg_proc.dat              | 11 ++++++++---
 src/include/pgstat.h                         |  1 +
 src/backend/catalog/system_views.sql         |  6 ++++--
 src/backend/replication/slotfuncs.c          | 12 +++++++++++-
 src/backend/utils/activity/pgstat_database.c |  4 ++++
 src/backend/utils/adt/pgstatfuncs.c          |  3 +++
 doc/src/sgml/monitoring.sgml                 | 11 +++++++++++
 doc/src/sgml/system-views.sgml               | 10 ++++++++++
 src/test/regress/expected/rules.out          |  8 +++++---
 9 files changed, 57 insertions(+), 9 deletions(-)

diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index f64bc68276a..016354d75c5 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -5611,6 +5611,11 @@
   proname => 'pg_stat_get_db_conflict_snapshot', provolatile => 's',
   proparallel => 'r', prorettype => 'int8', proargtypes => 'oid',
   prosrc => 'pg_stat_get_db_conflict_snapshot' },
+{ oid => '9901',
+  descr => 'statistics: recovery conflicts in database caused by logical replication slot',
+  proname => 'pg_stat_get_db_conflict_logicalslot', provolatile => 's',
+  proparallel => 'r', prorettype => 'int8', proargtypes => 'oid',
+  prosrc => 'pg_stat_get_db_conflict_logicalslot' },
 { oid => '3068',
   descr => 'statistics: recovery conflicts in database caused by shared buffer pin',
   proname => 'pg_stat_get_db_conflict_bufferpin', provolatile => 's',
@@ -11077,9 +11082,9 @@
   proname => 'pg_get_replication_slots', prorows => '10', proisstrict => 'f',
   proretset => 't', provolatile => 's', prorettype => 'record',
   proargtypes => '',
-  proallargtypes => '{name,name,text,oid,bool,bool,int4,xid,xid,pg_lsn,pg_lsn,text,int8,bool}',
-  proargmodes => '{o,o,o,o,o,o,o,o,o,o,o,o,o,o}',
-  proargnames => '{slot_name,plugin,slot_type,datoid,temporary,active,active_pid,xmin,catalog_xmin,restart_lsn,confirmed_flush_lsn,wal_status,safe_wal_size,two_phase}',
+  proallargtypes => '{name,name,text,oid,bool,bool,int4,xid,xid,pg_lsn,pg_lsn,text,int8,bool,bool}',
+  proargmodes => '{o,o,o,o,o,o,o,o,o,o,o,o,o,o,o}',
+  proargnames => '{slot_name,plugin,slot_type,datoid,temporary,active,active_pid,xmin,catalog_xmin,restart_lsn,confirmed_flush_lsn,wal_status,safe_wal_size,two_phase,conflicting}',
   prosrc => 'pg_get_replication_slots' },
 { oid => '3786', descr => 'set up a logical replication slot',
   proname => 'pg_create_logical_replication_slot', provolatile => 'v',
diff --git a/src/include/pgstat.h b/src/include/pgstat.h
index e79b8a34ebc..5e8b04d21b1 100644
--- a/src/include/pgstat.h
+++ b/src/include/pgstat.h
@@ -331,6 +331,7 @@ typedef struct PgStat_StatDBEntry
 	PgStat_Counter conflict_tablespace;
 	PgStat_Counter conflict_lock;
 	PgStat_Counter conflict_snapshot;
+	PgStat_Counter conflict_logicalslot;
 	PgStat_Counter conflict_bufferpin;
 	PgStat_Counter conflict_startup_deadlock;
 	PgStat_Counter temp_files;
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index 6b098234f8c..c25067d06de 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -999,7 +999,8 @@ CREATE VIEW pg_replication_slots AS
             L.confirmed_flush_lsn,
             L.wal_status,
             L.safe_wal_size,
-            L.two_phase
+            L.two_phase,
+            L.conflicting
     FROM pg_get_replication_slots() AS L
             LEFT JOIN pg_database D ON (L.datoid = D.oid);
 
@@ -1067,7 +1068,8 @@ CREATE VIEW pg_stat_database_conflicts AS
             pg_stat_get_db_conflict_lock(D.oid) AS confl_lock,
             pg_stat_get_db_conflict_snapshot(D.oid) AS confl_snapshot,
             pg_stat_get_db_conflict_bufferpin(D.oid) AS confl_bufferpin,
-            pg_stat_get_db_conflict_startup_deadlock(D.oid) AS confl_deadlock
+            pg_stat_get_db_conflict_startup_deadlock(D.oid) AS confl_deadlock,
+            pg_stat_get_db_conflict_logicalslot(D.oid) AS confl_active_logicalslot
     FROM pg_database D;
 
 CREATE VIEW pg_stat_user_functions AS
diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c
index ad3e72be5ee..6035cf48160 100644
--- a/src/backend/replication/slotfuncs.c
+++ b/src/backend/replication/slotfuncs.c
@@ -232,7 +232,7 @@ pg_drop_replication_slot(PG_FUNCTION_ARGS)
 Datum
 pg_get_replication_slots(PG_FUNCTION_ARGS)
 {
-#define PG_GET_REPLICATION_SLOTS_COLS 14
+#define PG_GET_REPLICATION_SLOTS_COLS 15
 	ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
 	XLogRecPtr	currlsn;
 	int			slotno;
@@ -402,6 +402,16 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
 
 		values[i++] = BoolGetDatum(slot_contents.data.two_phase);
 
+		if (slot_contents.data.database == InvalidOid)
+			nulls[i++] = true;
+		else
+		{
+			if (slot_contents.data.invalidated != RS_INVAL_NONE)
+				values[i++] = BoolGetDatum(true);
+			else
+				values[i++] = BoolGetDatum(false);
+		}
+
 		Assert(i == PG_GET_REPLICATION_SLOTS_COLS);
 
 		tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc,
diff --git a/src/backend/utils/activity/pgstat_database.c b/src/backend/utils/activity/pgstat_database.c
index 6e650ceaade..7149f22f729 100644
--- a/src/backend/utils/activity/pgstat_database.c
+++ b/src/backend/utils/activity/pgstat_database.c
@@ -109,6 +109,9 @@ pgstat_report_recovery_conflict(int reason)
 		case PROCSIG_RECOVERY_CONFLICT_BUFFERPIN:
 			dbentry->conflict_bufferpin++;
 			break;
+		case PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT:
+			dbentry->conflict_logicalslot++;
+			break;
 		case PROCSIG_RECOVERY_CONFLICT_STARTUP_DEADLOCK:
 			dbentry->conflict_startup_deadlock++;
 			break;
@@ -387,6 +390,7 @@ pgstat_database_flush_cb(PgStat_EntryRef *entry_ref, bool nowait)
 	PGSTAT_ACCUM_DBCOUNT(conflict_tablespace);
 	PGSTAT_ACCUM_DBCOUNT(conflict_lock);
 	PGSTAT_ACCUM_DBCOUNT(conflict_snapshot);
+	PGSTAT_ACCUM_DBCOUNT(conflict_logicalslot);
 	PGSTAT_ACCUM_DBCOUNT(conflict_bufferpin);
 	PGSTAT_ACCUM_DBCOUNT(conflict_startup_deadlock);
 
diff --git a/src/backend/utils/adt/pgstatfuncs.c b/src/backend/utils/adt/pgstatfuncs.c
index eec9f3cf9b1..4de60d8aa14 100644
--- a/src/backend/utils/adt/pgstatfuncs.c
+++ b/src/backend/utils/adt/pgstatfuncs.c
@@ -1066,6 +1066,8 @@ PG_STAT_GET_DBENTRY_INT64(xact_commit)
 /* pg_stat_get_db_xact_rollback */
 PG_STAT_GET_DBENTRY_INT64(xact_rollback)
 
+/* pg_stat_get_db_conflict_logicalslot */
+PG_STAT_GET_DBENTRY_INT64(conflict_logicalslot)
 
 Datum
 pg_stat_get_db_stat_reset_time(PG_FUNCTION_ARGS)
@@ -1099,6 +1101,7 @@ pg_stat_get_db_conflict_all(PG_FUNCTION_ARGS)
 		result = (int64) (dbentry->conflict_tablespace +
 						  dbentry->conflict_lock +
 						  dbentry->conflict_snapshot +
+						  dbentry->conflict_logicalslot +
 						  dbentry->conflict_bufferpin +
 						  dbentry->conflict_startup_deadlock);
 
diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml
index bce9ae46615..fa3b0f810cd 100644
--- a/doc/src/sgml/monitoring.sgml
+++ b/doc/src/sgml/monitoring.sgml
@@ -4674,6 +4674,17 @@ SELECT pid, wait_event_type, wait_event FROM pg_stat_activity WHERE wait_event i
        deadlocks
       </para></entry>
      </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>confl_active_logicalslot</structfield> <type>bigint</type>
+      </para>
+      <para>
+       Number of active logical slots in this database that have been
+       invalidated because they conflict with recovery (note that inactive ones
+       are also invalidated but do not increment this counter)
+      </para></entry>
+     </row>
     </tbody>
    </tgroup>
   </table>
diff --git a/doc/src/sgml/system-views.sgml b/doc/src/sgml/system-views.sgml
index bb1a4184508..57b228076e8 100644
--- a/doc/src/sgml/system-views.sgml
+++ b/doc/src/sgml/system-views.sgml
@@ -2517,6 +2517,16 @@ SELECT * FROM pg_locks pl LEFT JOIN pg_prepared_xacts ppx
        false for physical slots.
       </para></entry>
      </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>conflicting</structfield> <type>bool</type>
+      </para>
+      <para>
+       True if this logical slot conflicted with recovery (and so is now
+       invalidated). Always NULL for physical slots.
+      </para></entry>
+     </row>
     </tbody>
    </tgroup>
   </table>
diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out
index ab1aebfde42..06d3f1f5d34 100644
--- a/src/test/regress/expected/rules.out
+++ b/src/test/regress/expected/rules.out
@@ -1472,8 +1472,9 @@ pg_replication_slots| SELECT l.slot_name,
     l.confirmed_flush_lsn,
     l.wal_status,
     l.safe_wal_size,
-    l.two_phase
-   FROM (pg_get_replication_slots() l(slot_name, plugin, slot_type, datoid, temporary, active, active_pid, xmin, catalog_xmin, restart_lsn, confirmed_flush_lsn, wal_status, safe_wal_size, two_phase)
+    l.two_phase,
+    l.conflicting
+   FROM (pg_get_replication_slots() l(slot_name, plugin, slot_type, datoid, temporary, active, active_pid, xmin, catalog_xmin, restart_lsn, confirmed_flush_lsn, wal_status, safe_wal_size, two_phase, conflicting)
      LEFT JOIN pg_database d ON ((l.datoid = d.oid)));
 pg_roles| SELECT pg_authid.rolname,
     pg_authid.rolsuper,
@@ -1869,7 +1870,8 @@ pg_stat_database_conflicts| SELECT oid AS datid,
     pg_stat_get_db_conflict_lock(oid) AS confl_lock,
     pg_stat_get_db_conflict_snapshot(oid) AS confl_snapshot,
     pg_stat_get_db_conflict_bufferpin(oid) AS confl_bufferpin,
-    pg_stat_get_db_conflict_startup_deadlock(oid) AS confl_deadlock
+    pg_stat_get_db_conflict_startup_deadlock(oid) AS confl_deadlock,
+    pg_stat_get_db_conflict_logicalslot(oid) AS confl_active_logicalslot
    FROM pg_database d;
 pg_stat_gssapi| SELECT pid,
     gss_auth AS gss_authenticated,
-- 
2.38.0

>From ae5a0271e17471f16566757bf298fec8c8a63432 Mon Sep 17 00:00:00 2001
From: Bertrand Drouvot <bertranddrouvot...@gmail.com>
Date: Mon, 3 Apr 2023 16:46:09 +0000
Subject: [PATCH va67 6/9] For cascading replication, wake physical and logical
 walsenders separately

Physical walsenders can't send data until it's been flushed; logical
walsenders can't decode and send data until it's been applied. On the
standby, the WAL is flushed first, which will only wake up physical
walsenders; and then applied, which will only wake up logical
walsenders.

Previously, all walsenders were awakened when the WAL was flushed. That
was fine for logical walsenders on the primary; but on the standby the
flushed WAL would have been not applied yet, so logical walsenders were
awakened too early.

Per idea from Jeff Davis and Amit Kapila.

Author: "Drouvot, Bertrand" <bertranddrouvot...@gmail.com>
Reviewed-By: Jeff Davis <pg...@j-davis.com>
Reviewed-By: Robert Haas <robertmh...@gmail.com>
---
 src/include/replication/walsender.h         | 22 +++++------
 src/include/replication/walsender_private.h |  3 ++
 src/backend/access/transam/xlog.c           |  6 +--
 src/backend/access/transam/xlogarchive.c    |  2 +-
 src/backend/access/transam/xlogrecovery.c   | 30 +++++++++++---
 src/backend/replication/walreceiver.c       |  2 +-
 src/backend/replication/walsender.c         | 43 +++++++++++++++++----
 7 files changed, 79 insertions(+), 29 deletions(-)

diff --git a/src/include/replication/walsender.h b/src/include/replication/walsender.h
index 52bb3e2aae3..9df7e50f943 100644
--- a/src/include/replication/walsender.h
+++ b/src/include/replication/walsender.h
@@ -42,7 +42,7 @@ extern void WalSndResourceCleanup(bool isCommit);
 extern void WalSndSignals(void);
 extern Size WalSndShmemSize(void);
 extern void WalSndShmemInit(void);
-extern void WalSndWakeup(void);
+extern void WalSndWakeup(bool physical, bool logical);
 extern void WalSndInitStopping(void);
 extern void WalSndWaitStopping(void);
 extern void HandleWalSndInitStopping(void);
@@ -60,15 +60,15 @@ extern void WalSndRqstFileReload(void);
 /*
  * wakeup walsenders if there is work to be done
  */
-#define WalSndWakeupProcessRequests()		\
-	do										\
-	{										\
-		if (wake_wal_senders)				\
-		{									\
-			wake_wal_senders = false;		\
-			if (max_wal_senders > 0)		\
-				WalSndWakeup();				\
-		}									\
-	} while (0)
+static inline void
+WalSndWakeupProcessRequests(bool physical, bool logical)
+{
+	if (wake_wal_senders)
+	{
+		wake_wal_senders = false;
+		if (max_wal_senders > 0)
+			WalSndWakeup(physical, logical);
+	}
+}
 
 #endif							/* _WALSENDER_H */
diff --git a/src/include/replication/walsender_private.h b/src/include/replication/walsender_private.h
index 5310e054c48..ff25aa70a89 100644
--- a/src/include/replication/walsender_private.h
+++ b/src/include/replication/walsender_private.h
@@ -15,6 +15,7 @@
 #include "access/xlog.h"
 #include "lib/ilist.h"
 #include "nodes/nodes.h"
+#include "nodes/replnodes.h"
 #include "replication/syncrep.h"
 #include "storage/latch.h"
 #include "storage/shmem.h"
@@ -79,6 +80,8 @@ typedef struct WalSnd
 	 * Timestamp of the last message received from standby.
 	 */
 	TimestampTz replyTime;
+
+	ReplicationKind kind;
 } WalSnd;
 
 extern PGDLLIMPORT WalSnd *MyWalSnd;
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 5227fc675c8..83ab70879bf 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -2645,7 +2645,7 @@ XLogFlush(XLogRecPtr record)
 	END_CRIT_SECTION();
 
 	/* wake up walsenders now that we've released heavily contended locks */
-	WalSndWakeupProcessRequests();
+	WalSndWakeupProcessRequests(true, !RecoveryInProgress());
 
 	/*
 	 * If we still haven't flushed to the request point then we have a
@@ -2816,7 +2816,7 @@ XLogBackgroundFlush(void)
 	END_CRIT_SECTION();
 
 	/* wake up walsenders now that we've released heavily contended locks */
-	WalSndWakeupProcessRequests();
+	WalSndWakeupProcessRequests(true, !RecoveryInProgress());
 
 	/*
 	 * Great, done. To take some work off the critical path, try to initialize
@@ -5762,7 +5762,7 @@ StartupXLOG(void)
 	 * If there were cascading standby servers connected to us, nudge any wal
 	 * sender processes to notice that we've been promoted.
 	 */
-	WalSndWakeup();
+	WalSndWakeup(true, true);
 
 	/*
 	 * If this was a promotion, request an (online) checkpoint now. This isn't
diff --git a/src/backend/access/transam/xlogarchive.c b/src/backend/access/transam/xlogarchive.c
index a0f5aa24b58..f3fb92c8f96 100644
--- a/src/backend/access/transam/xlogarchive.c
+++ b/src/backend/access/transam/xlogarchive.c
@@ -421,7 +421,7 @@ KeepFileRestoredFromArchive(const char *path, const char *xlogfname)
 	 * if we restored something other than a WAL segment, but it does no harm
 	 * either.
 	 */
-	WalSndWakeup();
+	WalSndWakeup(true, false);
 }
 
 /*
diff --git a/src/backend/access/transam/xlogrecovery.c b/src/backend/access/transam/xlogrecovery.c
index dbe93947627..e6427c54c57 100644
--- a/src/backend/access/transam/xlogrecovery.c
+++ b/src/backend/access/transam/xlogrecovery.c
@@ -1935,6 +1935,30 @@ ApplyWalRecord(XLogReaderState *xlogreader, XLogRecord *record, TimeLineID *repl
 	XLogRecoveryCtl->lastReplayedTLI = *replayTLI;
 	SpinLockRelease(&XLogRecoveryCtl->info_lck);
 
+	/*
+	 * Wakeup walsenders:
+	 *
+	 * On the standby, the WAL is flushed first (which will only wake up
+	 * physical walsenders) and then applied, which will only wake up logical
+	 * walsenders.
+	 *
+	 * Indeed, logical walsenders on standby can't decode and send data until
+	 * it's been applied.
+	 *
+	 * Physical walsenders don't need to be woken up during replay unless
+	 * cascading replication is allowed and time line change occured (so that
+	 * they can notice that they are on a new time line).
+	 *
+	 * That's why the wake up conditions are for:
+	 *
+	 *  - physical walsenders in case of new time line and cascade
+	 *  replication is allowed.
+	 *  - logical walsenders in case cascade replication is allowed (could not
+	 *  be created otherwise).
+	 */
+	if (AllowCascadeReplication())
+		WalSndWakeup(switchedTLI, true);
+
 	/*
 	 * If rm_redo called XLogRequestWalReceiverReply, then we wake up the
 	 * receiver so that it notices the updated lastReplayedEndRecPtr and sends
@@ -1958,12 +1982,6 @@ ApplyWalRecord(XLogReaderState *xlogreader, XLogRecord *record, TimeLineID *repl
 		 */
 		RemoveNonParentXlogFiles(xlogreader->EndRecPtr, *replayTLI);
 
-		/*
-		 * Wake up any walsenders to notice that we are on a new timeline.
-		 */
-		if (AllowCascadeReplication())
-			WalSndWakeup();
-
 		/* Reset the prefetcher. */
 		XLogPrefetchReconfigure();
 	}
diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c
index 685af51d5d3..feff7094351 100644
--- a/src/backend/replication/walreceiver.c
+++ b/src/backend/replication/walreceiver.c
@@ -1010,7 +1010,7 @@ XLogWalRcvFlush(bool dying, TimeLineID tli)
 		/* Signal the startup process and walsender that new WAL has arrived */
 		WakeupRecovery();
 		if (AllowCascadeReplication())
-			WalSndWakeup();
+			WalSndWakeup(true, false);
 
 		/* Report XLOG streaming progress in PS display */
 		if (update_process_title)
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index e40a9b1ba7b..66493b6e896 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -2603,6 +2603,23 @@ InitWalSenderSlot(void)
 			walsnd->sync_standby_priority = 0;
 			walsnd->latch = &MyProc->procLatch;
 			walsnd->replyTime = 0;
+
+			/*
+			 * The kind assignment is done here and not in StartReplication()
+			 * and StartLogicalReplication(). Indeed, the logical walsender
+			 * needs to read WAL records (like snapshot of running
+			 * transactions) during the slot creation. So it needs to be woken
+			 * up based on its kind.
+			 *
+			 * The kind assignment could also be done in StartReplication(),
+			 * StartLogicalReplication() and CREATE_REPLICATION_SLOT but it
+			 * seems better to set it on one place.
+			 */
+			if (MyDatabaseId == InvalidOid)
+				walsnd->kind = REPLICATION_KIND_PHYSICAL;
+			else
+				walsnd->kind = REPLICATION_KIND_LOGICAL;
+
 			SpinLockRelease(&walsnd->mutex);
 			/* don't need the lock anymore */
 			MyWalSnd = (WalSnd *) walsnd;
@@ -3280,30 +3297,42 @@ WalSndShmemInit(void)
 }
 
 /*
- * Wake up all walsenders
+ * Wake up physical, logical or both walsenders kind
+ *
+ * The distinction between physical and logical walsenders is done, because:
+ * - physical walsenders can't send data until it's been flushed
+ * - logical walsenders on standby can't decode and send data until it's been
+ * applied
+ *
+ * For cascading replication we need to wake up physical
+ * walsenders separately from logical walsenders (see the comment before calling
+ * WalSndWakeup() in ApplyWalRecord() for more details).
  *
  * This will be called inside critical sections, so throwing an error is not
  * advisable.
  */
 void
-WalSndWakeup(void)
+WalSndWakeup(bool physical, bool logical)
 {
 	int			i;
 
 	for (i = 0; i < max_wal_senders; i++)
 	{
 		Latch	   *latch;
+		ReplicationKind kind;
 		WalSnd	   *walsnd = &WalSndCtl->walsnds[i];
 
-		/*
-		 * Get latch pointer with spinlock held, for the unlikely case that
-		 * pointer reads aren't atomic (as they're 8 bytes).
-		 */
+		/* get latch pointer and kind with spinlock helds */
 		SpinLockAcquire(&walsnd->mutex);
 		latch = walsnd->latch;
+		kind = walsnd->kind;
 		SpinLockRelease(&walsnd->mutex);
 
-		if (latch != NULL)
+		if (latch == NULL)
+			continue;
+
+		if ((physical && kind == REPLICATION_KIND_PHYSICAL) ||
+			(logical && kind == REPLICATION_KIND_LOGICAL))
 			SetLatch(latch);
 	}
 }
-- 
2.38.0

>From 572acd14c15704ddafe26fc09b7288151e5115c7 Mon Sep 17 00:00:00 2001
From: Andres Freund <and...@anarazel.de>
Date: Thu, 6 Apr 2023 23:34:22 -0700
Subject: [PATCH va67 7/9] Allow logical decoding on standby

Allow a logical slot to be created on standby. Restrict its usage or its
creation if wal_level on primary is less than logical.  During slot creation,
it's restart_lsn is set to the last replayed LSN. Effectively, a logical slot
creation on standby waits for an xl_running_xact record to arrive from
primary.

This commit also introduces the pg_log_standby_snapshot() function. The idea
is to be able to take a snapshot of running transactions and write this to WAL
without requesting for a (costly) checkpoint. This allows to make it much
faster to create logical slots on a replica.

Bumps catversion, for the addition of the pg_log_standby_snapshot() function.

Author: Andres Freund (in an older version), Amit Khandekar, Bertrand Drouvot
Reviewed-By: Bertrand Drouvot, Andres Freund, Robert Haas, Fabrizio de Royes Mello
---
 src/include/access/xlog.h                 |  1 +
 src/include/catalog/pg_proc.dat           |  3 ++
 src/backend/access/transam/xlog.c         | 11 +++++
 src/backend/access/transam/xlogfuncs.c    | 32 ++++++++++++
 src/backend/catalog/system_functions.sql  |  2 +
 src/backend/replication/logical/decode.c  | 30 +++++++++++-
 src/backend/replication/logical/logical.c | 36 ++++++++------
 src/backend/replication/slot.c            | 59 ++++++++++++-----------
 src/backend/replication/walsender.c       | 48 ++++++++++++------
 doc/src/sgml/func.sgml                    | 15 ++++++
 10 files changed, 176 insertions(+), 61 deletions(-)

diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h
index cfe5409738c..48ca8523810 100644
--- a/src/include/access/xlog.h
+++ b/src/include/access/xlog.h
@@ -230,6 +230,7 @@ extern void XLOGShmemInit(void);
 extern void BootStrapXLOG(void);
 extern void InitializeWalConsistencyChecking(void);
 extern void LocalProcessControlFile(bool reset);
+extern WalLevel GetActiveWalLevelOnStandby(void);
 extern void StartupXLOG(void);
 extern void ShutdownXLOG(int code, Datum arg);
 extern void CreateCheckPoint(int flags);
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index 016354d75c5..55229759ac6 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -6432,6 +6432,9 @@
 { oid => '2848', descr => 'switch to new wal file',
   proname => 'pg_switch_wal', provolatile => 'v', prorettype => 'pg_lsn',
   proargtypes => '', prosrc => 'pg_switch_wal' },
+{ oid => '9658', descr => 'log details of the current snapshot to WAL',
+  proname => 'pg_log_standby_snapshot', provolatile => 'v', prorettype => 'pg_lsn',
+  proargtypes => '', prosrc => 'pg_log_standby_snapshot' },
 { oid => '3098', descr => 'create a named restore point',
   proname => 'pg_create_restore_point', provolatile => 'v',
   prorettype => 'pg_lsn', proargtypes => 'text',
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 83ab70879bf..743a4723a76 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -4466,6 +4466,17 @@ LocalProcessControlFile(bool reset)
 	ReadControlFile();
 }
 
+/*
+ * Get the wal_level from the control file. For a standby, this value should be
+ * considered as its active wal_level, because it may be different from what
+ * was originally configured on standby.
+ */
+WalLevel
+GetActiveWalLevelOnStandby(void)
+{
+	return ControlFile->wal_level;
+}
+
 /*
  * Initialization of shared memory for XLOG
  */
diff --git a/src/backend/access/transam/xlogfuncs.c b/src/backend/access/transam/xlogfuncs.c
index c07daa874f9..36a309b54cc 100644
--- a/src/backend/access/transam/xlogfuncs.c
+++ b/src/backend/access/transam/xlogfuncs.c
@@ -31,6 +31,7 @@
 #include "storage/fd.h"
 #include "storage/ipc.h"
 #include "storage/smgr.h"
+#include "storage/standby.h"
 #include "utils/builtins.h"
 #include "utils/guc.h"
 #include "utils/memutils.h"
@@ -196,6 +197,37 @@ pg_switch_wal(PG_FUNCTION_ARGS)
 	PG_RETURN_LSN(switchpoint);
 }
 
+/*
+ * pg_log_standby_snapshot: call LogStandbySnapshot()
+ *
+ * Permission checking for this function is managed through the normal
+ * GRANT system.
+ */
+Datum
+pg_log_standby_snapshot(PG_FUNCTION_ARGS)
+{
+	XLogRecPtr	recptr;
+
+	if (RecoveryInProgress())
+		ereport(ERROR,
+				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+				 errmsg("recovery is in progress"),
+				 errhint("pg_log_standby_snapshot() cannot be executed during recovery.")));
+
+	if (!XLogStandbyInfoActive())
+		ereport(ERROR,
+				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+				 errmsg("wal_level is not in desired state"),
+				 errhint("wal_level has to be >= WAL_LEVEL_REPLICA.")));
+
+	recptr = LogStandbySnapshot();
+
+	/*
+	 * As a convenience, return the WAL location of the last inserted record
+	 */
+	PG_RETURN_LSN(recptr);
+}
+
 /*
  * pg_create_restore_point: a named point for restore
  *
diff --git a/src/backend/catalog/system_functions.sql b/src/backend/catalog/system_functions.sql
index 83ca8934440..b7c65ea37d7 100644
--- a/src/backend/catalog/system_functions.sql
+++ b/src/backend/catalog/system_functions.sql
@@ -644,6 +644,8 @@ REVOKE EXECUTE ON FUNCTION pg_create_restore_point(text) FROM public;
 
 REVOKE EXECUTE ON FUNCTION pg_switch_wal() FROM public;
 
+REVOKE EXECUTE ON FUNCTION pg_log_standby_snapshot() FROM public;
+
 REVOKE EXECUTE ON FUNCTION pg_wal_replay_pause() FROM public;
 
 REVOKE EXECUTE ON FUNCTION pg_wal_replay_resume() FROM public;
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index 8fe7bb65f1f..8352dbf5df6 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -152,11 +152,39 @@ xlog_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 			 * can restart from there.
 			 */
 			break;
+		case XLOG_PARAMETER_CHANGE:
+			{
+				xl_parameter_change *xlrec =
+				(xl_parameter_change *) XLogRecGetData(buf->record);
+
+				/*
+				 * If wal_level on primary is reduced to less than logical,
+				 * then we want to prevent existing logical slots from being
+				 * used. Existing logical slots on standby get invalidated
+				 * when this WAL record is replayed; and further, slot
+				 * creation fails when the wal level is not sufficient; but
+				 * all these operations are not synchronized, so a logical
+				 * slot may creep in while the wal_level is being reduced.
+				 * Hence this extra check.
+				 */
+				if (xlrec->wal_level < WAL_LEVEL_LOGICAL)
+				{
+					/*
+					 * This can occur only on a standby, as a primary would
+					 * not allow to restart after changing wal_level < logical
+					 * if there is pre-existing logical slot.
+					 */
+					Assert(RecoveryInProgress());
+					ereport(ERROR,
+							(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+							 errmsg("logical decoding on standby requires wal_level to be at least logical on the primary server")));
+				}
+				break;
+			}
 		case XLOG_NOOP:
 		case XLOG_NEXTOID:
 		case XLOG_SWITCH:
 		case XLOG_BACKUP_END:
-		case XLOG_PARAMETER_CHANGE:
 		case XLOG_RESTORE_POINT:
 		case XLOG_FPW_CHANGE:
 		case XLOG_FPI_FOR_HINT:
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 27addd58f66..659b08cd456 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -124,23 +124,21 @@ CheckLogicalDecodingRequirements(void)
 				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
 				 errmsg("logical decoding requires a database connection")));
 
-	/* ----
-	 * TODO: We got to change that someday soon...
-	 *
-	 * There's basically three things missing to allow this:
-	 * 1) We need to be able to correctly and quickly identify the timeline a
-	 *	  LSN belongs to
-	 * 2) We need to force hot_standby_feedback to be enabled at all times so
-	 *	  the primary cannot remove rows we need.
-	 * 3) support dropping replication slots referring to a database, in
-	 *	  dbase_redo. There can't be any active ones due to HS recovery
-	 *	  conflicts, so that should be relatively easy.
-	 * ----
-	 */
 	if (RecoveryInProgress())
-		ereport(ERROR,
-				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
-				 errmsg("logical decoding cannot be used while in recovery")));
+	{
+		/*
+		 * This check may have race conditions, but whenever
+		 * XLOG_PARAMETER_CHANGE indicates that wal_level has changed, we
+		 * verify that there are no existing logical replication slots. And to
+		 * avoid races around creating a new slot,
+		 * CheckLogicalDecodingRequirements() is called once before creating
+		 * the slot, and once when logical decoding is initially starting up.
+		 */
+		if (GetActiveWalLevelOnStandby() < WAL_LEVEL_LOGICAL)
+			ereport(ERROR,
+					(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+					 errmsg("logical decoding on standby requires wal_level to be at least logical on the primary server")));
+	}
 }
 
 /*
@@ -342,6 +340,12 @@ CreateInitDecodingContext(const char *plugin,
 	LogicalDecodingContext *ctx;
 	MemoryContext old_context;
 
+	/*
+	 * On standby, this check is also required while creating the slot. Check
+	 * the comments in this function.
+	 */
+	CheckLogicalDecodingRequirements();
+
 	/* shorter lines... */
 	slot = MyReplicationSlot;
 
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index 1b1b51e21ed..513c132f16d 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -41,6 +41,7 @@
 
 #include "access/transam.h"
 #include "access/xlog_internal.h"
+#include "access/xlogrecovery.h"
 #include "common/file_utils.h"
 #include "common/string.h"
 #include "miscadmin.h"
@@ -1192,37 +1193,28 @@ ReplicationSlotReserveWal(void)
 		/*
 		 * For logical slots log a standby snapshot and start logical decoding
 		 * at exactly that position. That allows the slot to start up more
-		 * quickly.
+		 * quickly. But on a standby we cannot do WAL writes, so just use the
+		 * replay pointer; effectively, an attempt to create a logical slot on
+		 * standby will cause it to wait for an xl_running_xact record to be
+		 * logged independently on the primary, so that a snapshot can be
+		 * built using the record.
 		 *
-		 * That's not needed (or indeed helpful) for physical slots as they'll
-		 * start replay at the last logged checkpoint anyway. Instead return
-		 * the location of the last redo LSN. While that slightly increases
-		 * the chance that we have to retry, it's where a base backup has to
-		 * start replay at.
+		 * None of this is needed (or indeed helpful) for physical slots as
+		 * they'll start replay at the last logged checkpoint anyway. Instead
+		 * return the location of the last redo LSN. While that slightly
+		 * increases the chance that we have to retry, it's where a base
+		 * backup has to start replay at.
 		 */
-		if (!RecoveryInProgress() && SlotIsLogical(slot))
-		{
-			XLogRecPtr	flushptr;
-
-			/* start at current insert position */
-			restart_lsn = GetXLogInsertRecPtr();
-			SpinLockAcquire(&slot->mutex);
-			slot->data.restart_lsn = restart_lsn;
-			SpinLockRelease(&slot->mutex);
-
-			/* make sure we have enough information to start */
-			flushptr = LogStandbySnapshot();
-
-			/* and make sure it's fsynced to disk */
-			XLogFlush(flushptr);
-		}
-		else
-		{
+		if (SlotIsPhysical(slot))
 			restart_lsn = GetRedoRecPtr();
-			SpinLockAcquire(&slot->mutex);
-			slot->data.restart_lsn = restart_lsn;
-			SpinLockRelease(&slot->mutex);
-		}
+		else if (RecoveryInProgress())
+			restart_lsn = GetXLogReplayRecPtr(NULL);
+		else
+			restart_lsn = GetXLogInsertRecPtr();
+
+		SpinLockAcquire(&slot->mutex);
+		slot->data.restart_lsn = restart_lsn;
+		SpinLockRelease(&slot->mutex);
 
 		/* prevent WAL removal as fast as possible */
 		ReplicationSlotsComputeRequiredLSN();
@@ -1238,6 +1230,17 @@ ReplicationSlotReserveWal(void)
 		if (XLogGetLastRemovedSegno() < segno)
 			break;
 	}
+
+	if (!RecoveryInProgress() && SlotIsLogical(slot))
+	{
+		XLogRecPtr	flushptr;
+
+		/* make sure we have enough information to start */
+		flushptr = LogStandbySnapshot();
+
+		/* and make sure it's fsynced to disk */
+		XLogFlush(flushptr);
+	}
 }
 
 /*
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 66493b6e896..743e338b1b6 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -906,23 +906,34 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req
 	int			count;
 	WALReadError errinfo;
 	XLogSegNo	segno;
-	TimeLineID	currTLI = GetWALInsertionTimeLine();
+	TimeLineID	currTLI;
 
 	/*
-	 * Since logical decoding is only permitted on a primary server, we know
-	 * that the current timeline ID can't be changing any more. If we did this
-	 * on a standby, we'd have to worry about the values we compute here
-	 * becoming invalid due to a promotion or timeline change.
+	 * Make sure we have enough WAL available before retrieving the current
+	 * timeline. This is needed to determine am_cascading_walsender accurately
+	 * which is needed to determine the current timeline.
 	 */
+	flushptr = WalSndWaitForWal(targetPagePtr + reqLen);
+
+	/*
+	 * Since logical decoding is also permitted on a standby server, we need
+	 * to check if the server is in recovery to decide how to get the current
+	 * timeline ID (so that it also cover the promotion or timeline change
+	 * cases).
+	 */
+	am_cascading_walsender = RecoveryInProgress();
+
+	if (am_cascading_walsender)
+		GetXLogReplayRecPtr(&currTLI);
+	else
+		currTLI = GetWALInsertionTimeLine();
+
 	XLogReadDetermineTimeline(state, targetPagePtr, reqLen, currTLI);
 	sendTimeLineIsHistoric = (state->currTLI != currTLI);
 	sendTimeLine = state->currTLI;
 	sendTimeLineValidUpto = state->currTLIValidUntil;
 	sendTimeLineNextTLI = state->nextTLI;
 
-	/* make sure we have enough WAL available */
-	flushptr = WalSndWaitForWal(targetPagePtr + reqLen);
-
 	/* fail if not (implies we are going to shut down) */
 	if (flushptr < targetPagePtr + reqLen)
 		return -1;
@@ -937,9 +948,9 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req
 				 cur_page,
 				 targetPagePtr,
 				 XLOG_BLCKSZ,
-				 state->seg.ws_tli, /* Pass the current TLI because only
-									 * WalSndSegmentOpen controls whether new
-									 * TLI is needed. */
+				 currTLI,		/* Pass the current TLI because only
+								 * WalSndSegmentOpen controls whether new TLI
+								 * is needed. */
 				 &errinfo))
 		WALReadRaiseError(&errinfo);
 
@@ -3076,10 +3087,14 @@ XLogSendLogical(void)
 	 * If first time through in this session, initialize flushPtr.  Otherwise,
 	 * we only need to update flushPtr if EndRecPtr is past it.
 	 */
-	if (flushPtr == InvalidXLogRecPtr)
-		flushPtr = GetFlushRecPtr(NULL);
-	else if (logical_decoding_ctx->reader->EndRecPtr >= flushPtr)
-		flushPtr = GetFlushRecPtr(NULL);
+	if (flushPtr == InvalidXLogRecPtr ||
+		logical_decoding_ctx->reader->EndRecPtr >= flushPtr)
+	{
+		if (am_cascading_walsender)
+			flushPtr = GetStandbyFlushRecPtr(NULL);
+		else
+			flushPtr = GetFlushRecPtr(NULL);
+	}
 
 	/* If EndRecPtr is still past our flushPtr, it means we caught up. */
 	if (logical_decoding_ctx->reader->EndRecPtr >= flushPtr)
@@ -3170,7 +3185,8 @@ GetStandbyFlushRecPtr(TimeLineID *tli)
 	receivePtr = GetWalRcvFlushRecPtr(NULL, &receiveTLI);
 	replayPtr = GetXLogReplayRecPtr(&replayTLI);
 
-	*tli = replayTLI;
+	if (tli)
+		*tli = replayTLI;
 
 	result = replayPtr;
 	if (receiveTLI == replayTLI && receivePtr > replayPtr)
diff --git a/doc/src/sgml/func.sgml b/doc/src/sgml/func.sgml
index 4211d31f307..326f8bc2f47 100644
--- a/doc/src/sgml/func.sgml
+++ b/doc/src/sgml/func.sgml
@@ -27074,6 +27074,21 @@ postgres=# SELECT '0/0'::pg_lsn + pd.segment_number * ps.setting::int + :offset
         prepared with <xref linkend="sql-prepare-transaction"/>.
        </para></entry>
       </row>
+      <row>
+       <entry role="func_table_entry"><para role="func_signature">
+        <indexterm>
+         <primary>pg_log_standby_snapshot</primary>
+        </indexterm>
+        <function>pg_log_standby_snapshot</function> ()
+        <returnvalue>pg_lsn</returnvalue>
+       </para>
+       <para>
+        Take a snapshot of running transactions and write this to WAL without
+        having to wait bgwriter or checkpointer to log one. This one is useful for
+        logical decoding on standby for which logical slot creation is hanging
+        until such a record is replayed on the standby.
+       </para></entry>
+      </row>
      </tbody>
     </tgroup>
    </table>
-- 
2.38.0

>From c79a99299a02e29fb8deb1fb55250a76805f273c Mon Sep 17 00:00:00 2001
From: bdrouvotAWS <bdrou...@amazon.com>
Date: Tue, 7 Feb 2023 09:04:12 +0000
Subject: [PATCH va67 8/9] New TAP test for logical decoding on standby
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

In addition to the new TAP test, this commit introduces a new pg_log_standby_snapshot()
function.

The idea is to be able to take a snapshot of running transactions and write this
to WAL without requesting for a (costly) checkpoint.

Author: "Drouvot, Bertrand" <bertranddrouvot...@gmail.com>
Author: Amit Khandekar <amitdkhan...@gmail.com>
Author: Craig Ringer <cr...@2ndquadrant.com> (in an older version)
Author: Andres Freund <and...@anarazel.de>
Reviewed-by: "Drouvot, Bertrand" <bertranddrouvot...@gmail.com>
Reviewed-by: Andres Freund <and...@anarazel.de>
Reviewed-by: Robert Haas <robertmh...@gmail.com>
Reviewed-by: Amit Kapila <amit.kapil...@gmail.com>
Reviewed-by: Fabrízio de Royes Mello <fabriziome...@gmail.com>
---
 src/test/perl/PostgreSQL/Test/Cluster.pm      |  37 +
 src/test/recovery/meson.build                 |   1 +
 .../t/035_standby_logical_decoding.pl         | 720 ++++++++++++++++++
 3 files changed, 758 insertions(+)
 create mode 100644 src/test/recovery/t/035_standby_logical_decoding.pl

diff --git a/src/test/perl/PostgreSQL/Test/Cluster.pm b/src/test/perl/PostgreSQL/Test/Cluster.pm
index a3aef8b5e91..62376de602b 100644
--- a/src/test/perl/PostgreSQL/Test/Cluster.pm
+++ b/src/test/perl/PostgreSQL/Test/Cluster.pm
@@ -3076,6 +3076,43 @@ $SIG{TERM} = $SIG{INT} = sub {
 
 =pod
 
+=item $node->create_logical_slot_on_standby(self, primary, slot_name, dbname)
+
+Create logical replication slot on given standby
+
+=cut
+
+sub create_logical_slot_on_standby
+{
+	my ($self, $primary, $slot_name, $dbname) = @_;
+	my ($stdout, $stderr);
+
+	my $handle;
+
+	$handle = IPC::Run::start(['pg_recvlogical', '-d', $self->connstr($dbname), '-P', 'test_decoding', '-S', $slot_name, '--create-slot'], '>', \$stdout, '2>', \$stderr);
+
+	# Once slot restart_lsn is created, the standby looks for xl_running_xacts
+	# WAL record from the restart_lsn onwards. So firstly, wait until the slot
+	# restart_lsn is evaluated.
+
+	$self->poll_query_until(
+		'postgres', qq[
+		SELECT restart_lsn IS NOT NULL
+		FROM pg_catalog.pg_replication_slots WHERE slot_name = '$slot_name'
+	]) or die "timed out waiting for logical slot to calculate its restart_lsn";
+
+	# Now arrange for the xl_running_xacts record for which pg_recvlogical
+	# is waiting.
+	$primary->safe_psql('postgres', 'SELECT pg_log_standby_snapshot()');
+
+	$handle->finish();
+
+	is($self->slot($slot_name)->{'slot_type'}, 'logical', $slot_name . ' on standby created')
+		or die "could not create slot" . $slot_name;
+}
+
+=pod
+
 =back
 
 =cut
diff --git a/src/test/recovery/meson.build b/src/test/recovery/meson.build
index 59465b97f3f..e834ad5e0dc 100644
--- a/src/test/recovery/meson.build
+++ b/src/test/recovery/meson.build
@@ -40,6 +40,7 @@ tests += {
       't/032_relfilenode_reuse.pl',
       't/033_replay_tsp_drops.pl',
       't/034_create_database.pl',
+      't/035_standby_logical_decoding.pl',
     ],
   },
 }
diff --git a/src/test/recovery/t/035_standby_logical_decoding.pl b/src/test/recovery/t/035_standby_logical_decoding.pl
new file mode 100644
index 00000000000..561dcd33c3b
--- /dev/null
+++ b/src/test/recovery/t/035_standby_logical_decoding.pl
@@ -0,0 +1,720 @@
+# logical decoding on standby : test logical decoding,
+# recovery conflict and standby promotion.
+
+use strict;
+use warnings;
+
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+my ($stdin, $stdout, $stderr, $cascading_stdout, $cascading_stderr, $ret, $handle, $slot);
+
+my $node_primary = PostgreSQL::Test::Cluster->new('primary');
+my $node_standby = PostgreSQL::Test::Cluster->new('standby');
+my $node_cascading_standby = PostgreSQL::Test::Cluster->new('cascading_standby');
+my $default_timeout = $PostgreSQL::Test::Utils::timeout_default;
+my $res;
+
+# Name for the physical slot on primary
+my $primary_slotname = 'primary_physical';
+my $standby_physical_slotname = 'standby_physical';
+
+# find $pat in logfile of $node after $off-th byte
+sub find_in_log
+{
+	my ($node, $pat, $off) = @_;
+
+	$off = 0 unless defined $off;
+	my $log = PostgreSQL::Test::Utils::slurp_file($node->logfile);
+	return 0 if (length($log) <= $off);
+
+	$log = substr($log, $off);
+
+	return $log =~ m/$pat/;
+}
+
+# Fetch xmin columns from slot's pg_replication_slots row, after waiting for
+# given boolean condition to be true to ensure we've reached a quiescent state.
+sub wait_for_xmins
+{
+	my ($node, $slotname, $check_expr) = @_;
+
+	$node->poll_query_until(
+		'postgres', qq[
+		SELECT $check_expr
+		FROM pg_catalog.pg_replication_slots
+		WHERE slot_name = '$slotname';
+	]) or die "Timed out waiting for slot xmins to advance";
+}
+
+# Create the required logical slots on standby.
+sub create_logical_slots
+{
+	my ($node, $slot_prefix) = @_;
+
+	my $active_slot = $slot_prefix . 'activeslot';
+	my $inactive_slot = $slot_prefix . 'inactiveslot';
+	$node->create_logical_slot_on_standby($node_primary, qq($inactive_slot), 'testdb');
+	$node->create_logical_slot_on_standby($node_primary, qq($active_slot), 'testdb');
+}
+
+# Drop the logical slots on standby.
+sub drop_logical_slots
+{
+	my ($slot_prefix) = @_;
+	my $active_slot = $slot_prefix . 'activeslot';
+	my $inactive_slot = $slot_prefix . 'inactiveslot';
+
+	$node_standby->psql('postgres', qq[SELECT pg_drop_replication_slot('$inactive_slot')]);
+	$node_standby->psql('postgres', qq[SELECT pg_drop_replication_slot('$active_slot')]);
+}
+
+# Acquire one of the standby logical slots created by create_logical_slots().
+# In case wait is true we are waiting for an active pid on the 'activeslot' slot.
+# If wait is not true it means we are testing a known failure scenario.
+sub make_slot_active
+{
+	my ($node, $slot_prefix, $wait, $to_stdout, $to_stderr) = @_;
+	my $slot_user_handle;
+
+	my $active_slot = $slot_prefix . 'activeslot';
+	$slot_user_handle = IPC::Run::start(['pg_recvlogical', '-d', $node->connstr('testdb'), '-S', qq($active_slot), '-o', 'include-xids=0', '-o', 'skip-empty-xacts=1', '--no-loop', '--start', '-f', '-'], '>', $to_stdout, '2>', $to_stderr);
+
+	if ($wait)
+	{
+		# make sure activeslot is in use
+		$node->poll_query_until('testdb',
+			qq[SELECT EXISTS (SELECT 1 FROM pg_replication_slots WHERE slot_name = '$active_slot' AND active_pid IS NOT NULL)]
+		) or die "slot never became active";
+	}
+	return $slot_user_handle;
+}
+
+# Check pg_recvlogical stderr
+sub check_pg_recvlogical_stderr
+{
+	my ($slot_user_handle, $check_stderr) = @_;
+	my $return;
+
+	# our client should've terminated in response to the walsender error
+	$slot_user_handle->finish;
+	$return = $?;
+	cmp_ok($return, "!=", 0, "pg_recvlogical exited non-zero");
+	if ($return) {
+		like($stderr, qr/$check_stderr/, 'slot has been invalidated');
+	}
+
+	return 0;
+}
+
+# Check if all the slots on standby are dropped. These include the 'activeslot'
+# that was acquired by make_slot_active(), and the non-active 'inactiveslot'.
+sub check_slots_dropped
+{
+	my ($slot_prefix, $slot_user_handle) = @_;
+
+	is($node_standby->slot($slot_prefix . 'inactiveslot')->{'slot_type'}, '', 'inactiveslot on standby dropped');
+	is($node_standby->slot($slot_prefix . 'activeslot')->{'slot_type'}, '', 'activeslot on standby dropped');
+
+	check_pg_recvlogical_stderr($slot_user_handle, "conflict with recovery");
+}
+
+# Change hot_standby_feedback and check xmin and catalog_xmin values.
+sub change_hot_standby_feedback_and_wait_for_xmins
+{
+	my ($hsf, $invalidated) = @_;
+
+	$node_standby->append_conf('postgresql.conf',qq[
+	hot_standby_feedback = $hsf
+	]);
+
+	$node_standby->reload;
+
+	if ($hsf && $invalidated)
+	{
+		# With hot_standby_feedback on, xmin should advance,
+		# but catalog_xmin should still remain NULL since there is no logical slot.
+		wait_for_xmins($node_primary, $primary_slotname,
+			   "xmin IS NOT NULL AND catalog_xmin IS NULL");
+	}
+	elsif ($hsf)
+	{
+		# With hot_standby_feedback on, xmin and catalog_xmin should advance.
+		wait_for_xmins($node_primary, $primary_slotname,
+			   "xmin IS NOT NULL AND catalog_xmin IS NOT NULL");
+	}
+	else
+	{
+		# Both should be NULL since hs_feedback is off
+		wait_for_xmins($node_primary, $primary_slotname,
+			   "xmin IS NULL AND catalog_xmin IS NULL");
+
+	}
+}
+
+# Check conflicting status in pg_replication_slots.
+sub check_slots_conflicting_status
+{
+	my ($conflicting) = @_;
+
+	if ($conflicting)
+	{
+		$res = $node_standby->safe_psql(
+				'postgres', qq(
+				 select bool_and(conflicting) from pg_replication_slots;));
+
+		is($res, 't',
+			"Logical slots are reported as conflicting");
+	}
+	else
+	{
+		$res = $node_standby->safe_psql(
+				'postgres', qq(
+				select bool_or(conflicting) from pg_replication_slots;));
+
+		is($res, 'f',
+			"Logical slots are reported as non conflicting");
+	}
+}
+
+# Drop the slots, re-create them, change hot_standby_feedback,
+# check xmin and catalog_xmin values, make slot active and reset stat.
+sub reactive_slots_change_hfs_and_wait_for_xmins
+{
+	my ($previous_slot_prefix, $slot_prefix, $hsf, $invalidated) = @_;
+
+	# drop the logical slots
+	drop_logical_slots($previous_slot_prefix);
+
+	# create the logical slots
+	create_logical_slots($node_standby, $slot_prefix);
+
+	change_hot_standby_feedback_and_wait_for_xmins($hsf, $invalidated);
+
+	$handle = make_slot_active($node_standby, $slot_prefix, 1, \$stdout, \$stderr);
+
+	# reset stat: easier to check for confl_active_logicalslot in pg_stat_database_conflicts
+	$node_standby->psql('testdb', q[select pg_stat_reset();]);
+}
+
+# Check invalidation in the logfile and in pg_stat_database_conflicts
+sub check_for_invalidation
+{
+	my ($slot_prefix, $log_start, $test_name) = @_;
+
+	my $active_slot = $slot_prefix . 'activeslot';
+	my $inactive_slot = $slot_prefix . 'inactiveslot';
+
+	# message should be issued
+	ok( find_in_log(
+		$node_standby,
+		"invalidating obsolete replication slot \"$inactive_slot\"", $log_start),
+		"inactiveslot slot invalidation is logged $test_name");
+
+	ok( find_in_log(
+		$node_standby,
+		"invalidating obsolete replication slot \"$active_slot\"", $log_start),
+		"activeslot slot invalidation is logged $test_name");
+
+	# Verify that pg_stat_database_conflicts.confl_active_logicalslot has been updated
+	ok( $node_standby->poll_query_until(
+		'postgres',
+		"select (confl_active_logicalslot = 1) from pg_stat_database_conflicts where datname = 'testdb'", 't'),
+		'confl_active_logicalslot updated') or die "Timed out waiting confl_active_logicalslot to be updated";
+}
+########################
+# Initialize primary node
+########################
+
+$node_primary->init(allows_streaming => 1, has_archiving => 1);
+$node_primary->append_conf('postgresql.conf', q{
+wal_level = 'logical'
+max_replication_slots = 4
+max_wal_senders = 4
+log_min_messages = 'debug2'
+log_error_verbosity = verbose
+});
+$node_primary->dump_info;
+$node_primary->start;
+
+$node_primary->psql('postgres', q[CREATE DATABASE testdb]);
+
+$node_primary->safe_psql('testdb', qq[SELECT * FROM pg_create_physical_replication_slot('$primary_slotname');]);
+
+# Check conflicting is NULL for physical slot
+$res = $node_primary->safe_psql(
+		'postgres', qq[
+		 SELECT conflicting is null FROM pg_replication_slots where slot_name = '$primary_slotname';]);
+
+is($res, 't',
+	"Physical slot reports conflicting as NULL");
+
+my $backup_name = 'b1';
+$node_primary->backup($backup_name);
+
+#######################
+# Initialize standby node
+#######################
+
+$node_standby->init_from_backup(
+	$node_primary, $backup_name,
+	has_streaming => 1,
+	has_restoring => 1);
+$node_standby->append_conf('postgresql.conf',
+	qq[primary_slot_name = '$primary_slotname']);
+$node_standby->start;
+$node_primary->wait_for_replay_catchup($node_standby);
+$node_standby->safe_psql('testdb', qq[SELECT * FROM pg_create_physical_replication_slot('$standby_physical_slotname');]);
+
+#######################
+# Initialize cascading standby node
+#######################
+$node_standby->backup($backup_name);
+$node_cascading_standby->init_from_backup(
+	$node_standby, $backup_name,
+	has_streaming => 1,
+	has_restoring => 1);
+$node_cascading_standby->append_conf('postgresql.conf',
+	qq[primary_slot_name = '$standby_physical_slotname']);
+$node_cascading_standby->start;
+$node_standby->wait_for_replay_catchup($node_cascading_standby, $node_primary);
+
+##################################################
+# Test that logical decoding on the standby
+# behaves correctly.
+##################################################
+
+# create the logical slots
+create_logical_slots($node_standby, 'behaves_ok_');
+
+$node_primary->safe_psql('testdb', qq[CREATE TABLE decoding_test(x integer, y text);]);
+$node_primary->safe_psql('testdb', qq[INSERT INTO decoding_test(x,y) SELECT s, s::text FROM generate_series(1,10) s;]);
+
+$node_primary->wait_for_replay_catchup($node_standby);
+
+my $result = $node_standby->safe_psql('testdb',
+	qq[SELECT pg_logical_slot_get_changes('behaves_ok_activeslot', NULL, NULL);]);
+
+# test if basic decoding works
+is(scalar(my @foobar = split /^/m, $result),
+	14, 'Decoding produced 14 rows (2 BEGIN/COMMIT and 10 rows)');
+
+# Insert some rows and verify that we get the same results from pg_recvlogical
+# and the SQL interface.
+$node_primary->safe_psql('testdb',
+	qq[INSERT INTO decoding_test(x,y) SELECT s, s::text FROM generate_series(1,4) s;]
+);
+
+my $expected = q{BEGIN
+table public.decoding_test: INSERT: x[integer]:1 y[text]:'1'
+table public.decoding_test: INSERT: x[integer]:2 y[text]:'2'
+table public.decoding_test: INSERT: x[integer]:3 y[text]:'3'
+table public.decoding_test: INSERT: x[integer]:4 y[text]:'4'
+COMMIT};
+
+$node_primary->wait_for_replay_catchup($node_standby);
+
+my $stdout_sql = $node_standby->safe_psql('testdb',
+	qq[SELECT data FROM pg_logical_slot_peek_changes('behaves_ok_activeslot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');]
+);
+
+is($stdout_sql, $expected, 'got expected output from SQL decoding session');
+
+my $endpos = $node_standby->safe_psql('testdb',
+	"SELECT lsn FROM pg_logical_slot_peek_changes('behaves_ok_activeslot', NULL, NULL) ORDER BY lsn DESC LIMIT 1;"
+);
+
+# Insert some rows after $endpos, which we won't read.
+$node_primary->safe_psql('testdb',
+	qq[INSERT INTO decoding_test(x,y) SELECT s, s::text FROM generate_series(5,50) s;]
+);
+
+$node_primary->wait_for_catchup($node_standby);
+
+my $stdout_recv = $node_standby->pg_recvlogical_upto(
+    'testdb', 'behaves_ok_activeslot', $endpos, $default_timeout,
+    'include-xids'     => '0',
+    'skip-empty-xacts' => '1');
+chomp($stdout_recv);
+is($stdout_recv, $expected,
+    'got same expected output from pg_recvlogical decoding session');
+
+$node_standby->poll_query_until('testdb',
+	"SELECT EXISTS (SELECT 1 FROM pg_replication_slots WHERE slot_name = 'behaves_ok_activeslot' AND active_pid IS NULL)"
+) or die "slot never became inactive";
+
+$stdout_recv = $node_standby->pg_recvlogical_upto(
+    'testdb', 'behaves_ok_activeslot', $endpos, $default_timeout,
+    'include-xids'     => '0',
+    'skip-empty-xacts' => '1');
+chomp($stdout_recv);
+is($stdout_recv, '', 'pg_recvlogical acknowledged changes');
+
+$node_primary->safe_psql('postgres', 'CREATE DATABASE otherdb');
+
+is( $node_primary->psql(
+        'otherdb',
+        "SELECT lsn FROM pg_logical_slot_peek_changes('behaves_ok_activeslot', NULL, NULL) ORDER BY lsn DESC LIMIT 1;"
+    ),
+    3,
+    'replaying logical slot from another database fails');
+
+##################################################
+# Recovery conflict: Invalidate conflicting slots, including in-use slots
+# Scenario 1: hot_standby_feedback off and vacuum FULL
+##################################################
+
+# One way to produce recovery conflict is to create/drop a relation and
+# launch a vacuum full on pg_class with hot_standby_feedback turned off on
+# the standby.
+reactive_slots_change_hfs_and_wait_for_xmins('behaves_ok_', 'vacuum_full_', 0, 1);
+
+# This should trigger the conflict
+$node_primary->safe_psql('testdb', qq[CREATE TABLE conflict_test(x integer, y text);]);
+$node_primary->safe_psql('testdb', qq[DROP TABLE conflict_test;]);
+$node_primary->safe_psql('testdb', 'VACUUM full pg_class;');
+
+$node_primary->wait_for_replay_catchup($node_standby);
+
+# Check invalidation in the logfile and in pg_stat_database_conflicts
+check_for_invalidation('vacuum_full_', 1, 'with vacuum FULL on pg_class');
+
+# Verify slots are reported as conflicting in pg_replication_slots
+check_slots_conflicting_status(1);
+
+$handle = make_slot_active($node_standby, 'vacuum_full_', 0, \$stdout, \$stderr);
+
+# We are not able to read from the slot as it has been invalidated
+check_pg_recvlogical_stderr($handle, "cannot read from logical replication slot \"vacuum_full_activeslot\"");
+
+# Turn hot_standby_feedback back on
+change_hot_standby_feedback_and_wait_for_xmins(1,1);
+
+##################################################
+# Verify that invalidated logical slots stay invalidated across a restart.
+##################################################
+$node_standby->restart;
+
+# Verify slots are reported as conflicting in pg_replication_slots
+check_slots_conflicting_status(1);
+
+##################################################
+# Verify that invalidated logical slots do not lead to retaining WAL
+##################################################
+# XXXXX TODO
+
+##################################################
+# Recovery conflict: Invalidate conflicting slots, including in-use slots
+# Scenario 2: conflict due to row removal with hot_standby_feedback off.
+##################################################
+
+# get the position to search from in the standby logfile
+my $logstart = -s $node_standby->logfile;
+
+# One way to produce recovery conflict is to create/drop a relation and
+# launch a vacuum on pg_class with hot_standby_feedback turned off on the standby.
+reactive_slots_change_hfs_and_wait_for_xmins('vacuum_full_', 'row_removal_', 0, 1);
+
+# This should trigger the conflict
+$node_primary->safe_psql('testdb', qq[CREATE TABLE conflict_test(x integer, y text);]);
+$node_primary->safe_psql('testdb', qq[DROP TABLE conflict_test;]);
+$node_primary->safe_psql('testdb', 'VACUUM pg_class;');
+
+$node_primary->wait_for_replay_catchup($node_standby);
+
+# Check invalidation in the logfile and in pg_stat_database_conflicts
+check_for_invalidation('row_removal_', $logstart, 'with vacuum on pg_class');
+
+# Verify slots are reported as conflicting in pg_replication_slots
+check_slots_conflicting_status(1);
+
+$handle = make_slot_active($node_standby, 'row_removal_', 0, \$stdout, \$stderr);
+
+# We are not able to read from the slot as it has been invalidated
+check_pg_recvlogical_stderr($handle, "cannot read from logical replication slot \"row_removal_activeslot\"");
+
+##################################################
+# Recovery conflict: Same as Scenario 2 but on a shared catalog table
+# Scenario 3: conflict due to row removal with hot_standby_feedback off.
+##################################################
+
+# get the position to search from in the standby logfile
+$logstart = -s $node_standby->logfile;
+
+# One way to produce recovery conflict is to create/drop a relation and
+# launch a vacuum on pg_class with hot_standby_feedback turned off on the standby.
+reactive_slots_change_hfs_and_wait_for_xmins('row_removal_', 'shared_row_removal_', 0, 1);
+
+# Trigger the conflict. The txid_current() is to ensure there's some WAL
+# record associated with the database, otherwise the wait below does not work
+# reliably.
+$node_primary->safe_psql('testdb', qq[
+  CREATE ROLE create_trash;
+  DROP ROLE create_trash;
+  SELECT txid_current();
+]);
+$node_primary->safe_psql('testdb', 'VACUUM pg_authid;');
+
+$node_primary->wait_for_replay_catchup($node_standby);
+
+# Check invalidation in the logfile and in pg_stat_database_conflicts
+check_for_invalidation('shared_row_removal_', $logstart, 'with vacuum on pg_authid');
+
+# Verify slots are reported as conflicting in pg_replication_slots
+check_slots_conflicting_status(1);
+
+$handle = make_slot_active($node_standby, 'shared_row_removal_', 0, \$stdout, \$stderr);
+
+# We are not able to read from the slot as it has been invalidated
+check_pg_recvlogical_stderr($handle, "cannot read from logical replication slot \"shared_row_removal_activeslot\"");
+
+##################################################
+# Recovery conflict: Same as Scenario 2 but on a non catalog table
+# Scenario 4: No conflict expected.
+##################################################
+
+# get the position to search from in the standby logfile
+$logstart = -s $node_standby->logfile;
+
+reactive_slots_change_hfs_and_wait_for_xmins('shared_row_removal_', 'no_conflict_', 0, 1);
+
+# This should not trigger a conflict
+$node_primary->safe_psql('testdb', qq[CREATE TABLE conflict_test(x integer, y text);
+									  INSERT INTO conflict_test(x,y) SELECT s, s::text FROM generate_series(1,4) s;
+									  UPDATE conflict_test set x=1, y=1;
+									  VACUUM conflict_test;]);
+$node_primary->wait_for_replay_catchup($node_standby);
+
+# message should not be issued
+ok( !find_in_log(
+   $node_standby,
+  "invalidating obsolete slot \"no_conflict_inactiveslot\"", $logstart),
+  'inactiveslot slot invalidation is not logged with vacuum on conflict_test');
+
+ok( !find_in_log(
+   $node_standby,
+  "invalidating obsolete slot \"no_conflict_activeslot\"", $logstart),
+  'activeslot slot invalidation is not logged with vacuum on conflict_test');
+
+# Verify that pg_stat_database_conflicts.confl_active_logicalslot has not been updated
+ok( $node_standby->poll_query_until(
+	'postgres',
+	"select (confl_active_logicalslot = 0) from pg_stat_database_conflicts where datname = 'testdb'", 't'),
+	'confl_active_logicalslot not updated') or die "Timed out waiting confl_active_logicalslot to be updated";
+
+# Verify slots are reported as non conflicting in pg_replication_slots
+check_slots_conflicting_status(0);
+
+# Turn hot_standby_feedback back on
+change_hot_standby_feedback_and_wait_for_xmins(1, 0);
+
+# Restart the standby node to ensure no slots are still active
+$node_standby->restart;
+
+##################################################
+# Recovery conflict: Invalidate conflicting slots, including in-use slots
+# Scenario 4: conflict due to on-access pruning.
+##################################################
+
+# get the position to search from in the standby logfile
+$logstart = -s $node_standby->logfile;
+
+# One way to produce recovery conflict is to trigger an on-access pruning
+# on a relation marked as user_catalog_table.
+reactive_slots_change_hfs_and_wait_for_xmins('no_conflict_', 'pruning_', 0, 0);
+
+# This should trigger the conflict
+$node_primary->safe_psql('testdb', qq[CREATE TABLE prun(id integer, s char(2000)) WITH (fillfactor = 75, user_catalog_table = true);]);
+$node_primary->safe_psql('testdb', qq[INSERT INTO prun VALUES (1, 'A');]);
+$node_primary->safe_psql('testdb', qq[UPDATE prun SET s = 'B';]);
+$node_primary->safe_psql('testdb', qq[UPDATE prun SET s = 'C';]);
+$node_primary->safe_psql('testdb', qq[UPDATE prun SET s = 'D';]);
+$node_primary->safe_psql('testdb', qq[UPDATE prun SET s = 'E';]);
+
+$node_primary->wait_for_replay_catchup($node_standby);
+
+# Check invalidation in the logfile and in pg_stat_database_conflicts
+check_for_invalidation('pruning_', $logstart, 'with on-access pruning');
+
+# Verify slots are reported as conflicting in pg_replication_slots
+check_slots_conflicting_status(1);
+
+$handle = make_slot_active($node_standby, 'pruning_', 0, \$stdout, \$stderr);
+
+# We are not able to read from the slot as it has been invalidated
+check_pg_recvlogical_stderr($handle, "cannot read from logical replication slot \"pruning_activeslot\"");
+
+# Turn hot_standby_feedback back on
+change_hot_standby_feedback_and_wait_for_xmins(1, 1);
+
+##################################################
+# Recovery conflict: Invalidate conflicting slots, including in-use slots
+# Scenario 5: incorrect wal_level on primary.
+##################################################
+
+# get the position to search from in the standby logfile
+$logstart = -s $node_standby->logfile;
+
+# drop the logical slots
+drop_logical_slots('pruning_');
+
+# create the logical slots
+create_logical_slots($node_standby, 'wal_level_');
+
+$handle = make_slot_active($node_standby, 'wal_level_', 1, \$stdout, \$stderr);
+
+# reset stat: easier to check for confl_active_logicalslot in pg_stat_database_conflicts
+$node_standby->psql('testdb', q[select pg_stat_reset();]);
+
+# Make primary wal_level replica. This will trigger slot conflict.
+$node_primary->append_conf('postgresql.conf',q[
+wal_level = 'replica'
+]);
+$node_primary->restart;
+
+$node_primary->wait_for_replay_catchup($node_standby);
+
+# Check invalidation in the logfile and in pg_stat_database_conflicts
+check_for_invalidation('wal_level_', $logstart, 'due to wal_level');
+
+# Verify slots are reported as conflicting in pg_replication_slots
+check_slots_conflicting_status(1);
+
+$handle = make_slot_active($node_standby, 'wal_level_', 0, \$stdout, \$stderr);
+# We are not able to read from the slot as it requires wal_level at least logical on the primary server
+check_pg_recvlogical_stderr($handle, "logical decoding on standby requires wal_level to be at least logical on the primary server");
+
+# Restore primary wal_level
+$node_primary->append_conf('postgresql.conf',q[
+wal_level = 'logical'
+]);
+$node_primary->restart;
+$node_primary->wait_for_replay_catchup($node_standby);
+
+$handle = make_slot_active($node_standby, 'wal_level_', 0, \$stdout, \$stderr);
+# as the slot has been invalidated we should not be able to read
+check_pg_recvlogical_stderr($handle, "cannot read from logical replication slot \"wal_level_activeslot\"");
+
+##################################################
+# DROP DATABASE should drops it's slots, including active slots.
+##################################################
+
+# drop the logical slots
+drop_logical_slots('wal_level_');
+
+# create the logical slots
+create_logical_slots($node_standby, 'drop_db_');
+
+$handle = make_slot_active($node_standby, 'drop_db_', 1, \$stdout, \$stderr);
+
+# Create a slot on a database that would not be dropped. This slot should not
+# get dropped.
+$node_standby->create_logical_slot_on_standby($node_primary, 'otherslot', 'postgres');
+
+# dropdb on the primary to verify slots are dropped on standby
+$node_primary->safe_psql('postgres', q[DROP DATABASE testdb]);
+
+$node_primary->wait_for_replay_catchup($node_standby);
+
+is($node_standby->safe_psql('postgres',
+	q[SELECT EXISTS(SELECT 1 FROM pg_database WHERE datname = 'testdb')]), 'f',
+	'database dropped on standby');
+
+check_slots_dropped('drop_db', $handle);
+
+is($node_standby->slot('otherslot')->{'slot_type'}, 'logical',
+	'otherslot on standby not dropped');
+
+# Cleanup : manually drop the slot that was not dropped.
+$node_standby->psql('postgres', q[SELECT pg_drop_replication_slot('otherslot')]);
+
+##################################################
+# Test standby promotion and logical decoding behavior
+# after the standby gets promoted.
+##################################################
+
+$node_standby->reload;
+
+$node_primary->psql('postgres', q[CREATE DATABASE testdb]);
+$node_primary->safe_psql('testdb', qq[CREATE TABLE decoding_test(x integer, y text);]);
+
+# create the logical slots
+create_logical_slots($node_standby, 'promotion_');
+
+# create the logical slots on the cascading standby too
+create_logical_slots($node_cascading_standby, 'promotion_');
+
+# Make slots actives
+$handle = make_slot_active($node_standby, 'promotion_', 1, \$stdout, \$stderr);
+my $cascading_handle = make_slot_active($node_cascading_standby, 'promotion_', 1, \$cascading_stdout, \$cascading_stderr);
+
+# Insert some rows before the promotion
+$node_primary->safe_psql('testdb',
+	qq[INSERT INTO decoding_test(x,y) SELECT s, s::text FROM generate_series(1,4) s;]
+);
+
+# Wait for both standbys to catchup
+$node_primary->wait_for_replay_catchup($node_standby);
+$node_standby->wait_for_replay_catchup($node_cascading_standby, $node_primary);
+
+# promote
+$node_standby->promote;
+
+# insert some rows on promoted standby
+$node_standby->safe_psql('testdb',
+	qq[INSERT INTO decoding_test(x,y) SELECT s, s::text FROM generate_series(5,7) s;]
+);
+
+# Wait for the cascading standby to catchup
+$node_standby->wait_for_replay_catchup($node_cascading_standby);
+
+$expected = q{BEGIN
+table public.decoding_test: INSERT: x[integer]:1 y[text]:'1'
+table public.decoding_test: INSERT: x[integer]:2 y[text]:'2'
+table public.decoding_test: INSERT: x[integer]:3 y[text]:'3'
+table public.decoding_test: INSERT: x[integer]:4 y[text]:'4'
+COMMIT
+BEGIN
+table public.decoding_test: INSERT: x[integer]:5 y[text]:'5'
+table public.decoding_test: INSERT: x[integer]:6 y[text]:'6'
+table public.decoding_test: INSERT: x[integer]:7 y[text]:'7'
+COMMIT};
+
+# check that we are decoding pre and post promotion inserted rows
+$stdout_sql = $node_standby->safe_psql('testdb',
+	qq[SELECT data FROM pg_logical_slot_peek_changes('promotion_inactiveslot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');]
+);
+
+is($stdout_sql, $expected, 'got expected output from SQL decoding session on promoted standby');
+
+# check that we are decoding pre and post promotion inserted rows
+# with pg_recvlogical that has started before the promotion
+my $pump_timeout = IPC::Run::timer($PostgreSQL::Test::Utils::timeout_default);
+
+ok( pump_until(
+        $handle, $pump_timeout, \$stdout, qr/^.*COMMIT.*COMMIT$/s),
+    'got 2 COMMIT from pg_recvlogical output');
+
+chomp($stdout);
+is($stdout, $expected,
+    'got same expected output from pg_recvlogical decoding session');
+
+# check that we are decoding pre and post promotion inserted rows on the cascading standby
+$stdout_sql = $node_cascading_standby->safe_psql('testdb',
+	qq[SELECT data FROM pg_logical_slot_peek_changes('promotion_inactiveslot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');]
+);
+
+is($stdout_sql, $expected, 'got expected output from SQL decoding session on cascading standby');
+
+# check that we are decoding pre and post promotion inserted rows
+# with pg_recvlogical that has started before the promotion on the cascading standby
+ok( pump_until(
+        $cascading_handle, $pump_timeout, \$cascading_stdout, qr/^.*COMMIT.*COMMIT$/s),
+    'got 2 COMMIT from pg_recvlogical output');
+
+chomp($cascading_stdout);
+is($cascading_stdout, $expected,
+    'got same expected output from pg_recvlogical decoding session on cascading standby');
+
+done_testing();
-- 
2.38.0

>From 3d516d367777d695e66666f3a989a132d089f354 Mon Sep 17 00:00:00 2001
From: bdrouvotAWS <bdrou...@amazon.com>
Date: Tue, 7 Feb 2023 09:04:12 +0000
Subject: [PATCH va67 8/9] TAP test for logical decoding on standby
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Author: "Drouvot, Bertrand" <bertranddrouvot...@gmail.com>
Author: Amit Khandekar <amitdkhan...@gmail.com>
Author: Craig Ringer <cr...@2ndquadrant.com> (in an older version)
Author: Andres Freund <and...@anarazel.de>
Reviewed-by: "Drouvot, Bertrand" <bertranddrouvot...@gmail.com>
Reviewed-by: Andres Freund <and...@anarazel.de>
Reviewed-by: Robert Haas <robertmh...@gmail.com>
Reviewed-by: Amit Kapila <amit.kapil...@gmail.com>
Reviewed-by: Fabrízio de Royes Mello <fabriziome...@gmail.com>
---
 src/test/perl/PostgreSQL/Test/Cluster.pm      |  37 +
 src/test/recovery/meson.build                 |   1 +
 .../t/035_standby_logical_decoding.pl         | 732 ++++++++++++++++++
 3 files changed, 770 insertions(+)
 create mode 100644 src/test/recovery/t/035_standby_logical_decoding.pl

diff --git a/src/test/perl/PostgreSQL/Test/Cluster.pm b/src/test/perl/PostgreSQL/Test/Cluster.pm
index a3aef8b5e91..62376de602b 100644
--- a/src/test/perl/PostgreSQL/Test/Cluster.pm
+++ b/src/test/perl/PostgreSQL/Test/Cluster.pm
@@ -3076,6 +3076,43 @@ $SIG{TERM} = $SIG{INT} = sub {
 
 =pod
 
+=item $node->create_logical_slot_on_standby(self, primary, slot_name, dbname)
+
+Create logical replication slot on given standby
+
+=cut
+
+sub create_logical_slot_on_standby
+{
+	my ($self, $primary, $slot_name, $dbname) = @_;
+	my ($stdout, $stderr);
+
+	my $handle;
+
+	$handle = IPC::Run::start(['pg_recvlogical', '-d', $self->connstr($dbname), '-P', 'test_decoding', '-S', $slot_name, '--create-slot'], '>', \$stdout, '2>', \$stderr);
+
+	# Once slot restart_lsn is created, the standby looks for xl_running_xacts
+	# WAL record from the restart_lsn onwards. So firstly, wait until the slot
+	# restart_lsn is evaluated.
+
+	$self->poll_query_until(
+		'postgres', qq[
+		SELECT restart_lsn IS NOT NULL
+		FROM pg_catalog.pg_replication_slots WHERE slot_name = '$slot_name'
+	]) or die "timed out waiting for logical slot to calculate its restart_lsn";
+
+	# Now arrange for the xl_running_xacts record for which pg_recvlogical
+	# is waiting.
+	$primary->safe_psql('postgres', 'SELECT pg_log_standby_snapshot()');
+
+	$handle->finish();
+
+	is($self->slot($slot_name)->{'slot_type'}, 'logical', $slot_name . ' on standby created')
+		or die "could not create slot" . $slot_name;
+}
+
+=pod
+
 =back
 
 =cut
diff --git a/src/test/recovery/meson.build b/src/test/recovery/meson.build
index 59465b97f3f..e834ad5e0dc 100644
--- a/src/test/recovery/meson.build
+++ b/src/test/recovery/meson.build
@@ -40,6 +40,7 @@ tests += {
       't/032_relfilenode_reuse.pl',
       't/033_replay_tsp_drops.pl',
       't/034_create_database.pl',
+      't/035_standby_logical_decoding.pl',
     ],
   },
 }
diff --git a/src/test/recovery/t/035_standby_logical_decoding.pl b/src/test/recovery/t/035_standby_logical_decoding.pl
new file mode 100644
index 00000000000..ec56fa88cbf
--- /dev/null
+++ b/src/test/recovery/t/035_standby_logical_decoding.pl
@@ -0,0 +1,732 @@
+# logical decoding on standby : test logical decoding,
+# recovery conflict and standby promotion.
+
+use strict;
+use warnings;
+
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+my ($stdin, $stdout, $stderr, $cascading_stdout, $cascading_stderr, $ret, $handle, $slot);
+
+my $node_primary = PostgreSQL::Test::Cluster->new('primary');
+my $node_standby = PostgreSQL::Test::Cluster->new('standby');
+my $node_cascading_standby = PostgreSQL::Test::Cluster->new('cascading_standby');
+my $default_timeout = $PostgreSQL::Test::Utils::timeout_default;
+my $res;
+
+# Name for the physical slot on primary
+my $primary_slotname = 'primary_physical';
+my $standby_physical_slotname = 'standby_physical';
+
+# find $pat in logfile of $node after $off-th byte
+sub find_in_log
+{
+	my ($node, $pat, $off) = @_;
+
+	$off = 0 unless defined $off;
+	my $log = PostgreSQL::Test::Utils::slurp_file($node->logfile);
+	return 0 if (length($log) <= $off);
+
+	$log = substr($log, $off);
+
+	return $log =~ m/$pat/;
+}
+
+# Fetch xmin columns from slot's pg_replication_slots row, after waiting for
+# given boolean condition to be true to ensure we've reached a quiescent state.
+sub wait_for_xmins
+{
+	my ($node, $slotname, $check_expr) = @_;
+
+	$node->poll_query_until(
+		'postgres', qq[
+		SELECT $check_expr
+		FROM pg_catalog.pg_replication_slots
+		WHERE slot_name = '$slotname';
+	]) or die "Timed out waiting for slot xmins to advance";
+}
+
+# Create the required logical slots on standby.
+sub create_logical_slots
+{
+	my ($node, $slot_prefix) = @_;
+
+	my $active_slot = $slot_prefix . 'activeslot';
+	my $inactive_slot = $slot_prefix . 'inactiveslot';
+	$node->create_logical_slot_on_standby($node_primary, qq($inactive_slot), 'testdb');
+	$node->create_logical_slot_on_standby($node_primary, qq($active_slot), 'testdb');
+}
+
+# Drop the logical slots on standby.
+sub drop_logical_slots
+{
+	my ($slot_prefix) = @_;
+	my $active_slot = $slot_prefix . 'activeslot';
+	my $inactive_slot = $slot_prefix . 'inactiveslot';
+
+	$node_standby->psql('postgres', qq[SELECT pg_drop_replication_slot('$inactive_slot')]);
+	$node_standby->psql('postgres', qq[SELECT pg_drop_replication_slot('$active_slot')]);
+}
+
+# Acquire one of the standby logical slots created by create_logical_slots().
+# In case wait is true we are waiting for an active pid on the 'activeslot' slot.
+# If wait is not true it means we are testing a known failure scenario.
+sub make_slot_active
+{
+	my ($node, $slot_prefix, $wait, $to_stdout, $to_stderr) = @_;
+	my $slot_user_handle;
+
+	my $active_slot = $slot_prefix . 'activeslot';
+	$slot_user_handle = IPC::Run::start(['pg_recvlogical', '-d', $node->connstr('testdb'), '-S', qq($active_slot), '-o', 'include-xids=0', '-o', 'skip-empty-xacts=1', '--no-loop', '--start', '-f', '-'], '>', $to_stdout, '2>', $to_stderr);
+
+	if ($wait)
+	{
+		# make sure activeslot is in use
+		$node->poll_query_until('testdb',
+			qq[SELECT EXISTS (SELECT 1 FROM pg_replication_slots WHERE slot_name = '$active_slot' AND active_pid IS NOT NULL)]
+		) or die "slot never became active";
+	}
+	return $slot_user_handle;
+}
+
+# Check pg_recvlogical stderr
+sub check_pg_recvlogical_stderr
+{
+	my ($slot_user_handle, $check_stderr) = @_;
+	my $return;
+
+	# our client should've terminated in response to the walsender error
+	$slot_user_handle->finish;
+	$return = $?;
+	cmp_ok($return, "!=", 0, "pg_recvlogical exited non-zero");
+	if ($return) {
+		like($stderr, qr/$check_stderr/, 'slot has been invalidated');
+	}
+
+	return 0;
+}
+
+# Check if all the slots on standby are dropped. These include the 'activeslot'
+# that was acquired by make_slot_active(), and the non-active 'inactiveslot'.
+sub check_slots_dropped
+{
+	my ($slot_prefix, $slot_user_handle) = @_;
+
+	is($node_standby->slot($slot_prefix . 'inactiveslot')->{'slot_type'}, '', 'inactiveslot on standby dropped');
+	is($node_standby->slot($slot_prefix . 'activeslot')->{'slot_type'}, '', 'activeslot on standby dropped');
+
+	check_pg_recvlogical_stderr($slot_user_handle, "conflict with recovery");
+}
+
+# Change hot_standby_feedback and check xmin and catalog_xmin values.
+sub change_hot_standby_feedback_and_wait_for_xmins
+{
+	my ($hsf, $invalidated) = @_;
+
+	$node_standby->append_conf('postgresql.conf',qq[
+	hot_standby_feedback = $hsf
+	]);
+
+	$node_standby->reload;
+
+	if ($hsf && $invalidated)
+	{
+		# With hot_standby_feedback on, xmin should advance,
+		# but catalog_xmin should still remain NULL since there is no logical slot.
+		wait_for_xmins($node_primary, $primary_slotname,
+			   "xmin IS NOT NULL AND catalog_xmin IS NULL");
+	}
+	elsif ($hsf)
+	{
+		# With hot_standby_feedback on, xmin and catalog_xmin should advance.
+		wait_for_xmins($node_primary, $primary_slotname,
+			   "xmin IS NOT NULL AND catalog_xmin IS NOT NULL");
+	}
+	else
+	{
+		# Both should be NULL since hs_feedback is off
+		wait_for_xmins($node_primary, $primary_slotname,
+			   "xmin IS NULL AND catalog_xmin IS NULL");
+
+	}
+}
+
+# Check conflicting status in pg_replication_slots.
+sub check_slots_conflicting_status
+{
+	my ($conflicting) = @_;
+
+	if ($conflicting)
+	{
+		$res = $node_standby->safe_psql(
+				'postgres', qq(
+				 select bool_and(conflicting) from pg_replication_slots;));
+
+		is($res, 't',
+			"Logical slots are reported as conflicting");
+	}
+	else
+	{
+		$res = $node_standby->safe_psql(
+				'postgres', qq(
+				select bool_or(conflicting) from pg_replication_slots;));
+
+		is($res, 'f',
+			"Logical slots are reported as non conflicting");
+	}
+}
+
+# Drop the slots, re-create them, change hot_standby_feedback,
+# check xmin and catalog_xmin values, make slot active and reset stat.
+sub reactive_slots_change_hfs_and_wait_for_xmins
+{
+	my ($previous_slot_prefix, $slot_prefix, $hsf, $invalidated) = @_;
+
+	# drop the logical slots
+	drop_logical_slots($previous_slot_prefix);
+
+	# create the logical slots
+	create_logical_slots($node_standby, $slot_prefix);
+
+	change_hot_standby_feedback_and_wait_for_xmins($hsf, $invalidated);
+
+	$handle = make_slot_active($node_standby, $slot_prefix, 1, \$stdout, \$stderr);
+
+	# reset stat: easier to check for confl_active_logicalslot in pg_stat_database_conflicts
+	$node_standby->psql('testdb', q[select pg_stat_reset();]);
+}
+
+# Check invalidation in the logfile and in pg_stat_database_conflicts
+sub check_for_invalidation
+{
+	my ($slot_prefix, $log_start, $test_name) = @_;
+
+	my $active_slot = $slot_prefix . 'activeslot';
+	my $inactive_slot = $slot_prefix . 'inactiveslot';
+
+	# message should be issued
+	ok( find_in_log(
+		$node_standby,
+		"invalidating obsolete replication slot \"$inactive_slot\"", $log_start),
+		"inactiveslot slot invalidation is logged $test_name");
+
+	ok( find_in_log(
+		$node_standby,
+		"invalidating obsolete replication slot \"$active_slot\"", $log_start),
+		"activeslot slot invalidation is logged $test_name");
+
+	# Verify that pg_stat_database_conflicts.confl_active_logicalslot has been updated
+	ok( $node_standby->poll_query_until(
+		'postgres',
+		"select (confl_active_logicalslot = 1) from pg_stat_database_conflicts where datname = 'testdb'", 't'),
+		'confl_active_logicalslot updated') or die "Timed out waiting confl_active_logicalslot to be updated";
+}
+
+########################
+# Initialize primary node
+########################
+
+$node_primary->init(allows_streaming => 1, has_archiving => 1);
+$node_primary->append_conf('postgresql.conf', q{
+wal_level = 'logical'
+max_replication_slots = 4
+max_wal_senders = 4
+log_min_messages = 'debug2'
+log_error_verbosity = verbose
+});
+$node_primary->dump_info;
+$node_primary->start;
+
+$node_primary->psql('postgres', q[CREATE DATABASE testdb]);
+
+$node_primary->safe_psql('testdb', qq[SELECT * FROM pg_create_physical_replication_slot('$primary_slotname');]);
+
+# Check conflicting is NULL for physical slot
+$res = $node_primary->safe_psql(
+		'postgres', qq[
+		 SELECT conflicting is null FROM pg_replication_slots where slot_name = '$primary_slotname';]);
+
+is($res, 't',
+	"Physical slot reports conflicting as NULL");
+
+my $backup_name = 'b1';
+$node_primary->backup($backup_name);
+
+# Some tests need to wait for VACUUM to be replayed. But vacuum does not flush
+# WAL. An insert into flush_wal outside transaction does guarantee a flush.
+$node_primary->psql('testdb', q[CREATE TABLE flush_wal();]);
+
+#######################
+# Initialize standby node
+#######################
+
+$node_standby->init_from_backup(
+	$node_primary, $backup_name,
+	has_streaming => 1,
+	has_restoring => 1);
+$node_standby->append_conf('postgresql.conf',
+	qq[primary_slot_name = '$primary_slotname']);
+$node_standby->start;
+$node_primary->wait_for_replay_catchup($node_standby);
+$node_standby->safe_psql('testdb', qq[SELECT * FROM pg_create_physical_replication_slot('$standby_physical_slotname');]);
+
+#######################
+# Initialize cascading standby node
+#######################
+$node_standby->backup($backup_name);
+$node_cascading_standby->init_from_backup(
+	$node_standby, $backup_name,
+	has_streaming => 1,
+	has_restoring => 1);
+$node_cascading_standby->append_conf('postgresql.conf',
+	qq[primary_slot_name = '$standby_physical_slotname']);
+$node_cascading_standby->start;
+$node_standby->wait_for_replay_catchup($node_cascading_standby, $node_primary);
+
+##################################################
+# Test that logical decoding on the standby
+# behaves correctly.
+##################################################
+
+# create the logical slots
+create_logical_slots($node_standby, 'behaves_ok_');
+
+$node_primary->safe_psql('testdb', qq[CREATE TABLE decoding_test(x integer, y text);]);
+$node_primary->safe_psql('testdb', qq[INSERT INTO decoding_test(x,y) SELECT s, s::text FROM generate_series(1,10) s;]);
+
+$node_primary->wait_for_replay_catchup($node_standby);
+
+my $result = $node_standby->safe_psql('testdb',
+	qq[SELECT pg_logical_slot_get_changes('behaves_ok_activeslot', NULL, NULL);]);
+
+# test if basic decoding works
+is(scalar(my @foobar = split /^/m, $result),
+	14, 'Decoding produced 14 rows (2 BEGIN/COMMIT and 10 rows)');
+
+# Insert some rows and verify that we get the same results from pg_recvlogical
+# and the SQL interface.
+$node_primary->safe_psql('testdb',
+	qq[INSERT INTO decoding_test(x,y) SELECT s, s::text FROM generate_series(1,4) s;]
+);
+
+my $expected = q{BEGIN
+table public.decoding_test: INSERT: x[integer]:1 y[text]:'1'
+table public.decoding_test: INSERT: x[integer]:2 y[text]:'2'
+table public.decoding_test: INSERT: x[integer]:3 y[text]:'3'
+table public.decoding_test: INSERT: x[integer]:4 y[text]:'4'
+COMMIT};
+
+$node_primary->wait_for_replay_catchup($node_standby);
+
+my $stdout_sql = $node_standby->safe_psql('testdb',
+	qq[SELECT data FROM pg_logical_slot_peek_changes('behaves_ok_activeslot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');]
+);
+
+is($stdout_sql, $expected, 'got expected output from SQL decoding session');
+
+my $endpos = $node_standby->safe_psql('testdb',
+	"SELECT lsn FROM pg_logical_slot_peek_changes('behaves_ok_activeslot', NULL, NULL) ORDER BY lsn DESC LIMIT 1;"
+);
+
+# Insert some rows after $endpos, which we won't read.
+$node_primary->safe_psql('testdb',
+	qq[INSERT INTO decoding_test(x,y) SELECT s, s::text FROM generate_series(5,50) s;]
+);
+
+$node_primary->wait_for_catchup($node_standby);
+
+my $stdout_recv = $node_standby->pg_recvlogical_upto(
+    'testdb', 'behaves_ok_activeslot', $endpos, $default_timeout,
+    'include-xids'     => '0',
+    'skip-empty-xacts' => '1');
+chomp($stdout_recv);
+is($stdout_recv, $expected,
+    'got same expected output from pg_recvlogical decoding session');
+
+$node_standby->poll_query_until('testdb',
+	"SELECT EXISTS (SELECT 1 FROM pg_replication_slots WHERE slot_name = 'behaves_ok_activeslot' AND active_pid IS NULL)"
+) or die "slot never became inactive";
+
+$stdout_recv = $node_standby->pg_recvlogical_upto(
+    'testdb', 'behaves_ok_activeslot', $endpos, $default_timeout,
+    'include-xids'     => '0',
+    'skip-empty-xacts' => '1');
+chomp($stdout_recv);
+is($stdout_recv, '', 'pg_recvlogical acknowledged changes');
+
+$node_primary->safe_psql('postgres', 'CREATE DATABASE otherdb');
+
+is( $node_primary->psql(
+        'otherdb',
+        "SELECT lsn FROM pg_logical_slot_peek_changes('behaves_ok_activeslot', NULL, NULL) ORDER BY lsn DESC LIMIT 1;"
+    ),
+    3,
+    'replaying logical slot from another database fails');
+
+##################################################
+# Recovery conflict: Invalidate conflicting slots, including in-use slots
+# Scenario 1: hot_standby_feedback off and vacuum FULL
+##################################################
+
+# One way to produce recovery conflict is to create/drop a relation and
+# launch a vacuum full on pg_class with hot_standby_feedback turned off on
+# the standby.
+reactive_slots_change_hfs_and_wait_for_xmins('behaves_ok_', 'vacuum_full_', 0, 1);
+
+# This should trigger the conflict
+$node_primary->safe_psql('testdb', qq[
+  CREATE TABLE conflict_test(x integer, y text);
+  DROP TABLE conflict_test;
+  VACUUM full pg_class;
+  INSERT INTO flush_wal DEFAULT VALUES; -- see create table flush_wal
+]);
+
+$node_primary->wait_for_replay_catchup($node_standby);
+
+# Check invalidation in the logfile and in pg_stat_database_conflicts
+check_for_invalidation('vacuum_full_', 1, 'with vacuum FULL on pg_class');
+
+# Verify slots are reported as conflicting in pg_replication_slots
+check_slots_conflicting_status(1);
+
+$handle = make_slot_active($node_standby, 'vacuum_full_', 0, \$stdout, \$stderr);
+
+# We are not able to read from the slot as it has been invalidated
+check_pg_recvlogical_stderr($handle, "cannot read from logical replication slot \"vacuum_full_activeslot\"");
+
+# Turn hot_standby_feedback back on
+change_hot_standby_feedback_and_wait_for_xmins(1,1);
+
+##################################################
+# Verify that invalidated logical slots stay invalidated across a restart.
+##################################################
+$node_standby->restart;
+
+# Verify slots are reported as conflicting in pg_replication_slots
+check_slots_conflicting_status(1);
+
+##################################################
+# Verify that invalidated logical slots do not lead to retaining WAL
+##################################################
+# XXXXX TODO
+
+##################################################
+# Recovery conflict: Invalidate conflicting slots, including in-use slots
+# Scenario 2: conflict due to row removal with hot_standby_feedback off.
+##################################################
+
+# get the position to search from in the standby logfile
+my $logstart = -s $node_standby->logfile;
+
+# One way to produce recovery conflict is to create/drop a relation and
+# launch a vacuum on pg_class with hot_standby_feedback turned off on the standby.
+reactive_slots_change_hfs_and_wait_for_xmins('vacuum_full_', 'row_removal_', 0, 1);
+
+# This should trigger the conflict
+$node_primary->safe_psql('testdb', qq[
+  CREATE TABLE conflict_test(x integer, y text);
+  DROP TABLE conflict_test;
+  VACUUM pg_class;
+  INSERT INTO flush_wal DEFAULT VALUES; -- see create table flush_wal
+]);
+
+$node_primary->wait_for_replay_catchup($node_standby);
+
+# Check invalidation in the logfile and in pg_stat_database_conflicts
+check_for_invalidation('row_removal_', $logstart, 'with vacuum on pg_class');
+
+# Verify slots are reported as conflicting in pg_replication_slots
+check_slots_conflicting_status(1);
+
+$handle = make_slot_active($node_standby, 'row_removal_', 0, \$stdout, \$stderr);
+
+# We are not able to read from the slot as it has been invalidated
+check_pg_recvlogical_stderr($handle, "cannot read from logical replication slot \"row_removal_activeslot\"");
+
+##################################################
+# Recovery conflict: Same as Scenario 2 but on a shared catalog table
+# Scenario 3: conflict due to row removal with hot_standby_feedback off.
+##################################################
+
+# get the position to search from in the standby logfile
+$logstart = -s $node_standby->logfile;
+
+# One way to produce recovery conflict is to create/drop a relation and
+# launch a vacuum on pg_class with hot_standby_feedback turned off on the standby.
+reactive_slots_change_hfs_and_wait_for_xmins('row_removal_', 'shared_row_removal_', 0, 1);
+
+# Trigger the conflict
+diag $node_primary->safe_psql('testdb', qq[
+  CREATE ROLE create_trash;
+  DROP ROLE create_trash;
+  VACUUM pg_authid;
+  INSERT INTO flush_wal DEFAULT VALUES; -- see create table flush_wal
+]);
+
+$node_primary->wait_for_replay_catchup($node_standby);
+
+# Check invalidation in the logfile and in pg_stat_database_conflicts
+check_for_invalidation('shared_row_removal_', $logstart, 'with vacuum on pg_authid');
+
+# Verify slots are reported as conflicting in pg_replication_slots
+check_slots_conflicting_status(1);
+
+$handle = make_slot_active($node_standby, 'shared_row_removal_', 0, \$stdout, \$stderr);
+
+# We are not able to read from the slot as it has been invalidated
+check_pg_recvlogical_stderr($handle, "cannot read from logical replication slot \"shared_row_removal_activeslot\"");
+
+##################################################
+# Recovery conflict: Same as Scenario 2 but on a non catalog table
+# Scenario 4: No conflict expected.
+##################################################
+
+# get the position to search from in the standby logfile
+$logstart = -s $node_standby->logfile;
+
+reactive_slots_change_hfs_and_wait_for_xmins('shared_row_removal_', 'no_conflict_', 0, 1);
+
+# This should not trigger a conflict
+$node_primary->safe_psql('testdb', qq[
+  CREATE TABLE conflict_test(x integer, y text);
+  INSERT INTO conflict_test(x,y) SELECT s, s::text FROM generate_series(1,4) s;
+  UPDATE conflict_test set x=1, y=1;
+  VACUUM conflict_test;
+  INSERT INTO flush_wal DEFAULT VALUES; -- see create table flush_wal
+]);
+$node_primary->wait_for_replay_catchup($node_standby);
+
+# message should not be issued
+ok( !find_in_log(
+   $node_standby,
+  "invalidating obsolete slot \"no_conflict_inactiveslot\"", $logstart),
+  'inactiveslot slot invalidation is not logged with vacuum on conflict_test');
+
+ok( !find_in_log(
+   $node_standby,
+  "invalidating obsolete slot \"no_conflict_activeslot\"", $logstart),
+  'activeslot slot invalidation is not logged with vacuum on conflict_test');
+
+# Verify that pg_stat_database_conflicts.confl_active_logicalslot has not been updated
+ok( $node_standby->poll_query_until(
+	'postgres',
+	"select (confl_active_logicalslot = 0) from pg_stat_database_conflicts where datname = 'testdb'", 't'),
+	'confl_active_logicalslot not updated') or die "Timed out waiting confl_active_logicalslot to be updated";
+
+# Verify slots are reported as non conflicting in pg_replication_slots
+check_slots_conflicting_status(0);
+
+# Turn hot_standby_feedback back on
+change_hot_standby_feedback_and_wait_for_xmins(1, 0);
+
+# Restart the standby node to ensure no slots are still active
+$node_standby->restart;
+
+##################################################
+# Recovery conflict: Invalidate conflicting slots, including in-use slots
+# Scenario 4: conflict due to on-access pruning.
+##################################################
+
+# get the position to search from in the standby logfile
+$logstart = -s $node_standby->logfile;
+
+# One way to produce recovery conflict is to trigger an on-access pruning
+# on a relation marked as user_catalog_table.
+reactive_slots_change_hfs_and_wait_for_xmins('no_conflict_', 'pruning_', 0, 0);
+
+# This should trigger the conflict
+$node_primary->safe_psql('testdb', qq[CREATE TABLE prun(id integer, s char(2000)) WITH (fillfactor = 75, user_catalog_table = true);]);
+$node_primary->safe_psql('testdb', qq[INSERT INTO prun VALUES (1, 'A');]);
+$node_primary->safe_psql('testdb', qq[UPDATE prun SET s = 'B';]);
+$node_primary->safe_psql('testdb', qq[UPDATE prun SET s = 'C';]);
+$node_primary->safe_psql('testdb', qq[UPDATE prun SET s = 'D';]);
+$node_primary->safe_psql('testdb', qq[UPDATE prun SET s = 'E';]);
+
+$node_primary->wait_for_replay_catchup($node_standby);
+
+# Check invalidation in the logfile and in pg_stat_database_conflicts
+check_for_invalidation('pruning_', $logstart, 'with on-access pruning');
+
+# Verify slots are reported as conflicting in pg_replication_slots
+check_slots_conflicting_status(1);
+
+$handle = make_slot_active($node_standby, 'pruning_', 0, \$stdout, \$stderr);
+
+# We are not able to read from the slot as it has been invalidated
+check_pg_recvlogical_stderr($handle, "cannot read from logical replication slot \"pruning_activeslot\"");
+
+# Turn hot_standby_feedback back on
+change_hot_standby_feedback_and_wait_for_xmins(1, 1);
+
+##################################################
+# Recovery conflict: Invalidate conflicting slots, including in-use slots
+# Scenario 5: incorrect wal_level on primary.
+##################################################
+
+# get the position to search from in the standby logfile
+$logstart = -s $node_standby->logfile;
+
+# drop the logical slots
+drop_logical_slots('pruning_');
+
+# create the logical slots
+create_logical_slots($node_standby, 'wal_level_');
+
+$handle = make_slot_active($node_standby, 'wal_level_', 1, \$stdout, \$stderr);
+
+# reset stat: easier to check for confl_active_logicalslot in pg_stat_database_conflicts
+$node_standby->psql('testdb', q[select pg_stat_reset();]);
+
+# Make primary wal_level replica. This will trigger slot conflict.
+$node_primary->append_conf('postgresql.conf',q[
+wal_level = 'replica'
+]);
+$node_primary->restart;
+
+$node_primary->wait_for_replay_catchup($node_standby);
+
+# Check invalidation in the logfile and in pg_stat_database_conflicts
+check_for_invalidation('wal_level_', $logstart, 'due to wal_level');
+
+# Verify slots are reported as conflicting in pg_replication_slots
+check_slots_conflicting_status(1);
+
+$handle = make_slot_active($node_standby, 'wal_level_', 0, \$stdout, \$stderr);
+# We are not able to read from the slot as it requires wal_level at least logical on the primary server
+check_pg_recvlogical_stderr($handle, "logical decoding on standby requires wal_level to be at least logical on the primary server");
+
+# Restore primary wal_level
+$node_primary->append_conf('postgresql.conf',q[
+wal_level = 'logical'
+]);
+$node_primary->restart;
+$node_primary->wait_for_replay_catchup($node_standby);
+
+$handle = make_slot_active($node_standby, 'wal_level_', 0, \$stdout, \$stderr);
+# as the slot has been invalidated we should not be able to read
+check_pg_recvlogical_stderr($handle, "cannot read from logical replication slot \"wal_level_activeslot\"");
+
+##################################################
+# DROP DATABASE should drops it's slots, including active slots.
+##################################################
+
+# drop the logical slots
+drop_logical_slots('wal_level_');
+
+# create the logical slots
+create_logical_slots($node_standby, 'drop_db_');
+
+$handle = make_slot_active($node_standby, 'drop_db_', 1, \$stdout, \$stderr);
+
+# Create a slot on a database that would not be dropped. This slot should not
+# get dropped.
+$node_standby->create_logical_slot_on_standby($node_primary, 'otherslot', 'postgres');
+
+# dropdb on the primary to verify slots are dropped on standby
+$node_primary->safe_psql('postgres', q[DROP DATABASE testdb]);
+
+$node_primary->wait_for_replay_catchup($node_standby);
+
+is($node_standby->safe_psql('postgres',
+	q[SELECT EXISTS(SELECT 1 FROM pg_database WHERE datname = 'testdb')]), 'f',
+	'database dropped on standby');
+
+check_slots_dropped('drop_db', $handle);
+
+is($node_standby->slot('otherslot')->{'slot_type'}, 'logical',
+	'otherslot on standby not dropped');
+
+# Cleanup : manually drop the slot that was not dropped.
+$node_standby->psql('postgres', q[SELECT pg_drop_replication_slot('otherslot')]);
+
+##################################################
+# Test standby promotion and logical decoding behavior
+# after the standby gets promoted.
+##################################################
+
+$node_standby->reload;
+
+$node_primary->psql('postgres', q[CREATE DATABASE testdb]);
+$node_primary->safe_psql('testdb', qq[CREATE TABLE decoding_test(x integer, y text);]);
+
+# create the logical slots
+create_logical_slots($node_standby, 'promotion_');
+
+# create the logical slots on the cascading standby too
+create_logical_slots($node_cascading_standby, 'promotion_');
+
+# Make slots actives
+$handle = make_slot_active($node_standby, 'promotion_', 1, \$stdout, \$stderr);
+my $cascading_handle = make_slot_active($node_cascading_standby, 'promotion_', 1, \$cascading_stdout, \$cascading_stderr);
+
+# Insert some rows before the promotion
+$node_primary->safe_psql('testdb',
+	qq[INSERT INTO decoding_test(x,y) SELECT s, s::text FROM generate_series(1,4) s;]
+);
+
+# Wait for both standbys to catchup
+$node_primary->wait_for_replay_catchup($node_standby);
+$node_standby->wait_for_replay_catchup($node_cascading_standby, $node_primary);
+
+# promote
+$node_standby->promote;
+
+# insert some rows on promoted standby
+$node_standby->safe_psql('testdb',
+	qq[INSERT INTO decoding_test(x,y) SELECT s, s::text FROM generate_series(5,7) s;]
+);
+
+# Wait for the cascading standby to catchup
+$node_standby->wait_for_replay_catchup($node_cascading_standby);
+
+$expected = q{BEGIN
+table public.decoding_test: INSERT: x[integer]:1 y[text]:'1'
+table public.decoding_test: INSERT: x[integer]:2 y[text]:'2'
+table public.decoding_test: INSERT: x[integer]:3 y[text]:'3'
+table public.decoding_test: INSERT: x[integer]:4 y[text]:'4'
+COMMIT
+BEGIN
+table public.decoding_test: INSERT: x[integer]:5 y[text]:'5'
+table public.decoding_test: INSERT: x[integer]:6 y[text]:'6'
+table public.decoding_test: INSERT: x[integer]:7 y[text]:'7'
+COMMIT};
+
+# check that we are decoding pre and post promotion inserted rows
+$stdout_sql = $node_standby->safe_psql('testdb',
+	qq[SELECT data FROM pg_logical_slot_peek_changes('promotion_inactiveslot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');]
+);
+
+is($stdout_sql, $expected, 'got expected output from SQL decoding session on promoted standby');
+
+# check that we are decoding pre and post promotion inserted rows
+# with pg_recvlogical that has started before the promotion
+my $pump_timeout = IPC::Run::timer($PostgreSQL::Test::Utils::timeout_default);
+
+ok( pump_until(
+        $handle, $pump_timeout, \$stdout, qr/^.*COMMIT.*COMMIT$/s),
+    'got 2 COMMIT from pg_recvlogical output');
+
+chomp($stdout);
+is($stdout, $expected,
+    'got same expected output from pg_recvlogical decoding session');
+
+# check that we are decoding pre and post promotion inserted rows on the cascading standby
+$stdout_sql = $node_cascading_standby->safe_psql('testdb',
+	qq[SELECT data FROM pg_logical_slot_peek_changes('promotion_inactiveslot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');]
+);
+
+is($stdout_sql, $expected, 'got expected output from SQL decoding session on cascading standby');
+
+# check that we are decoding pre and post promotion inserted rows
+# with pg_recvlogical that has started before the promotion on the cascading standby
+ok( pump_until(
+        $cascading_handle, $pump_timeout, \$cascading_stdout, qr/^.*COMMIT.*COMMIT$/s),
+    'got 2 COMMIT from pg_recvlogical output');
+
+chomp($cascading_stdout);
+is($cascading_stdout, $expected,
+    'got same expected output from pg_recvlogical decoding session on cascading standby');
+
+done_testing();
-- 
2.38.0

>From 14ebbf66cc134aa08036a02dc23482adff74fb3e Mon Sep 17 00:00:00 2001
From: bdrouvotAWS <bdrou...@amazon.com>
Date: Tue, 7 Feb 2023 14:08:11 +0000
Subject: [PATCH va67 9/9] Doc changes describing details about logical
 decoding.

Author: Andres Freund (in an older version), Amit Khandekar, Bertrand Drouvot
Reviewed-By: Bertrand Drouvot, Andres Freund, Robert Haas, Fabrizio de Royes Mello
---
 doc/src/sgml/logicaldecoding.sgml | 27 +++++++++++++++++++++++++++
 1 file changed, 27 insertions(+)

diff --git a/doc/src/sgml/logicaldecoding.sgml b/doc/src/sgml/logicaldecoding.sgml
index 4e912b4bd48..8651024b8a6 100644
--- a/doc/src/sgml/logicaldecoding.sgml
+++ b/doc/src/sgml/logicaldecoding.sgml
@@ -316,6 +316,33 @@ postgres=# select * from pg_logical_slot_get_changes('regression_slot', NULL, NU
      may consume changes from a slot at any given time.
     </para>
 
+    <para>
+     A logical replication slot can also be created on a hot standby. To prevent
+     <command>VACUUM</command> from removing required rows from the system
+     catalogs, <varname>hot_standby_feedback</varname> should be set on the
+     standby. In spite of that, if any required rows get removed, the slot gets
+     invalidated. It's highly recommended to use a physical slot between the primary
+     and the standby. Otherwise, hot_standby_feedback will work, but only while the
+     connection is alive (for example a node restart would break it). Then, the
+     primary may delete system catalog rows that could be needed by the logical
+     decoding on the standby (as it does not know about the catalog_xmin on the
+     standby). Existing logical slots on standby also get invalidated if wal_level
+     on primary is reduced to less than 'logical'. This is done as soon as the
+     standby detects such a change in the WAL stream. It means, that for walsenders
+     that are lagging (if any), some WAL records up to the wal_level parameter change
+     on the primary won't be decoded.
+    </para>
+
+    <para>
+     For a logical slot to be created, it builds a historic snapshot, for which
+     information of all the currently running transactions is essential. On
+     primary, this information is available, but on standby, this information
+     has to be obtained from primary. So, slot creation may wait for some
+     activity to happen on the primary. If the primary is idle, creating a
+     logical slot on standby may take a noticeable time. One option to speed it
+     is to call the <function>pg_log_standby_snapshot</function> on the primary.
+    </para>
+
     <caution>
      <para>
       Replication slots persist across crashes and know nothing about the state
-- 
2.38.0

Reply via email to