From 02ea832d505ee05b701c2e255a9f084a0b765624 Mon Sep 17 00:00:00 2001
From: Bharath Rupireddy <bharath.rupireddyforpostgres@gmail.com>
Date: Fri, 7 Jan 2022 03:16:51 +0000
Subject: [PATCH v2] deduplicate min restart_lsn calculation code

---
 src/backend/access/heap/rewriteheap.c       |  2 +-
 src/backend/replication/logical/logical.c   |  5 +-
 src/backend/replication/logical/snapbuild.c |  2 +-
 src/backend/replication/slot.c              | 89 ++++++---------------
 src/backend/replication/slotfuncs.c         |  8 +-
 src/backend/replication/walsender.c         |  5 +-
 src/include/replication/slot.h              |  3 +-
 7 files changed, 43 insertions(+), 71 deletions(-)

diff --git a/src/backend/access/heap/rewriteheap.c b/src/backend/access/heap/rewriteheap.c
index 986a776bbd..7265ac0652 100644
--- a/src/backend/access/heap/rewriteheap.c
+++ b/src/backend/access/heap/rewriteheap.c
@@ -1204,7 +1204,7 @@ CheckPointLogicalRewriteHeap(void)
 	redo = GetRedoRecPtr();
 
 	/* now check for the restart ptrs from existing slots */
-	cutoff = ReplicationSlotsComputeLogicalRestartLSN();
+	cutoff = ReplicationSlotsComputeRequiredLSN(true);
 
 	/* don't start earlier than the restart lsn */
 	if (cutoff != InvalidXLogRecPtr && redo < cutoff)
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 10cbdea124..f61378fa2f 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -1771,12 +1771,15 @@ LogicalConfirmReceivedLocation(XLogRecPtr lsn)
 		 */
 		if (updated_xmin)
 		{
+			XLogRecPtr	min_required;
+
 			SpinLockAcquire(&MyReplicationSlot->mutex);
 			MyReplicationSlot->effective_catalog_xmin = MyReplicationSlot->data.catalog_xmin;
 			SpinLockRelease(&MyReplicationSlot->mutex);
 
 			ReplicationSlotsComputeRequiredXmin(false);
-			ReplicationSlotsComputeRequiredLSN();
+			min_required = ReplicationSlotsComputeRequiredLSN(false);
+			XLogSetReplicationSlotMinimumLSN(min_required);
 		}
 	}
 	else
diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c
index dbdc172a2b..a0cb09836e 100644
--- a/src/backend/replication/logical/snapbuild.c
+++ b/src/backend/replication/logical/snapbuild.c
@@ -1935,7 +1935,7 @@ CheckPointSnapBuild(void)
 	redo = GetRedoRecPtr();
 
 	/* now check for the restart ptrs from existing slots */
