Hi,

On 20 September 2018 at 08:18, Kyotaro HORIGUCHI
<horiguchi.kyot...@lab.ntt.co.jp> wrote:

>
> Instead, we can literally "reserve" connection slots for the
> specific use by providing ProcGlobal->walsenderFreeProcs. The
> slots are never stolen for other usage. Allowing additional
> walsenders comes after all the slots are filled to grab an
> available "normal" slot; it works the same as the current
> behavior when walsender_reserved_connections = 0.
>
> What do you think about this?

Sounds reasonable, please see the updated patch.

Regards,
--
Alexander Kukushkin
diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml
index e1073ac6d3..ccdb217735 100644
--- a/doc/src/sgml/config.sgml
+++ b/doc/src/sgml/config.sgml
@@ -3059,6 +3059,27 @@ include_dir 'conf.d'
        </listitem>
       </varlistentry>
 
+     <varlistentry id="guc-replication-reserved-connections"
+     xreflabel="replication_reserved_connections">
+      <term><varname>replication_reserved_connections</varname>
+      (<type>integer</type>)
+      <indexterm>
+       <primary><varname>replication_reserved_connections</varname> configuration parameter</primary>
+      </indexterm>
+      </term>
+      <listitem>
+       <para>
+        Determines the number of connection <quote>slots</quote> that
+        are reserved for replication connections.
+       </para>
+
+       <para>
+        The default value is zero. The value should not exceed <varname>max_wal_senders</varname>.
+        This parameter can only be set at server start.
+       </para>
+      </listitem>
+     </varlistentry>
+
       <varlistentry id="guc-max-replication-slots" xreflabel="max_replication_slots">
        <term><varname>max_replication_slots</varname> (<type>integer</type>)
        <indexterm>
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 370429d746..40b9deaa0c 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -122,6 +122,8 @@ int			max_wal_senders = 0;	/* the maximum number of concurrent
 int			wal_sender_timeout = 60 * 1000; /* maximum time to send one WAL
 											 * data message */
 bool		log_replication_commands = false;
+int			replication_reserved_connections = 0; /* the number of connection slots
+												   * reserved for replication connections */
 
 /*
  * State for WalSndWakeupRequest
diff --git a/src/backend/storage/lmgr/proc.c b/src/backend/storage/lmgr/proc.c
index 6f9aaa52fa..2d04a8204a 100644
--- a/src/backend/storage/lmgr/proc.c
+++ b/src/backend/storage/lmgr/proc.c
@@ -43,6 +43,7 @@
 #include "postmaster/autovacuum.h"
 #include "replication/slot.h"
 #include "replication/syncrep.h"
+#include "replication/walsender.h"
 #include "storage/condition_variable.h"
 #include "storage/standby.h"
 #include "storage/ipc.h"
@@ -180,6 +181,7 @@ InitProcGlobal(void)
 	ProcGlobal->freeProcs = NULL;
 	ProcGlobal->autovacFreeProcs = NULL;
 	ProcGlobal->bgworkerFreeProcs = NULL;
+	ProcGlobal->walsenderFreeProcs = NULL;
 	ProcGlobal->startupProc = NULL;
 	ProcGlobal->startupProcPid = 0;
 	ProcGlobal->startupBufferPinWaitBufId = -1;
@@ -253,13 +255,20 @@ InitProcGlobal(void)
 			ProcGlobal->autovacFreeProcs = &procs[i];
 			procs[i].procgloballist = &ProcGlobal->autovacFreeProcs;
 		}
-		else if (i < MaxBackends)
+		else if (i < MaxBackends - replication_reserved_connections)
 		{
 			/* PGPROC for bgworker, add to bgworkerFreeProcs list */
 			procs[i].links.next = (SHM_QUEUE *) ProcGlobal->bgworkerFreeProcs;
 			ProcGlobal->bgworkerFreeProcs = &procs[i];
 			procs[i].procgloballist = &ProcGlobal->bgworkerFreeProcs;
 		}
+		else if (i < MaxBackends)
+		{
+			/* PGPROC for walsender, add to walsenderFreeProcs list */
+			procs[i].links.next = (SHM_QUEUE *) ProcGlobal->walsenderFreeProcs;
+			ProcGlobal->walsenderFreeProcs = &procs[i];
+			procs[i].procgloballist = &ProcGlobal->walsenderFreeProcs;
+		}
 
 		/* Initialize myProcLocks[] shared memory queues. */
 		for (j = 0; j < NUM_LOCK_PARTITIONS; j++)
@@ -304,6 +313,8 @@ InitProcess(void)
 		procgloballist = &ProcGlobal->autovacFreeProcs;
 	else if (IsBackgroundWorker)
 		procgloballist = &ProcGlobal->bgworkerFreeProcs;
