On 16/09/2016 20:24, Robert Haas wrote:
> On Wed, Jun 29, 2016 at 10:46 PM, Amit Kapila <amit.kapil...@gmail.com> wrote:
>> Your patch looks good to me and is ready for a committer's look.
>>
>> Notes for committer -
>> a. Verify if description of newly added Guc max_parallel_workers looks
>> okay to you, me and Julien are not in 100% agreement on that.
>> b. Comments might need some improvement.
> 
> This patch needs to be rebased.  I hope somebody can volunteer to do
> that, because I'd like to commit it once we've hashed out the details.
> 

I've rebased the previous patch on current HEAD, with a few other
modifications described below (v8 attached, if that helps).

> Would it bother anybody very much if we bumped up these values, say by
> increasing max_worker_processes from 8 to 16 and max_parallel_workers
> from 4 (as it is in the current patch version) to 8?  I feel like 4 is
> a bit more conservative than I'd like to be by default, and I'd like
> to make sure that we leave room for other sorts of background workers
> between the two limits.
> 

That's fine by me.  Should this be done (if there's no objection) in the
same patch, or in a separate one?


> I'd suggest renaming the "parallel" flag to BackgroundWorkerSlot to
> "is_parallel_worker".  Or, actually, what I think would be better is
> to give it a name like worker_class, and then we can have
> BGWORKER_CLASS_PARALLEL and perhaps eventually
> BGWORKER_CLASS_REPLICATION, etc.
> 

For now I've just renamed "parallel" to "is_parallel_worker", since that is
straightforward.  For a new "worker_class", I guess we'd need a new enum
stored in the BackgroundWorker struct instead of the
BGWORKER_IS_PARALLEL_WORKER flag, which would then be copied into the
BackgroundWorkerSlot.  Should I do that instead?


> + * terminated ones.  These counters can of course overlaps, but it's not
> + * important here since the substraction will still give the right number.
> 
> overlaps -> overflow.  substraction -> subtraction.
> 

oops sorry, fixed

> +       /*
> +        * We need a write barrier to make sure the update of
> +        * parallel_terminate_count is done before the store to in_use
> +        */
> 
> Does the order actually matter here?
> 

After some more thought, it looks like a reordering here won't have any
impact.  I'll remove the barrier unless Amit objects.

> +               {"max_parallel_workers", PGC_USERSET, RESOURCES_ASYNCHRONOUS,
> +                       gettext_noop("Sets the maximum number of parallel processes for the cluster."),
> 
> I suggest: sets the maximum number of parallel workers that can be
> active at one time.
> 

changed

-- 
Julien Rouhaud
http://dalibo.com - http://dalibo.org
diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml
index cd66abc..3abd2e5 100644
--- a/doc/src/sgml/config.sgml
+++ b/doc/src/sgml/config.sgml
@@ -2005,8 +2005,9 @@ include_dir 'conf.d'
          Sets the maximum number of workers that can be started by a single
          <literal>Gather</literal> node.  Parallel workers are taken from the
          pool of processes established by
-         <xref linkend="guc-max-worker-processes">.  Note that the requested
-         number of workers may not actually be available at run time.  If this
+         <xref linkend="guc-max-worker-processes">, limited by
+         <xref linkend="guc-max-parallel-workers">.  Note that the requested
+         number of workers may not actually be available at runtime.  If this
          occurs, the plan will run with fewer workers than expected, which may
          be inefficient.  The default value is 2.  Setting this value to 0
          disables parallel query execution.
@@ -2030,6 +2031,21 @@ include_dir 'conf.d'
        </listitem>
       </varlistentry>
 
+      <varlistentry id="guc-max-parallel-workers" xreflabel="max_parallel_workers">
+       <term><varname>max_parallel_workers</varname> (<type>integer</type>)
+       <indexterm>
+        <primary><varname>max_parallel_workers</> configuration parameter</primary>
+       </indexterm>
+       </term>
+       <listitem>
+        <para>
+         Sets the maximum number of workers that the system can support for
+         parallel queries.  The default value is 4.  Setting this value to 0
+         disables parallel query execution.
+        </para>
+       </listitem>
+      </varlistentry>
+
       <varlistentry id="guc-backend-flush-after" xreflabel="backend_flush_after">
        <term><varname>backend_flush_after</varname> (<type>integer</type>)
        <indexterm>
diff --git a/src/backend/access/transam/parallel.c b/src/backend/access/transam/parallel.c
index cde0ed3..5f6e429 100644
--- a/src/backend/access/transam/parallel.c
+++ b/src/backend/access/transam/parallel.c
@@ -453,7 +453,8 @@ LaunchParallelWorkers(ParallelContext *pcxt)
        snprintf(worker.bgw_name, BGW_MAXLEN, "parallel worker for PID %d",
                         MyProcPid);
        worker.bgw_flags =
-               BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION;
+               BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION
+               | BGWORKER_IS_PARALLEL_WORKER;
        worker.bgw_start_time = BgWorkerStart_ConsistentState;
        worker.bgw_restart_time = BGW_NEVER_RESTART;
        worker.bgw_main = ParallelWorkerMain;
diff --git a/src/backend/optimizer/path/allpaths.c b/src/backend/optimizer/path/allpaths.c
index 99b6bc8..8f33813 100644
--- a/src/backend/optimizer/path/allpaths.c
+++ b/src/backend/optimizer/path/allpaths.c
@@ -718,9 +718,11 @@ create_plain_partial_paths(PlannerInfo *root, RelOptInfo *rel)
        }
 
        /*
-        * In no case use more than max_parallel_workers_per_gather workers.
+        * In no case use more than max_parallel_workers or
+        * max_parallel_workers_per_gather workers.
         */
