commit 65258e95db0c461a9b48df17e7be01312a8489ab
Author: Paul Julius Martinez <paulmtz@google.com>
Date:   Thu Feb 18 21:55:02 2021 +0000

    Add max_replication_origins GUC parameter to replace some usages of max_replication_slots.

diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml
index e81141e45c..07a6fd632c 100644
--- a/doc/src/sgml/config.sgml
+++ b/doc/src/sgml/config.sgml
@@ -4693,6 +4693,25 @@ ANY <replaceable class="parameter">num_sync</replaceable> ( <replaceable class="
 
      <variablelist>
 
+     <varlistentry id="guc-max-replication-origins" xreflabel="max_replication_origins">
+      <term><varname>max_replication_origins</varname> (<type>integer</type>)
+      <indexterm>
+       <primary><varname>max_replication_origins</varname> configuration parameter</primary>
+      </indexterm>
+      </term>
+      <listitem>
+       <para>
+        Specifies how many replication origins (see <xref linkend="replication-origins"/>) can
+        be tracked simultaneously, effectively limiting how many logical replication subscriptions
+        can be created on the server. Setting it a lower value than the current
+        number of tracked replication origins (reflected in
+        <link linkend="view-pg-replication-origin-status">pg_replication_origin_status</link>,
+        not <link linkend="catalog-pg-replication-origin">pg_replication_origin</link>)
+        will prevent the server from starting.
+       </para>
+      </listitem>
+     </varlistentry>
+
      <varlistentry id="guc-max-logical-replication-workers" xreflabel="max_logical_replication_workers">
       <term><varname>max_logical_replication_workers</varname> (<type>int</type>)
       <indexterm>
diff --git a/doc/src/sgml/logical-replication.sgml b/doc/src/sgml/logical-replication.sgml
index d0742f2c52..9b754a4d17 100644
--- a/doc/src/sgml/logical-replication.sgml
+++ b/doc/src/sgml/logical-replication.sgml
@@ -602,7 +602,7 @@
   </para>
 
   <para>
-   The subscriber also requires the <varname>max_replication_slots</varname>
+   The subscriber also requires the <varname>max_replication_origins</varname>
    to be set.  In this case it should be set to at least the number of
    subscriptions that will be added to the subscriber.
    <varname>max_logical_replication_workers</varname> must be set to at
diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c
index cb462a052a..efec1e0115 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -32,6 +32,7 @@
 #include "postmaster/fork_process.h"
 #include "postmaster/interrupt.h"
 #include "postmaster/postmaster.h"
+#include "replication/logical.h"
 #include "replication/logicallauncher.h"
 #include "replication/logicalworker.h"
 #include "replication/slot.h"
@@ -286,10 +287,10 @@ logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname, Oid userid,
 					subname)));
 
 	/* Report this after the initial starting message for consistency. */
-	if (max_replication_slots == 0)
+	if (max_replication_origins == 0)
 		ereport(ERROR,
 				(errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
-				 errmsg("cannot start logical replication workers when max_replication_slots = 0")));
+				 errmsg("cannot start logical replication workers when max_replication_origins = 0")));
 
 	/*
 	 * We need to do the modification of the shared memory under lock so that
diff --git a/src/backend/replication/logical/origin.c b/src/backend/replication/logical/origin.c
index 685eaa6134..3c6f0d9826 100644
--- a/src/backend/replication/logical/origin.c
+++ b/src/backend/replication/logical/origin.c
@@ -146,7 +146,7 @@ typedef struct ReplicationStateCtl
 {
 	/* Tranche to use for per-origin LWLocks */
 	int			tranche_id;
-	/* Array of length max_replication_slots */
+	/* Array of length max_replication_origins */
 	ReplicationState states[FLEXIBLE_ARRAY_MEMBER];
 } ReplicationStateCtl;
 
@@ -155,12 +155,13 @@ RepOriginId replorigin_session_origin = InvalidRepOriginId; /* assumed identity
 XLogRecPtr	replorigin_session_origin_lsn = InvalidXLogRecPtr;
 TimestampTz replorigin_session_origin_timestamp = 0;
 
+/* GUCs */
+/* The maximum number of replication origins that can be tracked simultaneously. */
+int	max_replication_origins = 0;
+
 /*
  * Base address into a shared memory array of replication states of size
- * max_replication_slots.
- *
- * XXX: Should we use a separate variable to size this rather than
- * max_replication_slots?
+ * max_replication_origins.
  */
 static ReplicationState *replication_states;
 
