On 03/10/2016 21:27, Robert Haas wrote:
> On Fri, Sep 30, 2016 at 12:23 PM, Julien Rouhaud
> <julien.rouh...@dalibo.com> wrote:
>> I've already fixed every other issue mentioned upthread, but I'm facing
>> a problem for this one.  Assuming that the bgworker classes are supposed
>> to be mutually exclusive, I don't see a simple and clean way to add such
>> a check in SanityCheckBackgroundWorker().  Am I missing something
>> obvious, or can someone give me some advice for this?
> 
> My advice is "don't worry about it".   If somebody thinks of something
> that can be usefully added there, it will take very little time to add
> it and test that it works.  Don't get hung up on that for now.
> 

Ok, thanks!

Please find attached v9 of the patch.  It adds the parallel worker class,
changes the max_worker_processes default to 16, and sets the
max_parallel_workers default to 8.  I also added Amit's explanation of why
a write barrier is needed in ForgetBackgroundWorker().
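
For reviewers who want to convince themselves of the counter arithmetic, here
is a small standalone sketch (not part of the patch; it only borrows the field
names from BackgroundWorkerArray) showing why the two uint32 counters can wrap
around harmlessly: unsigned subtraction is done modulo 2^32, so registered
minus terminated still yields the number of active parallel workers.

#include <inttypes.h>
#include <stdint.h>
#include <stdio.h>

int
main(void)
{
	/* counters named after the fields in BackgroundWorkerArray */
	uint32_t	parallel_register_count = UINT32_MAX;		/* about to wrap */
	uint32_t	parallel_terminate_count = UINT32_MAX - 3;	/* 3 workers active */
	uint32_t	active;

	/* register two more workers; the counter wraps past zero to 1 */
	parallel_register_count += 2;

	/* active = registered - terminated, computed modulo 2^32 */
	active = parallel_register_count - parallel_terminate_count;

	printf("active parallel workers: %" PRIu32 "\n", active);	/* prints 5 */
	return 0;
}

The usual caveat applies: this relies on the well-defined modulo behaviour of
unsigned arithmetic in C, which is why the counters are unsigned rather than
plain int.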

I'll add this patch to the next commitfest.

-- 
Julien Rouhaud
http://dalibo.com - http://dalibo.org
diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml
index e826c19..61c5a7c 100644
--- a/doc/src/sgml/config.sgml
+++ b/doc/src/sgml/config.sgml
@@ -1984,7 +1984,7 @@ include_dir 'conf.d'
         <para>
          Sets the maximum number of background processes that the system
          can support.  This parameter can only be set at server start.  The
-         default is 8.
+         default is 16.
         </para>
 
         <para>
@@ -2006,8 +2006,9 @@ include_dir 'conf.d'
          Sets the maximum number of workers that can be started by a single
          <literal>Gather</literal> node.  Parallel workers are taken from the
          pool of processes established by
-         <xref linkend="guc-max-worker-processes">.  Note that the requested
-         number of workers may not actually be available at run time.  If this
+         <xref linkend="guc-max-worker-processes">, limited by
+         <xref linkend="guc-max-parallel-workers">.  Note that the requested
+         number of workers may not actually be available at runtime.  If this
          occurs, the plan will run with fewer workers than expected, which may
          be inefficient.  The default value is 2.  Setting this value to 0
          disables parallel query execution.
@@ -2036,6 +2037,21 @@ include_dir 'conf.d'
        </listitem>
       </varlistentry>
 
+      <varlistentry id="guc-max-parallel-workers" xreflabel="max_parallel_workers">
+       <term><varname>max_parallel_workers</varname> (<type>integer</type>)
+       <indexterm>
+        <primary><varname>max_parallel_workers</> configuration parameter</primary>
+       </indexterm>
+       </term>
+       <listitem>
+        <para>
+         Sets the maximum number of workers that the system can support for
+         parallel queries.  The default value is 8.  Setting this value to 0
+         disables parallel query execution.
+        </para>
+       </listitem>
+      </varlistentry>
+
       <varlistentry id="guc-backend-flush-after" xreflabel="backend_flush_after">
        <term><varname>backend_flush_after</varname> (<type>integer</type>)
        <indexterm>
diff --git a/src/backend/access/transam/parallel.c b/src/backend/access/transam/parallel.c
index cde0ed3..0177401 100644
--- a/src/backend/access/transam/parallel.c
+++ b/src/backend/access/transam/parallel.c
@@ -453,7 +453,8 @@ LaunchParallelWorkers(ParallelContext *pcxt)
 	snprintf(worker.bgw_name, BGW_MAXLEN, "parallel worker for PID %d",
 			 MyProcPid);
 	worker.bgw_flags =
-		BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION;
+		BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION
+		| BGWORKER_CLASS_PARALLEL;
 	worker.bgw_start_time = BgWorkerStart_ConsistentState;
 	worker.bgw_restart_time = BGW_NEVER_RESTART;
 	worker.bgw_main = ParallelWorkerMain;
diff --git a/src/backend/optimizer/path/allpaths.c b/src/backend/optimizer/path/allpaths.c
index e42ef98..6ad8fd0 100644
--- a/src/backend/optimizer/path/allpaths.c
+++ b/src/backend/optimizer/path/allpaths.c
@@ -718,9 +718,11 @@ create_plain_partial_paths(PlannerInfo *root, RelOptInfo *rel)
 	}
 
 	/*
-	 * In no case use more than max_parallel_workers_per_gather workers.
+	 * In no case use more than max_parallel_workers or
+	 * max_parallel_workers_per_gather workers.
 	 */
-	parallel_workers = Min(parallel_workers, max_parallel_workers_per_gather);
+	parallel_workers = Min(max_parallel_workers, Min(parallel_workers,
+				max_parallel_workers_per_gather));
 
 	/* If any limit was set to zero, the user doesn't want a parallel scan. */
 	if (parallel_workers <= 0)
diff --git a/src/backend/optimizer/path/costsize.c b/src/backend/optimizer/path/costsize.c
index 2a49639..09dc077 100644
--- a/src/backend/optimizer/path/costsize.c
+++ b/src/backend/optimizer/path/costsize.c
@@ -113,6 +113,7 @@ int			effective_cache_size = DEFAULT_EFFECTIVE_CACHE_SIZE;
 
 Cost		disable_cost = 1.0e10;
 
+int			max_parallel_workers = 8;
 int			max_parallel_workers_per_gather = 2;
 
 bool		enable_seqscan = true;
diff --git a/src/backend/optimizer/plan/planner.c b/src/backend/optimizer/plan/planner.c
index f657ffc..aa8670b 100644
--- a/src/backend/optimizer/plan/planner.c
+++ b/src/backend/optimizer/plan/planner.c
@@ -249,6 +249,7 @@ standard_planner(Query *parse, int cursorOptions, ParamListInfo boundParams)
 		parse->utilityStmt == NULL &&
 		!parse->hasModifyingCTE &&
 		max_parallel_workers_per_gather > 0 &&
+		max_parallel_workers > 0 &&
 		!IsParallelWorker() &&
 		!IsolationIsSerializable())
 	{
diff --git a/src/backend/postmaster/bgworker.c b/src/backend/postmaster/bgworker.c
index 699c934..665c2f4 100644
--- a/src/backend/postmaster/bgworker.c
+++ b/src/backend/postmaster/bgworker.c
@@ -16,6 +16,7 @@
 
 #include "miscadmin.h"
 #include "libpq/pqsignal.h"
+#include "optimizer/cost.h"
 #include "postmaster/bgworker_internals.h"
 #include "postmaster/postmaster.h"
 #include "storage/barrier.h"
@@ -76,12 +77,26 @@ typedef struct BackgroundWorkerSlot
 	bool		terminate;
 	pid_t		pid;			/* InvalidPid = not started yet; 0 = dead */
 	uint64		generation;		/* incremented when slot is recycled */
+	int			worker_class;
 	BackgroundWorker worker;
 } BackgroundWorkerSlot;
 
+/*
+ * In order to limit the total number of parallel workers (according to the
+ * max_parallel_workers GUC), we keep track of the number of active parallel
+ * workers.  Since the postmaster cannot take locks, two counters are used for
+ * this purpose: the number of registered parallel workers (modified by the
+ * backends, protected by BackgroundWorkerLock) and the number of terminated
+ * parallel workers (modified only by the postmaster, without locking).  The
+ * number of active parallel workers is the number of registered workers minus
+ * the terminated ones.  These counters can of course wrap around, but that
+ * doesn't matter here since the subtraction still gives the right result.
+ */
 typedef struct BackgroundWorkerArray
 {
 	int			total_slots;
+	uint32		parallel_register_count;
+	uint32		parallel_terminate_count;
 	BackgroundWorkerSlot slot[FLEXIBLE_ARRAY_MEMBER];
 } BackgroundWorkerArray;
 
@@ -126,6 +141,8 @@ BackgroundWorkerShmemInit(void)
 		int			slotno = 0;
 
 		BackgroundWorkerData->total_slots = max_worker_processes;
+		BackgroundWorkerData->parallel_register_count = 0;
+		BackgroundWorkerData->parallel_terminate_count = 0;
 
 		/*
 		 * Copy contents of worker list into shared memory.  Record the shared
@@ -144,6 +161,7 @@ BackgroundWorkerShmemInit(void)
 			slot->terminate = false;
 			slot->pid = InvalidPid;
 			slot->generation = 0;
+			slot->worker_class = 0;
 			rw->rw_shmem_slot = slotno;
 			rw->rw_worker.bgw_notify_pid = 0;	/* might be reinit after crash */
 			memcpy(&slot->worker, &rw->rw_worker, sizeof(BackgroundWorker));
@@ -266,9 +284,12 @@ BackgroundWorkerStateChange(void)
 
 			/*
 			 * We need a memory barrier here to make sure that the load of
-			 * bgw_notify_pid completes before the store to in_use.
+			 * bgw_notify_pid and the update of parallel_terminate_count
+			 * complete before the store to in_use.
 			 */
 			notify_pid = slot->worker.bgw_notify_pid;
+			if (slot->worker_class & BGWORKER_CLASS_PARALLEL)
+				BackgroundWorkerData->parallel_terminate_count++;
 			pg_memory_barrier();
 			slot->pid = 0;
 			slot->in_use = false;
@@ -369,6 +390,16 @@ ForgetBackgroundWorker(slist_mutable_iter *cur)
 
 	Assert(rw->rw_shmem_slot < max_worker_processes);
 	slot = &BackgroundWorkerData->slot[rw->rw_shmem_slot];
+	if (slot->worker_class & BGWORKER_CLASS_PARALLEL)
+		BackgroundWorkerData->parallel_terminate_count++;
+
+	/*
+	 * We need a write barrier to make sure the update of
+	 * parallel_terminate_count is done before the store to in_use.
+	 * Otherwise, a concurrent worker registration could rewrite
+	 * worker_class before we have checked it above.
+	 */
+	pg_write_barrier();
 	slot->in_use = false;
 
 	ereport(DEBUG1,
@@ -498,6 +529,28 @@ SanityCheckBackgroundWorker(BackgroundWorker *worker, int elevel)
 		/* XXX other checks? */
 	}
 
+	/* sanity checks for parallel workers */
+	if (worker->bgw_flags & BGWORKER_CLASS_PARALLEL)
+	{
+		if (max_parallel_workers == 0)
+		{
+			ereport(elevel,
+					(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+					 errmsg("background worker \"%s\": cannot request parallel worker when no parallel workers are allowed",
+							worker->bgw_name)));
+			return false;
+		}
+		if ((worker->bgw_flags & BGWORKER_SHMEM_ACCESS) == 0 ||
+			(worker->bgw_flags & BGWORKER_BACKEND_DATABASE_CONNECTION) == 0)
+		{
+			ereport(elevel,
+					(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+					 errmsg("background worker \"%s\": cannot request parallel worker without shared memory access and database connection",
+							worker->bgw_name)));
+			return false;
+		}
+	}
+
 	if ((worker->bgw_restart_time < 0 &&
 		 worker->bgw_restart_time != BGW_NEVER_RESTART) ||
 		(worker->bgw_restart_time > USECS_PER_DAY / 1000))
@@ -824,6 +877,7 @@ RegisterDynamicBackgroundWorker(BackgroundWorker *worker,
 {
 	int			slotno;
 	bool		success = false;
+	bool		parallel;
 	uint64		generation = 0;
 
 	/*
@@ -840,8 +894,24 @@ RegisterDynamicBackgroundWorker(BackgroundWorker *worker,
 	if (!SanityCheckBackgroundWorker(worker, ERROR))
 		return false;
 
+	parallel = (worker->bgw_flags & BGWORKER_CLASS_PARALLEL) != 0;
+
 	LWLockAcquire(BackgroundWorkerLock, LW_EXCLUSIVE);
 
+	if (parallel && (BackgroundWorkerData->parallel_register_count -
+					 BackgroundWorkerData->parallel_terminate_count) >=
+		max_parallel_workers)
+	{
+		LWLockRelease(BackgroundWorkerLock);
+		return false;
+	}
+
+	/*
+	 * We need a read barrier here to make sure the check of the parallel
+	 * worker counts above is not reordered with the slot search below.
+	 */
+	pg_read_barrier();
+
 	/*
 	 * Look for an unused slot.  If we find one, grab it.
 	 */
@@ -855,7 +925,10 @@ RegisterDynamicBackgroundWorker(BackgroundWorker *worker,
 			slot->pid = InvalidPid;		/* indicates not started yet */
 			slot->generation++;
 			slot->terminate = false;
+			slot->worker_class = (worker->bgw_flags & BGWORKER_CLASS_MASK);
 			generation = slot->generation;
+			if (parallel)
+				BackgroundWorkerData->parallel_register_count++;
 
 			/*
 			 * Make sure postmaster doesn't see the slot as in use before it
diff --git a/src/backend/utils/init/globals.c b/src/backend/utils/init/globals.c
index f232083..8277d84 100644
--- a/src/backend/utils/init/globals.c
+++ b/src/backend/utils/init/globals.c
@@ -121,7 +121,7 @@ int			replacement_sort_tuples = 150000;
  */
 int			NBuffers = 1000;
 int			MaxConnections = 90;
-int			max_worker_processes = 8;
+int			max_worker_processes = 16;
 int			MaxBackends = 0;
 
 int			VacuumCostPageHit = 1;		/* GUC parameters for vacuum */
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index 622279b..3c12b3d 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -2479,7 +2479,7 @@ static struct config_int ConfigureNamesInt[] =
 			NULL,
 		},
 		&max_worker_processes,
-		8, 0, MAX_BACKENDS,
+		16, 0, MAX_BACKENDS,
 		check_max_worker_processes, NULL, NULL
 	},
 
@@ -2667,6 +2667,16 @@ static struct config_int ConfigureNamesInt[] =
 	},
 
 	{
+		{"max_parallel_workers", PGC_USERSET, RESOURCES_ASYNCHRONOUS,
+			gettext_noop("Sets the maximum number of parallel workers that can be active at one time."),
+			NULL
+		},
+		&max_parallel_workers,
+		8, 0, 1024,
+		NULL, NULL, NULL
+	},
+
+	{
 		{"autovacuum_work_mem", PGC_SIGHUP, RESOURCES_MEM,
 			gettext_noop("Sets the maximum memory to be used by each autovacuum worker process."),
 			NULL,
diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample
index 05b1373..3fb5a60 100644
--- a/src/backend/utils/misc/postgresql.conf.sample
+++ b/src/backend/utils/misc/postgresql.conf.sample
@@ -162,8 +162,9 @@
 # - Asynchronous Behavior -
 
 #effective_io_concurrency = 1		# 1-1000; 0 disables prefetching
-#max_worker_processes = 8		# (change requires restart)
+#max_worker_processes = 16		# (change requires restart)
 #max_parallel_workers_per_gather = 2	# taken from max_worker_processes
+#max_parallel_workers = 8		# maximum number of parallel worker processes
 #old_snapshot_threshold = -1		# 1min-60d; -1 disables; 0 is immediate
 									# (change requires restart)
 #backend_flush_after = 0		# 0 disables, default is 0
diff --git a/src/bin/pg_resetxlog/pg_resetxlog.c b/src/bin/pg_resetxlog/pg_resetxlog.c
index 525b82ba..5e2c55f 100644
--- a/src/bin/pg_resetxlog/pg_resetxlog.c
+++ b/src/bin/pg_resetxlog/pg_resetxlog.c
@@ -584,7 +584,7 @@ GuessControlValues(void)
 	ControlFile.wal_log_hints = false;
 	ControlFile.track_commit_timestamp = false;
 	ControlFile.MaxConnections = 100;
-	ControlFile.max_worker_processes = 8;
+	ControlFile.max_worker_processes = 16;
 	ControlFile.max_prepared_xacts = 0;
 	ControlFile.max_locks_per_xact = 64;
 
@@ -800,7 +800,7 @@ RewriteControlFile(void)
 	ControlFile.wal_log_hints = false;
 	ControlFile.track_commit_timestamp = false;
 	ControlFile.MaxConnections = 100;
-	ControlFile.max_worker_processes = 8;
+	ControlFile.max_worker_processes = 16;
 	ControlFile.max_prepared_xacts = 0;
 	ControlFile.max_locks_per_xact = 64;
 
diff --git a/src/include/optimizer/cost.h b/src/include/optimizer/cost.h
index 2a4df2f..190f33b 100644
--- a/src/include/optimizer/cost.h
+++ b/src/include/optimizer/cost.h
@@ -55,6 +55,7 @@ extern PGDLLIMPORT double parallel_setup_cost;
 extern PGDLLIMPORT int effective_cache_size;
 extern Cost disable_cost;
 extern int	max_parallel_workers_per_gather;
+extern int	max_parallel_workers;
 extern bool enable_seqscan;
 extern bool enable_indexscan;
 extern bool enable_indexonlyscan;
diff --git a/src/include/postmaster/bgworker.h b/src/include/postmaster/bgworker.h
index b6889a3..5272c91 100644
--- a/src/include/postmaster/bgworker.h
+++ b/src/include/postmaster/bgworker.h
@@ -58,6 +58,18 @@
  */
 #define BGWORKER_BACKEND_DATABASE_CONNECTION		0x0002
 
+/* bitmask to define background worker classes */
+#define BGWORKER_CLASS_MASK   0x00f0
+
+/*
+ * This class is used internally for parallel queries, to keep track of the
+ * number of active parallel workers and make sure we never launch more than
+ * max_parallel_workers parallel workers at the same time.  Third party
+ * background workers should not use this class.
+ */
+#define BGWORKER_CLASS_PARALLEL					0x0010
+/* add additional bgworker classes here */
+
 
 typedef void (*bgworker_main_type) (Datum main_arg);
 