On 29/06/2016 06:29, Amit Kapila wrote: > On Wed, Jun 29, 2016 at 2:57 AM, Julien Rouhaud > <julien.rouh...@dalibo.com> wrote: >> >> Thanks a lot for the help! >> >> PFA v6 which should fix all the issues mentioned. > > Couple of minor suggestions. > > - <xref linkend="guc-max-worker-processes">. Note that the requested > + <xref linkend="guc-max-worker-processes">, limited by > + <xref linked="guc-max-parallel-workers">. Note that the requested > > Typo. > /linked/linkend >
Oops, fixed. > You can always find such mistakes by doing make check in doc/src/sgml/ > I wasn't aware of that, it's really a nice thing to know, thanks! > + /* > + * We need a memory barrier here to make sure the above test doesn't get > + * reordered > + */ > + pg_read_barrier(); > > /memory barrier/read barrier > fixed > + if (max_parallel_workers == 0) > + { > + ereport(elevel, > + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), > + errmsg("background worker \"%s\": cannot request parallel worker if > no parallel worker allowed", > > " ..no parallel worker is allowed". 'is' seems to be missing. > fixed > >> Also, after second >> thought I didn't add the extra hint about max_worker_processes in the >> max_parallel_worker paragraph, since this line was a duplicate of the >> precedent paragraph, it seemed better to leave the text as is. >> > > not a big problem, we can leave it for committer to decide on same. > However just by reading the description of max_parallel_worker, user > can set its value more than max_wroker_processes which we don't want. > Right. On the other hand I'm not sure that's really an issue, because such a case is handled in the code, and setting max_parallel_workers way above max_worker_processes could be a way to configure it as unlimited. Or should we allow setting it to -1 for instance to disable the limit? -- Julien Rouhaud http://dalibo.com - http://dalibo.org
diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml index 061697b..3a47421 100644 --- a/doc/src/sgml/config.sgml +++ b/doc/src/sgml/config.sgml @@ -2005,7 +2005,8 @@ include_dir 'conf.d' Sets the maximum number of workers that can be started by a single <literal>Gather</literal> node. Parallel workers are taken from the pool of processes established by - <xref linkend="guc-max-worker-processes">. Note that the requested + <xref linkend="guc-max-worker-processes">, limited by + <xref linkend="guc-max-parallel-workers">. Note that the requested number of workers may not actually be available at runtime. If this occurs, the plan will run with fewer workers than expected, which may be inefficient. The default value is 2. Setting this value to 0 @@ -2014,6 +2015,21 @@ include_dir 'conf.d' </listitem> </varlistentry> + <varlistentry id="guc-max-parallel-workers" xreflabel="max_parallel_workers"> + <term><varname>max_parallel_workers</varname> (<type>integer</type>) + <indexterm> + <primary><varname>max_parallel_workers</> configuration parameter</primary> + </indexterm> + </term> + <listitem> + <para> + Sets the maximum number of workers that the system can support for + parallel queries. The default value is 4. Setting this value to 0 + disables parallel query execution. 
+ </para> + </listitem> + </varlistentry> + <varlistentry id="guc-backend-flush-after" xreflabel="backend_flush_after"> <term><varname>backend_flush_after</varname> (<type>integer</type>) <indexterm> diff --git a/src/backend/access/transam/parallel.c b/src/backend/access/transam/parallel.c index 088700e..ea7680b 100644 --- a/src/backend/access/transam/parallel.c +++ b/src/backend/access/transam/parallel.c @@ -452,7 +452,8 @@ LaunchParallelWorkers(ParallelContext *pcxt) snprintf(worker.bgw_name, BGW_MAXLEN, "parallel worker for PID %d", MyProcPid); worker.bgw_flags = - BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION; + BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION + | BGWORKER_IS_PARALLEL_WORKER; worker.bgw_start_time = BgWorkerStart_ConsistentState; worker.bgw_restart_time = BGW_NEVER_RESTART; worker.bgw_main = ParallelWorkerMain; diff --git a/src/backend/optimizer/path/allpaths.c b/src/backend/optimizer/path/allpaths.c index 2e4b670..e1da5f9 100644 --- a/src/backend/optimizer/path/allpaths.c +++ b/src/backend/optimizer/path/allpaths.c @@ -724,9 +724,11 @@ create_plain_partial_paths(PlannerInfo *root, RelOptInfo *rel) } /* - * In no case use more than max_parallel_workers_per_gather workers. + * In no case use more than max_parallel_workers or + * max_parallel_workers_per_gather workers. */ - parallel_workers = Min(parallel_workers, max_parallel_workers_per_gather); + parallel_workers = Min(max_parallel_workers, Min(parallel_workers, + max_parallel_workers_per_gather)); /* If any limit was set to zero, the user doesn't want a parallel scan. 
*/ if (parallel_workers <= 0) diff --git a/src/backend/optimizer/path/costsize.c b/src/backend/optimizer/path/costsize.c index 8c1dccc..6cb2f4e 100644 --- a/src/backend/optimizer/path/costsize.c +++ b/src/backend/optimizer/path/costsize.c @@ -113,6 +113,7 @@ int effective_cache_size = DEFAULT_EFFECTIVE_CACHE_SIZE; Cost disable_cost = 1.0e10; +int max_parallel_workers = 4; int max_parallel_workers_per_gather = 2; bool enable_seqscan = true; diff --git a/src/backend/optimizer/plan/planner.c b/src/backend/optimizer/plan/planner.c index 070ad31..2c5c3d6 100644 --- a/src/backend/optimizer/plan/planner.c +++ b/src/backend/optimizer/plan/planner.c @@ -249,8 +249,9 @@ standard_planner(Query *parse, int cursorOptions, ParamListInfo boundParams) IsUnderPostmaster && dynamic_shared_memory_type != DSM_IMPL_NONE && parse->commandType == CMD_SELECT && !parse->hasModifyingCTE && parse->utilityStmt == NULL && max_parallel_workers_per_gather > 0 && - !IsParallelWorker() && !IsolationIsSerializable() && - !has_parallel_hazard((Node *) parse, true); + max_parallel_workers > 0 && !IsParallelWorker() && + !IsolationIsSerializable() && !has_parallel_hazard((Node *) parse, + true); /* * glob->parallelModeNeeded should tell us whether it's necessary to diff --git a/src/backend/postmaster/bgworker.c b/src/backend/postmaster/bgworker.c index 382edad..5df5c8c 100644 --- a/src/backend/postmaster/bgworker.c +++ b/src/backend/postmaster/bgworker.c @@ -16,6 +16,7 @@ #include "miscadmin.h" #include "libpq/pqsignal.h" +#include "optimizer/cost.h" #include "postmaster/bgworker_internals.h" #include "postmaster/postmaster.h" #include "storage/barrier.h" @@ -76,12 +77,26 @@ typedef struct BackgroundWorkerSlot bool terminate; pid_t pid; /* InvalidPid = not started yet; 0 = dead */ uint64 generation; /* incremented when slot is recycled */ + bool parallel; BackgroundWorker worker; } BackgroundWorkerSlot; +/* + * In order to limit the total number of parallel workers (according to + * 
max_parallel_workers GUC), we maintain the number of active parallel + * workers. Since the postmaster cannot take locks, two variables are used for + * this purpose: the number of registered parallel workers (modified by the + * backends, protected by BackgroundWorkerLock) and the number of terminated + * parallel workers (modified only by the postmaster, lockless). The active + * number of parallel workers is the number of registered workers minus the + * terminated ones. These counters can of course overflow, but it's not + * important here since the subtraction will still give the right number. + */ typedef struct BackgroundWorkerArray { int total_slots; + uint32 parallel_register_count; + uint32 parallel_terminate_count; BackgroundWorkerSlot slot[FLEXIBLE_ARRAY_MEMBER]; } BackgroundWorkerArray; @@ -126,6 +141,8 @@ BackgroundWorkerShmemInit(void) int slotno = 0; BackgroundWorkerData->total_slots = max_worker_processes; + BackgroundWorkerData->parallel_register_count = 0; + BackgroundWorkerData->parallel_terminate_count = 0; /* * Copy contents of worker list into shared memory. Record the shared @@ -144,6 +161,7 @@ slot->terminate = false; slot->pid = InvalidPid; slot->generation = 0; + slot->parallel = false; rw->rw_shmem_slot = slotno; rw->rw_worker.bgw_notify_pid = 0; /* might be reinit after crash */ memcpy(&slot->worker, &rw->rw_worker, sizeof(BackgroundWorker)); @@ -266,9 +284,12 @@ /* * We need a memory barrier here to make sure that the load of - * bgw_notify_pid completes before the store to in_use. + * bgw_notify_pid and the update of parallel_terminate_count + * complete before the store to in_use. 
*/ notify_pid = slot->worker.bgw_notify_pid; + if (slot->parallel) + BackgroundWorkerData->parallel_terminate_count++; pg_memory_barrier(); slot->pid = 0; slot->in_use = false; @@ -369,6 +390,14 @@ ForgetBackgroundWorker(slist_mutable_iter *cur) Assert(rw->rw_shmem_slot < max_worker_processes); slot = &BackgroundWorkerData->slot[rw->rw_shmem_slot]; + if (slot->parallel) + BackgroundWorkerData->parallel_terminate_count++; + + /* + * We need a write barrier to make sure the update of + * parallel_terminate_count is done before the store to in_use + */ + pg_write_barrier(); slot->in_use = false; ereport(DEBUG1, @@ -498,6 +527,28 @@ SanityCheckBackgroundWorker(BackgroundWorker *worker, int elevel) /* XXX other checks? */ } + /* sanity checks for parallel workers */ + if ((worker->bgw_flags & BGWORKER_IS_PARALLEL_WORKER) != 0) + { + if (max_parallel_workers == 0) + { + ereport(elevel, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("background worker \"%s\": cannot request parallel worker if no parallel worker is allowed", + worker->bgw_name))); + return false; + } + if ((worker->bgw_flags & BGWORKER_SHMEM_ACCESS) == 0 || + (worker->bgw_flags & BGWORKER_BACKEND_DATABASE_CONNECTION) == 0) + { + ereport(elevel, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("background worker \"%s\": cannot request parallel worker without shared memory access and database connection", + worker->bgw_name))); + return false; + } + } + if ((worker->bgw_restart_time < 0 && worker->bgw_restart_time != BGW_NEVER_RESTART) || (worker->bgw_restart_time > USECS_PER_DAY / 1000)) @@ -824,6 +875,7 @@ RegisterDynamicBackgroundWorker(BackgroundWorker *worker, { int slotno; bool success = false; + bool parallel; uint64 generation = 0; /* @@ -840,8 +892,24 @@ RegisterDynamicBackgroundWorker(BackgroundWorker *worker, if (!SanityCheckBackgroundWorker(worker, ERROR)) return false; + parallel = ((worker->bgw_flags & BGWORKER_IS_PARALLEL_WORKER) != 0); + LWLockAcquire(BackgroundWorkerLock, 
LW_EXCLUSIVE); + if (parallel && (BackgroundWorkerData->parallel_register_count - + BackgroundWorkerData->parallel_terminate_count) >= + max_parallel_workers) + { + LWLockRelease(BackgroundWorkerLock); + return false; + } + + /* + * We need a read barrier here to make sure the above test doesn't get + * reordered + */ + pg_read_barrier(); + /* * Look for an unused slot. If we find one, grab it. */ @@ -855,7 +923,10 @@ RegisterDynamicBackgroundWorker(BackgroundWorker *worker, slot->pid = InvalidPid; /* indicates not started yet */ slot->generation++; slot->terminate = false; + slot->parallel = parallel; generation = slot->generation; + if (parallel) + BackgroundWorkerData->parallel_register_count++; /* * Make sure postmaster doesn't see the slot as in use before it diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c index 60148b8..e051000 100644 --- a/src/backend/utils/misc/guc.c +++ b/src/backend/utils/misc/guc.c @@ -2657,6 +2657,16 @@ static struct config_int ConfigureNamesInt[] = }, { + {"max_parallel_workers", PGC_USERSET, RESOURCES_ASYNCHRONOUS, + gettext_noop("Sets the maximum number of parallel processes for the cluster."), + NULL + }, + &max_parallel_workers, + 4, 0, 1024, + NULL, NULL, NULL + }, + + { {"autovacuum_work_mem", PGC_SIGHUP, RESOURCES_MEM, gettext_noop("Sets the maximum memory to be used by each autovacuum worker process."), NULL, diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample index 3fa0540..3ff996f 100644 --- a/src/backend/utils/misc/postgresql.conf.sample +++ b/src/backend/utils/misc/postgresql.conf.sample @@ -168,6 +168,7 @@ #effective_io_concurrency = 1 # 1-1000; 0 disables prefetching #max_worker_processes = 8 # (change requires restart) #max_parallel_workers_per_gather = 2 # taken from max_worker_processes +#max_parallel_workers = 4 # total maximum number of worker_processes #old_snapshot_threshold = -1 # 1min-60d; -1 disables; 0 is immediate # (change 
requires restart) #backend_flush_after = 0 # 0 disables, default is 0 diff --git a/src/include/optimizer/cost.h b/src/include/optimizer/cost.h index 2a4df2f..190f33b 100644 --- a/src/include/optimizer/cost.h +++ b/src/include/optimizer/cost.h @@ -55,6 +55,7 @@ extern PGDLLIMPORT double parallel_setup_cost; extern PGDLLIMPORT int effective_cache_size; extern Cost disable_cost; extern int max_parallel_workers_per_gather; +extern int max_parallel_workers; extern bool enable_seqscan; extern bool enable_indexscan; extern bool enable_indexonlyscan; diff --git a/src/include/postmaster/bgworker.h b/src/include/postmaster/bgworker.h index b6889a3..4236e37 100644 --- a/src/include/postmaster/bgworker.h +++ b/src/include/postmaster/bgworker.h @@ -58,6 +58,14 @@ */ #define BGWORKER_BACKEND_DATABASE_CONNECTION 0x0002 +/* + * This flag is used internally for parallel queries, to keep track of the + * number of active parallel workers and make sure we never launch more than + * max_parallel_workers parallel workers at the same time. Third party + * background workers should not use this flag. + */ +#define BGWORKER_IS_PARALLEL_WORKER 0x0004 + typedef void (*bgworker_main_type) (Datum main_arg);
-- Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org) To make changes to your subscription: http://www.postgresql.org/mailpref/pgsql-hackers