-	cutoff = ReplicationSlotsComputeLogicalRestartLSN();
+	cutoff = ReplicationSlotsComputeRequiredLSN(true);
 
 	/* don't start earlier than the restart lsn */
 	if (redo < cutoff)
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index 90ba9b417d..d299bbf347 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -594,6 +594,7 @@ ReplicationSlotDropPtr(ReplicationSlot *slot)
 {
 	char		path[MAXPGPATH];
 	char		tmppath[MAXPGPATH];
+	XLogRecPtr	min_required;
 
 	/*
 	 * If some other backend ran this code concurrently with us, we might try
@@ -665,7 +666,8 @@ ReplicationSlotDropPtr(ReplicationSlot *slot)
 	 * limits.
 	 */
 	ReplicationSlotsComputeRequiredXmin(false);
-	ReplicationSlotsComputeRequiredLSN();
+	min_required = ReplicationSlotsComputeRequiredLSN(false);
+	XLogSetReplicationSlotMinimumLSN(min_required);
 
 	/*
 	 * If removing the directory fails, the worst thing that will happen is
@@ -807,82 +809,37 @@ ReplicationSlotsComputeRequiredXmin(bool already_locked)
 }
 
 /*
- * Compute the oldest restart LSN across all slots and inform xlog module.
+ * Compute the oldest restart LSN of replication slots
+ *
+ * When only_logical is true, compute for logical decoding slots only.
  *
  * Note: while max_slot_wal_keep_size is theoretically relevant for this
  * purpose, we don't try to account for that, because this module doesn't
  * know what to compare against.
  */
-void
-ReplicationSlotsComputeRequiredLSN(void)
-{
-	int			i;
-	XLogRecPtr	min_required = InvalidXLogRecPtr;
-
-	Assert(ReplicationSlotCtl != NULL);
-
-	LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
-	for (i = 0; i < max_replication_slots; i++)
-	{
-		ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
-		XLogRecPtr	restart_lsn;
-
-		if (!s->in_use)
-			continue;
-
-		SpinLockAcquire(&s->mutex);
-		restart_lsn = s->data.restart_lsn;
-		SpinLockRelease(&s->mutex);
-
-		if (restart_lsn != InvalidXLogRecPtr &&
-			(min_required == InvalidXLogRecPtr ||
-			 restart_lsn < min_required))
-			min_required = restart_lsn;
-	}
-	LWLockRelease(ReplicationSlotControlLock);
-
-	XLogSetReplicationSlotMinimumLSN(min_required);
-}
-
-/*
- * Compute the oldest WAL LSN required by *logical* decoding slots..
- *
- * Returns InvalidXLogRecPtr if logical decoding is disabled or no logical
- * slots exist.
- *
- * NB: this returns a value >= ReplicationSlotsComputeRequiredLSN(), since it
- * ignores physical replication slots.
- *
- * The results aren't required frequently, so we don't maintain a precomputed
- * value like we do for ComputeRequiredLSN() and ComputeRequiredXmin().
- */
 XLogRecPtr
-ReplicationSlotsComputeLogicalRestartLSN(void)
+ReplicationSlotsComputeRequiredLSN(bool only_logical)
 {
-	XLogRecPtr	result = InvalidXLogRecPtr;
 	int			i;
+	XLogRecPtr	min_required = InvalidXLogRecPtr;
 
 	if (max_replication_slots <= 0)
 		return InvalidXLogRecPtr;
 
-	LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
+	Assert(ReplicationSlotCtl != NULL);
 
+	LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
 	for (i = 0; i < max_replication_slots; i++)
 	{
-		ReplicationSlot *s;
+		ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
 		XLogRecPtr	restart_lsn;
 
-		s = &ReplicationSlotCtl->replication_slots[i];
-
-		/* cannot change while ReplicationSlotCtlLock is held */
 		if (!s->in_use)
 			continue;
 
-		/* we're only interested in logical slots */
-		if (!SlotIsLogical(s))
+		if (only_logical && !SlotIsLogical(s))
 			continue;
 
-		/* read once, it's ok if it increases while we're checking */
 		SpinLockAcquire(&s->mutex);
 		restart_lsn = s->data.restart_lsn;
 		SpinLockRelease(&s->mutex);
@@ -890,14 +847,13 @@ ReplicationSlotsComputeLogicalRestartLSN(void)
 		if (restart_lsn == InvalidXLogRecPtr)
 			continue;
 
-		if (result == InvalidXLogRecPtr ||
-			restart_lsn < result)
-			result = restart_lsn;
+		if (min_required == InvalidXLogRecPtr ||
+			restart_lsn < min_required)
+			min_required = restart_lsn;
 	}
-
 	LWLockRelease(ReplicationSlotControlLock);
 
-	return result;
+	return min_required;
 }
 
 /*
@@ -1096,6 +1052,7 @@ ReplicationSlotReserveWal(void)
 	{
 		XLogSegNo	segno;
 		XLogRecPtr	restart_lsn;
+		XLogRecPtr	min_required;
 
 		/*
 		 * For logical slots log a standby snapshot and start logical decoding
@@ -1133,7 +1090,8 @@ ReplicationSlotReserveWal(void)
 		}
 
 		/* prevent WAL removal as fast as possible */
-		ReplicationSlotsComputeRequiredLSN();
+		min_required = ReplicationSlotsComputeRequiredLSN(false);
+		XLogSetReplicationSlotMinimumLSN(min_required);
 
 		/*
 		 * If all required WAL is still there, great, otherwise retry. The
@@ -1343,8 +1301,11 @@ restart:
 	 */
 	if (invalidated)
 	{
+		XLogRecPtr	min_required;
+
 		ReplicationSlotsComputeRequiredXmin(false);
-		ReplicationSlotsComputeRequiredLSN();
+		min_required = ReplicationSlotsComputeRequiredLSN(false);
+		XLogSetReplicationSlotMinimumLSN(min_required);
 	}
 
 	return invalidated;
@@ -1396,6 +1357,7 @@ StartupReplicationSlots(void)
 {
 	DIR		   *replication_dir;
 	struct dirent *replication_de;
+	XLogRecPtr	min_required;
 
 	elog(DEBUG1, "starting up replication slots");
 
@@ -1441,7 +1403,8 @@ StartupReplicationSlots(void)
 
 	/* Now that we have recovered all the data, compute replication xmin */
 	ReplicationSlotsComputeRequiredXmin(false);
-	ReplicationSlotsComputeRequiredLSN();
+	min_required = ReplicationSlotsComputeRequiredLSN(false);
+	XLogSetReplicationSlotMinimumLSN(min_required);
 }
 
 /* ----
diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c
index d11daeb1fc..5def594640 100644
--- a/src/backend/replication/slotfuncs.c
+++ b/src/backend/replication/slotfuncs.c
@@ -608,6 +608,7 @@ pg_replication_slot_advance(PG_FUNCTION_ARGS)
 	bool		nulls[2];
 	HeapTuple	tuple;
 	Datum		result;
+	XLogRecPtr	min_required;
 
 	Assert(!MyReplicationSlot);
 
@@ -672,7 +673,8 @@ pg_replication_slot_advance(PG_FUNCTION_ARGS)
 	 * advancing potentially done.
 	 */
 	ReplicationSlotsComputeRequiredXmin(false);
-	ReplicationSlotsComputeRequiredLSN();
+	min_required = ReplicationSlotsComputeRequiredLSN(false);
+	XLogSetReplicationSlotMinimumLSN(min_required);
 
 	ReplicationSlotRelease();
 
@@ -816,6 +818,7 @@ copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot)
 		XLogRecPtr	copy_confirmed_flush;
 		bool		copy_islogical;
 		char	   *copy_name;