-       parallel_workers = Min(parallel_workers, max_parallel_workers_per_gather);
+       parallel_workers = Min(max_parallel_workers, Min(parallel_workers,
+                               max_parallel_workers_per_gather));
 
        /* If any limit was set to zero, the user doesn't want a parallel scan. */
        if (parallel_workers <= 0)
diff --git a/src/backend/optimizer/path/costsize.c b/src/backend/optimizer/path/costsize.c
index 2a49639..580e7c5 100644
--- a/src/backend/optimizer/path/costsize.c
+++ b/src/backend/optimizer/path/costsize.c
@@ -113,6 +113,7 @@ int                 effective_cache_size = DEFAULT_EFFECTIVE_CACHE_SIZE;
 
 Cost           disable_cost = 1.0e10;
 
+int                    max_parallel_workers = 4;
 int                    max_parallel_workers_per_gather = 2;
 
 bool           enable_seqscan = true;
diff --git a/src/backend/optimizer/plan/planner.c b/src/backend/optimizer/plan/planner.c
index f657ffc..aa8670b 100644
--- a/src/backend/optimizer/plan/planner.c
+++ b/src/backend/optimizer/plan/planner.c
@@ -249,6 +249,7 @@ standard_planner(Query *parse, int cursorOptions, ParamListInfo boundParams)
                parse->utilityStmt == NULL &&
                !parse->hasModifyingCTE &&
                max_parallel_workers_per_gather > 0 &&
+               max_parallel_workers > 0 &&
                !IsParallelWorker() &&
                !IsolationIsSerializable())
        {
diff --git a/src/backend/postmaster/bgworker.c b/src/backend/postmaster/bgworker.c
index 699c934..3af110a 100644
--- a/src/backend/postmaster/bgworker.c
+++ b/src/backend/postmaster/bgworker.c
@@ -16,6 +16,7 @@
 
 #include "miscadmin.h"
 #include "libpq/pqsignal.h"
+#include "optimizer/cost.h"
 #include "postmaster/bgworker_internals.h"
 #include "postmaster/postmaster.h"
 #include "storage/barrier.h"
@@ -76,12 +77,26 @@ typedef struct BackgroundWorkerSlot
        bool            terminate;
        pid_t           pid;                    /* InvalidPid = not started yet; 0 = dead */
        uint64          generation;             /* incremented when slot is recycled */
+       bool            is_parallel_worker;
        BackgroundWorker worker;
 } BackgroundWorkerSlot;
 