@@ -182,10 +183,10 @@ static ReplicationState *session_replication_state = NULL;
 static void
 replorigin_check_prerequisites(bool check_slots, bool recoveryOK)
 {
-	if (check_slots && max_replication_slots == 0)
+	if (check_slots && max_replication_origins == 0)
 		ereport(ERROR,
 				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
-				 errmsg("cannot query or manipulate replication origin when max_replication_slots = 0")));
+				 errmsg("cannot query or manipulate replication origin when max_replication_origins = 0")));
 
 	if (!recoveryOK && RecoveryInProgress())
 		ereport(ERROR,
@@ -338,7 +339,7 @@ restart:
 	tuple = NULL;
 	LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
 
-	for (i = 0; i < max_replication_slots; i++)
+	for (i = 0; i < max_replication_origins; i++)
 	{
 		ReplicationState *state = &replication_states[i];
 
@@ -491,18 +492,13 @@ ReplicationOriginShmemSize(void)
 {
 	Size		size = 0;
 
-	/*
-	 * XXX: max_replication_slots is arguably the wrong thing to use, as here
-	 * we keep the replay state of *remote* transactions. But for now it seems
-	 * sufficient to reuse it, rather than introduce a separate GUC.
-	 */
-	if (max_replication_slots == 0)
+	if (max_replication_origins == 0)
 		return size;
 
 	size = add_size(size, offsetof(ReplicationStateCtl, states));
 
 	size = add_size(size,
-					mul_size(max_replication_slots, sizeof(ReplicationState)));
+					mul_size(max_replication_origins, sizeof(ReplicationState)));
 	return size;
 }
 
@@ -511,7 +507,7 @@ ReplicationOriginShmemInit(void)
 {
 	bool		found;
 
-	if (max_replication_slots == 0)
+	if (max_replication_origins == 0)
 		return;
 
 	replication_states_ctl = (ReplicationStateCtl *)
@@ -528,7 +524,7 @@ ReplicationOriginShmemInit(void)
 
 		replication_states_ctl->tranche_id = LWTRANCHE_REPLICATION_ORIGIN_STATE;
 
-		for (i = 0; i < max_replication_slots; i++)
+		for (i = 0; i < max_replication_origins; i++)
 		{
 			LWLockInitialize(&replication_states[i].lock,
 							 replication_states_ctl->tranche_id);
@@ -550,7 +546,7 @@ ReplicationOriginShmemInit(void)
  *
  * So its just the magic, followed by the statically sized
  * ReplicationStateOnDisk structs. Note that the maximum number of
- * ReplicationState is determined by max_replication_slots.
+ * ReplicationState is determined by max_replication_origins.
  * ---------------------------------------------------------------------------
  */
 void
@@ -563,7 +559,7 @@ CheckPointReplicationOrigin(void)
 	uint32		magic = REPLICATION_STATE_MAGIC;
 	pg_crc32c	crc;
 
-	if (max_replication_slots == 0)
+	if (max_replication_origins == 0)
 		return;
 
 	INIT_CRC32C(crc);
@@ -605,7 +601,7 @@ CheckPointReplicationOrigin(void)
 	LWLockAcquire(ReplicationOriginLock, LW_SHARED);
 
 	/* write actual data */
-	for (i = 0; i < max_replication_slots; i++)
+	for (i = 0; i < max_replication_origins; i++)
 	{
 		ReplicationStateOnDisk disk_state;
 		ReplicationState *curstate = &replication_states[i];
@@ -698,7 +694,7 @@ StartupReplicationOrigin(void)
 	already_started = true;
 #endif
 
-	if (max_replication_slots == 0)
+	if (max_replication_origins == 0)
 		return;
 
 	INIT_CRC32C(crc);
@@ -708,7 +704,7 @@ StartupReplicationOrigin(void)
 	fd = OpenTransientFile(path, O_RDONLY | PG_BINARY);
 
 	/*
-	 * might have had max_replication_slots == 0 last run, or we just brought
+	 * might have had max_replication_origins == 0 last run, or we just brought
 	 * up a standby.
 	 */
 	if (fd < 0 && errno == ENOENT)
@@ -776,10 +772,10 @@ StartupReplicationOrigin(void)
 
 		COMP_CRC32C(crc, &disk_state, sizeof(disk_state));
 
-		if (last_state == max_replication_slots)
+		if (last_state == max_replication_origins)
 			ereport(PANIC,
 					(errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
-					 errmsg("could not find free replication state, increase max_replication_slots")));
+					 errmsg("could not find free replication state, increase max_replication_origins")));
 
 		/* copy data to shared memory */
 		replication_states[last_state].roident = disk_state.roident;
@@ -833,7 +829,7 @@ replorigin_redo(XLogReaderState *record)
 
 				xlrec = (xl_replorigin_drop *) XLogRecGetData(record);
 
-				for (i = 0; i < max_replication_slots; i++)
+				for (i = 0; i < max_replication_origins; i++)
 				{
 					ReplicationState *state = &replication_states[i];
 
@@ -898,7 +894,7 @@ replorigin_advance(RepOriginId node,
 	 * Search for either an existing slot for the origin, or a free one we can
 	 * use.
 	 */
-	for (i = 0; i < max_replication_slots; i++)
+	for (i = 0; i < max_replication_origins; i++)
 	{
 		ReplicationState *curstate = &replication_states[i];
 
@@ -939,7 +935,7 @@ replorigin_advance(RepOriginId node,
 				(errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
 				 errmsg("could not find free replication state slot for replication origin with OID %u",
 						node),
-				 errhint("Increase max_replication_slots and try again.")));
+				 errhint("Increase max_replication_origins and try again.")));
 
 	if (replication_state == NULL)
 	{
@@ -1002,7 +998,7 @@ replorigin_get_progress(RepOriginId node, bool flush)
 	/* prevent slots from being concurrently dropped */
 	LWLockAcquire(ReplicationOriginLock, LW_SHARED);
 
-	for (i = 0; i < max_replication_slots; i++)
+	for (i = 0; i < max_replication_origins; i++)
 	{
 		ReplicationState *state;
 
@@ -1078,7 +1074,7 @@ replorigin_session_setup(RepOriginId node)
 		registered_cleanup = true;
 	}
 
-	Assert(max_replication_slots > 0);
+	Assert(max_replication_origins > 0);
 
 	if (session_replication_state != NULL)
 		ereport(ERROR,
@@ -1092,7 +1088,7 @@ replorigin_session_setup(RepOriginId node)
 	 * Search for either an existing slot for the origin, or a free one we can
 	 * use.
 	 */
-	for (i = 0; i < max_replication_slots; i++)
+	for (i = 0; i < max_replication_origins; i++)
 	{
 		ReplicationState *curstate = &replication_states[i];
 
@@ -1126,7 +1122,7 @@ replorigin_session_setup(RepOriginId node)
 				(errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
 				 errmsg("could not find free replication state slot for replication origin with OID %u",
 						node),
-				 errhint("Increase max_replication_slots and try again.")));
+				 errhint("Increase max_replication_origins and try again.")));
 	else if (session_replication_state == NULL)
 	{
 		/* initialize new slot */
@@ -1158,7 +1154,7 @@ replorigin_session_reset(void)
 {
 	ConditionVariable *cv;
 
-	Assert(max_replication_slots != 0);
+	Assert(max_replication_origins != 0);
 
 	if (session_replication_state == NULL)
 		ereport(ERROR,
@@ -1522,7 +1518,7 @@ pg_show_replication_origin_status(PG_FUNCTION_ARGS)
 	 * filled. Note that we do not take any locks, so slightly corrupted/out
 	 * of date values are a possibility.
 	 */
-	for (i = 0; i < max_replication_slots; i++)
+	for (i = 0; i < max_replication_origins; i++)
 	{
 		ReplicationState *state;
 		Datum		values[REPLICATION_ORIGIN_PROGRESS_COLS];
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index 00018abb7d..9436057332 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -69,6 +69,7 @@
 #include "postmaster/postmaster.h"
 #include "postmaster/syslogger.h"
 #include "postmaster/walwriter.h"
+#include "replication/logical.h"
 #include "replication/logicallauncher.h"
 #include "replication/reorderbuffer.h"
 #include "replication/slot.h"
@@ -3005,6 +3006,16 @@ static struct config_int ConfigureNamesInt[] =
 		check_max_worker_processes, NULL, NULL
 	},
 
+	{
+		{"max_replication_origins", PGC_POSTMASTER, REPLICATION_SUBSCRIBERS,
+			gettext_noop("Sets the maximum number of replication origins that can be tracked simultaneously."),
+			NULL
+		},
+		&max_replication_origins,
+		4, 0, MAX_BACKENDS,
+		NULL, NULL, NULL
+	},
+
 	{
 		{"max_logical_replication_workers",
 			PGC_POSTMASTER,
diff --git a/src/include/replication/logical.h b/src/include/replication/logical.h
index c253403372..3512651c04 100644
--- a/src/include/replication/logical.h
+++ b/src/include/replication/logical.h
@@ -14,6 +14,9 @@
 #include "replication/output_plugin.h"
 #include "replication/slot.h"
 
+/* GUCs */
+extern PGDLLIMPORT int max_replication_origins;
+
 struct LogicalDecodingContext;
 
 typedef void (*LogicalOutputPluginWriterWrite) (struct LogicalDecodingContext *lr,