+		XLogRecPtr	min_required;
 
 		/* Copy data of source slot again */
 		SpinLockAcquire(&src->mutex);
@@ -873,7 +876,8 @@ copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot)
 
 		ReplicationSlotMarkDirty();
 		ReplicationSlotsComputeRequiredXmin(false);
-		ReplicationSlotsComputeRequiredLSN();
+		min_required = ReplicationSlotsComputeRequiredLSN(false);
+		XLogSetReplicationSlotMinimumLSN(min_required);
 		ReplicationSlotSave();
 
 #ifdef USE_ASSERT_CHECKING
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 84915ed95b..cf0a65d975 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -1977,8 +1977,11 @@ PhysicalConfirmReceivedLocation(XLogRecPtr lsn)
 
 	if (changed)
 	{
+		XLogRecPtr	min_required;
+
 		ReplicationSlotMarkDirty();
-		ReplicationSlotsComputeRequiredLSN();
+		min_required = ReplicationSlotsComputeRequiredLSN(false);
+		XLogSetReplicationSlotMinimumLSN(min_required);
 	}
 
 	/*
diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h
index 53d773ccff..53ae2b3607 100644
--- a/src/include/replication/slot.h
+++ b/src/include/replication/slot.h
@@ -209,8 +209,7 @@ extern void ReplicationSlotMarkDirty(void);
 extern bool ReplicationSlotValidateName(const char *name, int elevel);
 extern void ReplicationSlotReserveWal(void);
 extern void ReplicationSlotsComputeRequiredXmin(bool already_locked);
-extern void ReplicationSlotsComputeRequiredLSN(void);
-extern XLogRecPtr ReplicationSlotsComputeLogicalRestartLSN(void);
+extern XLogRecPtr ReplicationSlotsComputeRequiredLSN(bool only_logical);
 extern bool ReplicationSlotsCountDBSlots(Oid dboid, int *nslots, int *nactive);
 extern void ReplicationSlotsDropDBSlots(Oid dboid);
 extern bool InvalidateObsoleteReplicationSlots(XLogSegNo oldestSegno);
-- 
2.25.1