+/*
+ * In order to limit the total number of parallel workers (according to
+ * max_parallel_workers GUC), we maintain the number of active parallel
+ * workers.  Since the postmaster cannot take locks, two variables are used for
+ * this purpose: the number of registered parallel workers (modified by the
+ * backends, protected by BackgroundWorkerLock) and the number of terminated
+ * parallel workers (modified only by the postmaster, lockless).  The active
+ * number of parallel workers is the number of registered workers minus the
+ * terminated ones.  These counters can of course overflow, but it's not
+ * important here since the subtraction will still give the right number.
+ */
 typedef struct BackgroundWorkerArray
 {
        int                     total_slots;
+       uint32          parallel_register_count;
+       uint32          parallel_terminate_count;
        BackgroundWorkerSlot slot[FLEXIBLE_ARRAY_MEMBER];
 } BackgroundWorkerArray;
 
@@ -126,6 +141,8 @@ BackgroundWorkerShmemInit(void)
                int                     slotno = 0;
 
                BackgroundWorkerData->total_slots = max_worker_processes;
+               BackgroundWorkerData->parallel_register_count = 0;
+               BackgroundWorkerData->parallel_terminate_count = 0;
 
                /*
                 * Copy contents of worker list into shared memory.  Record the shared
@@ -144,6 +161,7 @@ BackgroundWorkerShmemInit(void)
                        slot->terminate = false;
                        slot->pid = InvalidPid;
                        slot->generation = 0;
+                       slot->is_parallel_worker = false;
                        rw->rw_shmem_slot = slotno;
                        rw->rw_worker.bgw_notify_pid = 0;       /* might be reinit after crash */
                        memcpy(&slot->worker, &rw->rw_worker, sizeof(BackgroundWorker));
@@ -266,9 +284,12 @@ BackgroundWorkerStateChange(void)
 
                        /*
                         * We need a memory barrier here to make sure that the load of
-                        * bgw_notify_pid completes before the store to in_use.
+                        * bgw_notify_pid and the update of parallel_terminate_count
+                        * complete before the store to in_use.
                         */
                        notify_pid = slot->worker.bgw_notify_pid;
+                       if (slot->is_parallel_worker)
+                               BackgroundWorkerData->parallel_terminate_count++;
                        pg_memory_barrier();
                        slot->pid = 0;
                        slot->in_use = false;
@@ -369,6 +390,14 @@ ForgetBackgroundWorker(slist_mutable_iter *cur)
 
        Assert(rw->rw_shmem_slot < max_worker_processes);
        slot = &BackgroundWorkerData->slot[rw->rw_shmem_slot];
+       if (slot->is_parallel_worker)
+               BackgroundWorkerData->parallel_terminate_count++;
+
+       /*
+        * We need a write barrier to make sure the update of
+        * parallel_terminate_count is done before the store to in_use
+        */
+       pg_write_barrier();
        slot->in_use = false;
 
        ereport(DEBUG1,
@@ -498,6 +527,28 @@ SanityCheckBackgroundWorker(BackgroundWorker *worker, int elevel)
                /* XXX other checks? */
        }
 
+       /* sanity checks for parallel workers */
+       if ((worker->bgw_flags & BGWORKER_IS_PARALLEL_WORKER) != 0)
+       {
+               if (max_parallel_workers == 0)
+               {
+                       ereport(elevel,
+                                       (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+                                        errmsg("background worker \"%s\": cannot request parallel worker if no parallel worker is allowed",
+                                                       worker->bgw_name)));
+                       return false;
+               }
+               if ((worker->bgw_flags & BGWORKER_SHMEM_ACCESS) == 0 ||
+                       (worker->bgw_flags & BGWORKER_BACKEND_DATABASE_CONNECTION) == 0)
+               {
+                       ereport(elevel,
+                                       (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+                                        errmsg("background worker \"%s\": cannot request parallel worker without shared memory access and database connection",
+                                                       worker->bgw_name)));
+                       return false;
+               }
+       }
+
        if ((worker->bgw_restart_time < 0 &&
                 worker->bgw_restart_time != BGW_NEVER_RESTART) ||
                (worker->bgw_restart_time > USECS_PER_DAY / 1000))