+	else if (am_walsender)
+		procgloballist = &ProcGlobal->walsenderFreeProcs;
 	else
 		procgloballist = &ProcGlobal->freeProcs;
 
@@ -318,6 +329,14 @@ InitProcess(void)
 
 	set_spins_per_delay(ProcGlobal->spins_per_delay);
 
+	/*
+	 * Try to use ProcGlobal->freeProcs as a fallback when
+	 * all reserved walsender slots are already busy.
+	 */
+	if (am_walsender && replication_reserved_connections < max_wal_senders
+			&& *procgloballist == NULL)
+		procgloballist = &ProcGlobal->freeProcs;
+
 	MyProc = *procgloballist;
 
 	if (MyProc != NULL)
diff --git a/src/backend/utils/init/postinit.c b/src/backend/utils/init/postinit.c
index 5ef6315d20..7de2609ef2 100644
--- a/src/backend/utils/init/postinit.c
+++ b/src/backend/utils/init/postinit.c
@@ -505,7 +505,7 @@ InitializeMaxBackends(void)
 
 	/* the extra unit accounts for the autovacuum launcher */
 	MaxBackends = MaxConnections + autovacuum_max_workers + 1 +
-		max_worker_processes;
+		max_worker_processes + replication_reserved_connections;
 
 	/* internal error because the values were all checked previously */
 	if (MaxBackends > MAX_BACKENDS)
@@ -790,8 +790,8 @@ InitPostgres(const char *in_dbname, Oid dboid, const char *username,
 
 	/*
 	 * The last few connection slots are reserved for superusers.  Although
-	 * replication connections currently require superuser privileges, we
-	 * don't allow them to consume the reserved slots, which are intended for
+	 * replication connections may have superuser privileges, we don't
+	 * allow them to consume the reserved slots, which are intended for
 	 * interactive use.
 	 */
 	if ((!am_superuser || am_walsender) &&
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index 77662aff7f..5203b70ef6 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -2527,6 +2527,17 @@ static struct config_int ConfigureNamesInt[] =
 		NULL, NULL, NULL
 	},
 
+	{
+		/* see max_connections, max_wal_senders and superuser_reserved_connections */
+		{"replication_reserved_connections", PGC_POSTMASTER, CONN_AUTH_SETTINGS,
+			gettext_noop("Sets the number of connection slots reserved for replication connections."),
+			NULL
+		},
+		&replication_reserved_connections,
+		0, 0, MAX_BACKENDS,
+		NULL, NULL, NULL
+	},
+
 	{
 		/* see max_wal_senders */
 		{"max_replication_slots", PGC_POSTMASTER, REPLICATION_SENDING,
diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample
index 4e61bc6521..6713833c5b 100644
--- a/src/backend/utils/misc/postgresql.conf.sample
+++ b/src/backend/utils/misc/postgresql.conf.sample
@@ -237,6 +237,8 @@
 
 #max_wal_senders = 10		# max number of walsender processes
 				# (change requires restart)
+#replication_reserved_connections = 0	# number of connection slots reserved
+				# for replication connections. (change requires restart)
 #wal_keep_segments = 0		# in logfile segments; 0 disables
 #wal_sender_timeout = 60s	# in milliseconds; 0 disables
 
diff --git a/src/include/replication/walsender.h b/src/include/replication/walsender.h
index 45b72a76db..9c9fbadfa0 100644
--- a/src/include/replication/walsender.h
+++ b/src/include/replication/walsender.h
@@ -36,6 +36,7 @@ extern bool wake_wal_senders;
 extern int	max_wal_senders;
 extern int	wal_sender_timeout;
 extern bool log_replication_commands;
+extern int	replication_reserved_connections;
 
 extern void InitWalSender(void);
 extern bool exec_replication_command(const char *query_string);
diff --git a/src/include/storage/proc.h b/src/include/storage/proc.h
index cb613c8076..e89f7337b4 100644
--- a/src/include/storage/proc.h
+++ b/src/include/storage/proc.h
@@ -255,6 +255,8 @@ typedef struct PROC_HDR
 	PGPROC	   *autovacFreeProcs;
 	/* Head of list of bgworker free PGPROC structures */
 	PGPROC	   *bgworkerFreeProcs;
+	/* Head of list of walsender free PGPROC structures */
+	PGPROC	   *walsenderFreeProcs;
 	/* First pgproc waiting for group XID clear */
 	pg_atomic_uint32 procArrayGroupFirst;
 	/* First pgproc waiting for group transaction status update */

Reply via email to