@@ -824,6 +875,7 @@ RegisterDynamicBackgroundWorker(BackgroundWorker *worker,
 {
        int                     slotno;
        bool            success = false;
+       bool            parallel;
        uint64          generation = 0;
 
        /*
@@ -840,8 +892,24 @@ RegisterDynamicBackgroundWorker(BackgroundWorker *worker,
        if (!SanityCheckBackgroundWorker(worker, ERROR))
                return false;
 
+       parallel = ((worker->bgw_flags & BGWORKER_IS_PARALLEL_WORKER) != 0);
+
        LWLockAcquire(BackgroundWorkerLock, LW_EXCLUSIVE);
 
+       if (parallel && (BackgroundWorkerData->parallel_register_count -
+                                        BackgroundWorkerData->parallel_terminate_count) >=
+               max_parallel_workers)
+       {
+               LWLockRelease(BackgroundWorkerLock);
+               return false;
+       }
+
+       /*
+        * We need a read barrier here to make sure the above test doesn't get
+        * reordered
+        */
+       pg_read_barrier();
+
        /*
         * Look for an unused slot.  If we find one, grab it.
         */
@@ -855,7 +923,10 @@ RegisterDynamicBackgroundWorker(BackgroundWorker *worker,
                        slot->pid = InvalidPid;         /* indicates not started yet */
                        slot->generation++;
                        slot->terminate = false;
+                       slot->is_parallel_worker = parallel;
                        generation = slot->generation;
+                       if (parallel)
+                               BackgroundWorkerData->parallel_register_count++;
 
                        /*
                         * Make sure postmaster doesn't see the slot as in use before it
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index ce4eef9..5a75970 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -2661,6 +2661,16 @@ static struct config_int ConfigureNamesInt[] =
        },
 
        {
+               {"max_parallel_workers", PGC_USERSET, RESOURCES_ASYNCHRONOUS,
+                       gettext_noop("Sets the maximum number of parallel workers that can be active at one time."),
+                       NULL
+               },
+               &max_parallel_workers,
+               4, 0, 1024,
+               NULL, NULL, NULL
+       },
+
+       {
                {"autovacuum_work_mem", PGC_SIGHUP, RESOURCES_MEM,
                        gettext_noop("Sets the maximum memory to be used by each autovacuum worker process."),
                        NULL,
diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample
index b1c3aea..af05803 100644
--- a/src/backend/utils/misc/postgresql.conf.sample
+++ b/src/backend/utils/misc/postgresql.conf.sample
@@ -164,6 +164,7 @@
 #effective_io_concurrency = 1          # 1-1000; 0 disables prefetching
 #max_worker_processes = 8              # (change requires restart)
 #max_parallel_workers_per_gather = 2   # taken from max_worker_processes
+#max_parallel_workers = 4          # maximum number of max_worker_processes usable by parallel queries
 #old_snapshot_threshold = -1           # 1min-60d; -1 disables; 0 is immediate
                                                                        # (change requires restart)
 #backend_flush_after = 0               # 0 disables, default is 0
diff --git a/src/include/optimizer/cost.h b/src/include/optimizer/cost.h
index 2a4df2f..190f33b 100644
--- a/src/include/optimizer/cost.h
+++ b/src/include/optimizer/cost.h
@@ -55,6 +55,7 @@ extern PGDLLIMPORT double parallel_setup_cost;
 extern PGDLLIMPORT int effective_cache_size;
 extern Cost disable_cost;
 extern int     max_parallel_workers_per_gather;
+extern int     max_parallel_workers;
 extern bool enable_seqscan;
 extern bool enable_indexscan;
 extern bool enable_indexonlyscan;
diff --git a/src/include/postmaster/bgworker.h b/src/include/postmaster/bgworker.h
index b6889a3..4236e37 100644
--- a/src/include/postmaster/bgworker.h
+++ b/src/include/postmaster/bgworker.h
@@ -58,6 +58,14 @@
  */
 #define BGWORKER_BACKEND_DATABASE_CONNECTION           0x0002
 
+/*
+ * This flag is used internally for parallel queries, to keep track of the
+ * number of active parallel workers and make sure we never launch more than
+ * max_parallel_workers parallel workers at the same time.  Third party
+ * background workers should not use this flag.
+ */
+#define BGWORKER_IS_PARALLEL_WORKER                                    0x0004
+
 
 typedef void (*bgworker_main_type) (Datum main_arg);
 