On Tue, Apr 7, 2026 at 6:14 AM Andres Freund <[email protected]> wrote:
> On 2026-04-07 03:02:52 +1200, Thomas Munro wrote:
> > Here's an updated patch. It's mostly just rebased over the recent
> > firehose, but with lots of comments and a few names (hopefully)
> > improved. There is one code change to highlight though:
> >
> > maybe_start_io_workers() knows when it's not allowed to create new
> > workers, an interesting case being FatalError before we have started
> > the new world.
>
> *worker, I assume?
Thanks for the review and testing!
I meant the new world when "we're already starting up again", as in
this pre-existing code from master:
/*
* Don't start new workers if we're in the shutdown phase of a crash
* restart. But we *do* need to start if we're already starting up again.
*/
if (FatalError && pmState >= PM_STOP_BACKENDS)
return;
> > The previous coding of DetermineSleepTime() didn't
> > know about that, so it could return 0 (don't sleep), and then the
> > postmaster could busy-wait for restart progress.
>
> In master or the prior version of your patch?
master
I mean this code, which checks AbortStartTime and overrides the sleep time.
But it wouldn't be entered if FatalError is true but StartWorkerNeeded
or HaveCrashedWorker also happens to be true. Maybe that's OK but I
found it odd.
https://github.com/postgres/postgres/blob/a006bc7b1699d952afcb6d786343e8bf0ecc61d6/src/backend/postmaster/postmaster.c#L1567
> > Maybe there were
> > other cases like that, but in general DetermineSleepTime() and
> > maybe_start_io_workers() really need to be 100% in agreement. So I
> > have moved that knowledge into a new function
> > maybe_start_io_workers_scheduled_at(). Both DetermineSleepTime() and
> > maybe_start_io_workers() call that so there is a single source of
> > truth.
> >
> > I think I got confused about that because it's not that obvious why
> > the existing code doesn't test FatalError.
> >
> > I thought of a slightly bigger refactoring that might deconfuse
> > DetermineSleepTime() a bit more. Probably material for the next
> > cycle, but basically the idea is to stop using a bunch of different
> > conditions and different units of time and convert the whole thing to
> > a simple find-the-lowest-time function. I kept that separate.
> >
> > I'll post a new version of the patch that was v3-0002 separately.
>
>
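For illustration only (this is not code from either patch): the
"find-the-lowest-time" idea could be sketched roughly as below. The
names sketch_time, sketch_earliest_wakeup() and sketch_sleep_ms() are
hypothetical, the timestamps are plain microsecond integers standing in
for TimestampTz, and 0 is assumed to mean "nothing scheduled".

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical stand-in for TimestampTz: microseconds, 0 = not scheduled. */
typedef int64_t sketch_time;

/* Return the earliest non-zero wakeup time, or 0 if nothing is scheduled. */
sketch_time
sketch_earliest_wakeup(const sketch_time *candidates, size_t n)
{
	sketch_time earliest = 0;

	for (size_t i = 0; i < n; i++)
	{
		if (candidates[i] != 0 &&
			(earliest == 0 || candidates[i] < earliest))
			earliest = candidates[i];
	}
	return earliest;
}

/* Convert an absolute wakeup time to a sleep in ms, clamped to [0, max_ms]. */
int
sketch_sleep_ms(sketch_time now, sketch_time wakeup, int max_ms)
{
	int64_t		ms;

	if (wakeup == 0)
		return max_ms;			/* nothing scheduled: take the default nap */
	if (wakeup <= now)
		return 0;				/* already due: don't sleep */
	ms = (wakeup - now) / 1000;
	return ms > max_ms ? max_ms : (int) ms;
}
```

Each subsystem would contribute one candidate time, and the sleep time
falls out of a single comparison loop with one unit of time.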
> > From 6c5d16a15add62c68bb7f9c7b6a1e3bde1f406d8 Mon Sep 17 00:00:00 2001
> > From: Thomas Munro <[email protected]>
> > Date: Sat, 22 Mar 2025 00:36:49 +1300
> > Subject: [PATCH v4 1/2] aio: Adjust I/O worker pool size automatically.
> >
> > The size of the I/O worker pool used to implement io_method=worker was
> > previously controlled by the io_workers setting, defaulting to 3. It
> > was hard to know how to tune it effectively. It is now replaced with:
> >
> > io_min_workers=1
> > io_max_workers=8 (up to 32)
> > io_worker_idle_timeout=60s
> > io_worker_launch_interval=100ms
>
> I'm a bit concerned about defaulting to io_min_workers=1. That means in an
> intermittent workload, there will be no IO concurrency for short running but
> IO intensive queries, while having the dispatch overhead to the worker. It
> can still be a win if the query is CPU intensive, but far from all are.
>
> I'd therefore argue that the minimum ought to be at least 2.
WFM.
> > diff --git a/src/backend/postmaster/postmaster.c
> > b/src/backend/postmaster/postmaster.c
> > index 6f13e8f40a0..c42564500c6 100644
> > --- a/src/backend/postmaster/postmaster.c
> > +++ b/src/backend/postmaster/postmaster.c
>
>
> > @@ -1555,14 +1558,13 @@ checkControlFile(void)
> > static int
> > DetermineSleepTime(void)
> > {
> > - TimestampTz next_wakeup = 0;
> > + TimestampTz next_wakeup;
> >
> > /*
> > - * Normal case: either there are no background workers at all, or
> > we're in
> > - * a shutdown sequence (during which we ignore bgworkers altogether).
> > + * If an ImmediateShutdown or a crash restart has set a SIGKILL
> > timeout,
> > + * ignore everything else and wait for that.
> > */
> > - if (Shutdown > NoShutdown ||
> > - (!StartWorkerNeeded && !HaveCrashedWorker))
> > + if (Shutdown >= ImmediateShutdown || FatalError)
> > {
> > if (AbortStartTime != 0)
> > {
> > @@ -1582,14 +1584,16 @@ DetermineSleepTime(void)
> >
> > return seconds * 1000;
> > }
> > - else
> > - return 60 * 1000;
> > }
> >
> > - if (StartWorkerNeeded)
> > + /* Time of next maybe_start_io_workers() call, or 0 for none. */
> > + next_wakeup = maybe_start_io_workers_scheduled_at();
> > +
> > + /* Ignore bgworkers during shutdown. */
> > + if (StartWorkerNeeded && Shutdown == NoShutdown)
> > return 0;
>
> Why is the maybe_start_io_workers_scheduled_at() thing before the return 0
> here?
Seems OK? I mean sure, I would like to make this whole function more
uniform in structure, see my second patch, but...
> > - if (HaveCrashedWorker)
> > + if (HaveCrashedWorker && Shutdown == NoShutdown)
> > {
> > dlist_mutable_iter iter;
> >
>
>
> > @@ -3797,6 +3811,15 @@ process_pm_pmsignal(void)
> > StartWorkerNeeded = true;
> > }
> >
> > + /* Process IO worker start requests. */
> > + if (CheckPostmasterSignal(PMSIGNAL_IO_WORKER_GROW))
> > + {
> > + /*
> > + * No local flag, as the state is exposed through
> > pgaio_worker_*()
> > + * functions. This signal is received on potentially
> > actionable level
> > + * changes, so that maybe_start_io_workers() will run.
> > + */
> > + }
> > /* Process background worker state changes. */
> > if (CheckPostmasterSignal(PMSIGNAL_BACKGROUND_WORKER_CHANGE))
> > {
>
> Absolute nitpick - the different blocks so far have been separated by an empty
> line.
Fixed.
> > + /* Only proceed if a "grow" request is pending from existing workers.
> > */
> > + if (!pgaio_worker_test_grow())
> > + return 0;
>
> So this accesses shared memory from postmaster. I think this amount of access
> is safe enough that that's ok. You'd have to somehow have corrupted
> postmaster's copy of io_worker_control, or unmapped the shared memory it is
> pointed to, for that to cause a crash. The first shouldn't be an issue, the
> latter would be quite the confusion of the state machine.
Cool.
> > +/*
> > + * Start I/O workers if required. Used at startup, to respond to change of
> > + * the io_min_workers GUC, when asked to start a new one due to submission
> > + * queue backlog, and after workers terminate in response to errors (by
> > + * starting "replacement" workers).
> > + */
> > +static void
> > +maybe_start_io_workers(void)
> > +{
> > + TimestampTz scheduled_at;
> >
> > - /* Not enough running? */
> > - while (io_worker_count < io_workers)
> > + while ((scheduled_at = maybe_start_io_workers_scheduled_at()) != 0)
> > {
> > + TimestampTz now = GetCurrentTimestamp();
> > PMChild *child;
> > int i;
> >
> > + Assert(pmState < PM_WAIT_IO_WORKERS);
> > +
> > + /* Still waiting for the scheduled time? */
> > + if (scheduled_at > now)
> > + break;
> > +
> > + /* Clear the grow request flag if it is set. */
> > + pgaio_worker_clear_grow();
> > +
> > + /*
> > + * Compute next launch time relative to the previous value,
> > so that
> > + * time spent on the postmaster's other duties don't result
> > in an
> > + * inaccurate launch interval.
> > + */
> > + io_worker_launch_next_time =
> > +
> > TimestampTzPlusMilliseconds(io_worker_launch_next_time,
> > +
> > io_worker_launch_interval);
> > +
> > + /*
> > + * If that's already in the past, the interval is either
> > impossibly
> > + * short or we received no requests for new workers for a
> > period.
> > + * Compute a new future time relative to the last launch time
> > instead.
> > + */
> > + if (io_worker_launch_next_time <= now)
> > + io_worker_launch_next_time =
> > +
> > TimestampTzPlusMilliseconds(io_worker_launch_last_time,
> > +
> > io_worker_launch_interval);
>
> Did you intend to use TimestampTzPlusMilliseconds(now, ...) here? Or did you
> want to have this if after the next line:
>
> > + io_worker_launch_last_time = now;
> > +
>
> Because otherwise I don't understand how this is intended to work.
I can't remember why I did it like that. Changed.
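As a rough sketch of one reading of that suggestion (the
TimestampTzPlusMilliseconds(now, ...) variant; not necessarily what the
new version does), with plain millisecond integers standing in for
TimestampTz and a hypothetical function name:

```c
#include <assert.h>
#include <stdint.h>

/* Hypothetical stand-in for TimestampTz, in milliseconds. */
typedef int64_t sketch_time;

/*
 * Advance the next launch time by one interval from its previous value, so
 * that time spent on other postmaster duties doesn't stretch the interval.
 * If the result is not in the future, restart the schedule from "now".
 */
sketch_time
sketch_next_launch_time(sketch_time prev_next, sketch_time now,
						sketch_time interval)
{
	sketch_time next = prev_next + interval;

	if (next <= now)
		next = now + interval;
	return next;
}
```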
> > /* find unused entry in io_worker_children array */
> > for (i = 0; i < MAX_IO_WORKERS; ++i)
> > {
> > @@ -4454,20 +4539,14 @@ maybe_adjust_io_workers(void)
> > ++io_worker_count;
> > }
> > else
> > - break; /* try again next
> > time */
> > - }
> > -
> > - /* Too many running? */
> > - if (io_worker_count > io_workers)
> > - {
> > - /* ask the IO worker in the highest slot to exit */
> > - for (int i = MAX_IO_WORKERS - 1; i >= 0; --i)
> > {
> > - if (io_worker_children[i] != NULL)
> > - {
> > - kill(io_worker_children[i]->pid, SIGUSR2);
> > - break;
> > - }
> > + /*
> > + * Fork failure: we'll try again after the launch
> > interval
> > + * expires, or be called again without delay if we
> > don't yet have
> > + * io_min_workers. Don't loop here though, the
> > postmaster has
> > + * other duties.
> > + */
> > + break;
> > }
> > }
> > }
>
> Reading just this part of the diff I am wondering what is responsible for
> reducing the number of workers below the max after a config change. I assume
> it's done in the workers, but it might be worth putting a comment here noting
> that.
Done.
> > +/* Debugging support: show current IO and wakeups:ios statistics in ps. */
> > +/* #define PGAIO_WORKER_SHOW_PS_INFO */
> >
> > typedef struct PgAioWorkerSubmissionQueue
> > {
> > @@ -63,13 +67,34 @@ typedef struct PgAioWorkerSubmissionQueue
> >
> > typedef struct PgAioWorkerSlot
> > {
> > - Latch *latch;
> > - bool in_use;
> > + ProcNumber proc_number;
> > } PgAioWorkerSlot;
> >
> > +/*
> > + * Sets of worker IDs are held in a simple bitmap, accessed through
> > functions
> > + * that provide a more readable abstraction. If we wanted to support more
> > + * workers than that, the contention on the single queue would surely get
> > too
> > + * high, so we might want to consider multiple pools instead of widening
> > this.
> > + */
> > +typedef uint64 PgAioWorkerSet;
>
> > +#define PGAIO_WORKER_SET_BITS (sizeof(PgAioWorkerSet) * CHAR_BIT)
> > +
> > +static_assert(PGAIO_WORKER_SET_BITS >= MAX_IO_WORKERS, "too small");
> > +
> > typedef struct PgAioWorkerControl
> > {
> > - uint64 idle_worker_mask;
> > + /* Seen by postmaster */
> > + volatile bool grow;
>
> What's that volatile intending to do here? It avoids the needs for some
> compiler barriers, but it's not clear to me those would be needed here anyway.
> And it doesn't imply memory ordering, which I'm not sure is entirely wise
> here. I'd probably just plop a full memory barrier in the few relevant
> places, easier to reason about that way, and it can't matter given the
> infrequency of access. I'd say we should just use a proper atomic, but right
> now I don't think we do that in postmaster.
Changed to full memory barrier.
> > + /* Protected by AioWorkerSubmissionQueueLock. */
> > + PgAioWorkerSet idle_worker_set;
> > +
> > + /* Protected by AioWorkerControlLock. */
> > + PgAioWorkerSet worker_set;
> > + int nworkers;
> > +
> > + /* Protected by AioWorkerControlLock. */
> > PgAioWorkerSlot workers[FLEXIBLE_ARRAY_MEMBER];
> > } PgAioWorkerControl;
> >
> > @@ -91,15 +116,103 @@ const IoMethodOps pgaio_worker_ops = {
> >
> >
> > +static bool
> > +pgaio_worker_set_is_empty(PgAioWorkerSet *set)
> > +{
> > + return *set == 0;
> > +}
> > +
> > +static PgAioWorkerSet
> > +pgaio_worker_set_singleton(int worker)
> > +{
> > + return UINT64_C(1) << worker;
> > +}
>
> I guess an assert about `worker` being small enough wouldn't hurt.
Done.
> > +static void
> > +pgaio_worker_set_fill(PgAioWorkerSet *set)
> > +{
> > + *set = UINT64_MAX >> (PGAIO_WORKER_SET_BITS - MAX_IO_WORKERS);
> > +}
>
> What does "_fill" really mean? Just that all valid bits are set? Why wouldn't
> it be _all() or _full()?
I guess I got that from sigset_t... Trying pgaio_workerset_all().
> > +static int
> > +pgaio_worker_set_get_highest(PgAioWorkerSet *set)
> > +{
> > + Assert(!pgaio_worker_set_is_empty(set));
> > + return pg_leftmost_one_pos64(*set);
> > +}
>
> "worker_set_get*" reads quite awkwardly. Maybe just going for
> pgaio_workerset_* would help?
>
> Or maybe just name it PgAioWset/pgaio_wset_ or such?
OK let's try "workerset".
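To make the naming discussion concrete, here is a minimal standalone
sketch of a bitmap-backed worker set. The sketch_ prefix marks
everything as hypothetical: these are not the functions in the patch,
only an approximation of the abstraction discussed above, including the
range assertion and the "find lowest unused worker ID" step.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

#define SKETCH_MAX_IO_WORKERS 32	/* the "up to 32" cap on io_max_workers */

typedef uint64_t sketch_workerset;

/* Set containing every valid worker ID, i.e. bits 0..31. */
sketch_workerset
sketch_workerset_all(void)
{
	return UINT64_MAX >> (64 - SKETCH_MAX_IO_WORKERS);
}

/* Set containing one worker, with the range assertion suggested in review. */
sketch_workerset
sketch_workerset_singleton(int worker)
{
	assert(worker >= 0 && worker < SKETCH_MAX_IO_WORKERS);
	return UINT64_C(1) << worker;
}

bool
sketch_workerset_is_empty(const sketch_workerset *set)
{
	return *set == 0;
}

/* Remove and return the lowest-numbered member of a non-empty set. */
int
sketch_workerset_pop_lowest(sketch_workerset *set)
{
	int			worker = 0;

	assert(!sketch_workerset_is_empty(set));
	while ((*set & (UINT64_C(1) << worker)) == 0)
		worker++;
	*set &= ~(UINT64_C(1) << worker);
	return worker;
}

/* Find the lowest unused worker ID, or -1 if every ID is taken. */
int
sketch_lowest_free_id(sketch_workerset used)
{
	sketch_workerset free_set = sketch_workerset_all() & ~used;

	if (sketch_workerset_is_empty(&free_set))
		return -1;
	return sketch_workerset_pop_lowest(&free_set);
}
```

The linear scan in pop_lowest is only there to keep the sketch
portable; the quoted patch uses bit-scan helpers such as
pg_leftmost_one_pos64() for this kind of lookup.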
> > +static void
> > +pgaio_worker_grow(bool grow)
> > +{
> > + /*
> > + * This is called from sites that don't hold AioWorkerControlLock, but
> > + * these values change infrequently and an up-to-date value is not
> > + * required for this heuristic purpose.
> > + */
>
> Is it actually useful to do this while not holding the control lock? Ah, I
> see, this is due to the split of submission and control lock.
Yeah actually that comment is just confusing. Removed. It's pretty
clear that this flag has the usual sort of postmaster request flag
semantics and tolerates a bit of fuzziness.
> > + if (!grow)
> > + {
> > + /* Avoid dirtying memory if not already set. */
> > + if (io_worker_control->grow)
> > + io_worker_control->grow = false;
>
> Hm. pgaio_worker_grow(grow=false) is a bit odd. And this is basically a copy
> of pgaio_worker_cancel_grow() - I realize that's intended for postmaster, but
> somehow it's a bit odd.
Hmm, right.
> Maybe just name it pgaio_worker_set_grow()?
OK how about:
pgaio_worker_request_grow()
pgaio_worker_cancel_grow()
> > +/*
> > + * Called by the postmaster to check if a new worker is needed.
> > + */
> > +bool
> > +pgaio_worker_test_grow(void)
> > +{
> > + return io_worker_control && io_worker_control->grow;
> > +}
> > +
> > +/*
> > + * Called by the postmaster to clear the grow flag.
> > + */
> > +void
> > +pgaio_worker_clear_grow(void)
> > +{
> > + if (io_worker_control)
> > + io_worker_control->grow = false;
> > +}
>
> Maybe we should add _pm_ in there to make it clearer that they're not for
> general use?
Done.
> > @@ -226,8 +413,7 @@ pgaio_worker_submit(uint16 num_staged_ios, PgAioHandle
> > **staged_ios)
> > {
> > PgAioHandle **synchronous_ios = NULL;
> > int nsync = 0;
> > - Latch *wakeup = NULL;
> > - int worker;
> > + int worker = -1;
> >
> > Assert(num_staged_ios <= PGAIO_SUBMIT_BATCH_SIZE);
> >
> > @@ -252,19 +438,15 @@ pgaio_worker_submit(uint16 num_staged_ios,
> > PgAioHandle **staged_ios)
> > break;
> > }
> >
> > - if (wakeup == NULL)
> > - {
> > - /* Choose an idle worker to wake up if we
> > haven't already. */
> > - worker = pgaio_worker_choose_idle();
> > - if (worker >= 0)
> > - wakeup =
> > io_worker_control->workers[worker].latch;
> > -
> > - pgaio_debug_io(DEBUG4, staged_ios[i],
> > - "choosing worker
> > %d",
> > - worker);
> > - }
> > + /* Choose one worker to wake for this batch. */
> > + if (worker == -1)
> > + worker = pgaio_worker_choose_idle(0);
> > }
>
> If we only want to do this once per "batch", why not just do it outside the
> num_staged_ios loop?
Two steps: pgaio_worker_choose_idle() must be done while holding the
queue lock (will probably finish up revising this in future work on
removing locks...). pgaio_worker_wake() is called outside the loop,
after releasing the lock.
> > @@ -295,14 +474,27 @@ pgaio_worker_submit(uint16 num_staged_ios,
> > PgAioHandle **staged_ios)
> > static void
> > pgaio_worker_die(int code, Datum arg)
> > {
> > - LWLockAcquire(AioWorkerSubmissionQueueLock, LW_EXCLUSIVE);
> > - Assert(io_worker_control->workers[MyIoWorkerId].in_use);
> > - Assert(io_worker_control->workers[MyIoWorkerId].latch == MyLatch);
> > + PgAioWorkerSet notify_set;
> >
> > - io_worker_control->idle_worker_mask &= ~(UINT64_C(1) << MyIoWorkerId);
> > - io_worker_control->workers[MyIoWorkerId].in_use = false;
> > - io_worker_control->workers[MyIoWorkerId].latch = NULL;
> > + LWLockAcquire(AioWorkerSubmissionQueueLock, LW_EXCLUSIVE);
> > + pgaio_worker_set_remove(&io_worker_control->idle_worker_set,
> > MyIoWorkerId);
> > LWLockRelease(AioWorkerSubmissionQueueLock);
> > +
> > + LWLockAcquire(AioWorkerControlLock, LW_EXCLUSIVE);
> > + Assert(io_worker_control->workers[MyIoWorkerId].proc_number ==
> > MyProcNumber);
> > + io_worker_control->workers[MyIoWorkerId].proc_number =
> > INVALID_PROC_NUMBER;
> > + Assert(pgaio_worker_set_contains(&io_worker_control->worker_set,
> > MyIoWorkerId));
> > + pgaio_worker_set_remove(&io_worker_control->worker_set, MyIoWorkerId);
> > + notify_set = io_worker_control->worker_set;
> > + Assert(io_worker_control->nworkers > 0);
> > + io_worker_control->nworkers--;
> > + Assert(pgaio_worker_set_count(&io_worker_control->worker_set) ==
> > + io_worker_control->nworkers);
> > + LWLockRelease(AioWorkerControlLock);
> > +
> > + /* Notify other workers on pool change. */
>
> Why are we notifying them on pool changes?
Comments added to explain. It closes a wakeup-loss race (imagine if
you consumed a wakeup while you were exiting due to timeout; no one
else would wake up, which I fixed with this big hammer).
> > + while (!pgaio_worker_set_is_empty(&notify_set))
> > + pgaio_worker_wake(pgaio_worker_set_pop_lowest(&notify_set));
>
> I did already wonder further up if pgaio_worker_wake() should just receive a
> worker_set as an argument.
I have added pgaio_workerset_wake().
> > @@ -312,33 +504,34 @@ pgaio_worker_die(int code, Datum arg)
> > static void
> > pgaio_worker_register(void)
> > {
> > + PgAioWorkerSet free_worker_set;
> > + PgAioWorkerSet old_worker_set;
> > +
> > MyIoWorkerId = -1;
> >
> > - /*
> > - * XXX: This could do with more fine-grained locking. But it's also
> > not
> > - * very common for the number of workers to change at the moment...
> > - */
> > - LWLockAcquire(AioWorkerSubmissionQueueLock, LW_EXCLUSIVE);
> > + LWLockAcquire(AioWorkerControlLock, LW_EXCLUSIVE);
>
> I guess it could be useful to assert that nworkers is small enough before
> doing anything.
OK.
> > + pgaio_worker_set_fill(&free_worker_set);
> > + pgaio_worker_set_subtract(&free_worker_set,
> > &io_worker_control->worker_set);
> > + if (!pgaio_worker_set_is_empty(&free_worker_set))
> > + MyIoWorkerId = pgaio_worker_set_get_lowest(&free_worker_set);
> > + if (MyIoWorkerId == -1)
> > + elog(ERROR, "couldn't find a free worker ID");
>
> I'd probably add a comment saying "/* find lowest unused worker ID */" or
> such, that was more immediately obvious in the old code.
Done.
> > +/*
> > + * Check if this backend is allowed to time out, and thus should use a
> > + * non-infinite sleep time. Only the highest-numbered worker is allowed to
> > + * time out, and only if the pool is above io_min_workers. Serializing
> > + * timeouts keeps IDs in a range 0..N without gaps, and avoids
> > undershooting
> > + * io_min_workers.
>
> But it's ok if a lower numbered worker errors out, right? There will be a
> temporary gap, but we will start a new worker for it?
Yes it is OK for there to be gaps.
If any worker errors out, it will be replaced when reaped if we fell
below io_min_workers, and otherwise replaced via the usual means, ie
once the backlog detection and the launch delay allow it. I did have
a version that always replaced *every* worker with exit code 1
immediately, but I started wondering if we really want persistent
errors to turn into high speed fork() loops. I'm still not sure TBH.
We don't expect workers to error out, so it means something is already
pretty screwed up and you might appreciate the rate limiting?
I have an always-replace patch somewhere, as I've vacillated on that
point a couple of times. I will post a separate fixup for
consideration.
> Does that happen even
> if there's a shrink of the set of required workers at the same time as a lower
> numbered worker errors out?
If a worker errors out (exit code 1) and an idle worker times out
(exit code 0), then it's no different: if the new count dropped below
io_min_workers, we start a worker immediately after reaping the process.
Otherwise we let the normal algorithm decide to start a new worker
if/when required.
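Spelled out as a decision function, the policy described above might
look like the sketch below. The function name and parameters are
hypothetical, time is a plain millisecond integer, and the real
postmaster logic also has to account for pmState, FatalError and fork
failures, which are left out here.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/*
 * Should the postmaster start one more I/O worker right now?
 *
 * Below io_min_workers: yes, without delay (e.g. right after reaping).
 * Otherwise: only if a grow request is pending and the launch interval
 * has expired.  Never beyond io_max_workers.
 */
bool
sketch_should_start_worker(int nworkers, int min_workers, int max_workers,
						   bool grow_requested,
						   int64_t now_ms, int64_t next_launch_ms)
{
	if (nworkers >= max_workers)
		return false;
	if (nworkers < min_workers)
		return true;
	return grow_requested && now_ms >= next_launch_ms;
}
```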
> > @@ -439,10 +666,9 @@ IoWorkerMain(const void *startup_data, size_t
> > startup_data_len)
> > while (!ShutdownRequestPending)
> > {
> > uint32 io_index;
> > - Latch *latches[IO_WORKER_WAKEUP_FANOUT];
> > - int nlatches = 0;
> > - int nwakeups = 0;
> > - int worker;
> > + int worker = -1;
> > + int queue_depth = 0;
> > + bool grow = false;
> >
> > /*
> > * Try to get a job to do.
> > @@ -453,38 +679,64 @@ IoWorkerMain(const void *startup_data, size_t
> > startup_data_len)
> > LWLockAcquire(AioWorkerSubmissionQueueLock, LW_EXCLUSIVE);
> > if ((io_index = pgaio_worker_submission_queue_consume()) ==
> > -1)
> > {
> > - /*
> > - * Nothing to do. Mark self idle.
> > - *
> > - * XXX: Invent some kind of back pressure to reduce
> > useless
> > - * wakeups?
> > - */
> > - io_worker_control->idle_worker_mask |= (UINT64_C(1)
> > << MyIoWorkerId);
> > + /* Nothing to do. Mark self idle. */
> > +
> > pgaio_worker_set_insert(&io_worker_control->idle_worker_set,
> > +
> > MyIoWorkerId);
> > }
> > else
> > {
> > /* Got one. Clear idle flag. */
> > - io_worker_control->idle_worker_mask &= ~(UINT64_C(1)
> > << MyIoWorkerId);
> > +
> > pgaio_worker_set_remove(&io_worker_control->idle_worker_set,
> > +
> > MyIoWorkerId);
>
> Wonder if we should keep track of whether we marked ourselves idle to avoid
> needing to do that. But that would be a separate optimization really.
Fair point. OK.
> > + /*
> > + * See if we should wake up a higher numbered peer.
> > Only do that
> > + * if this worker is not receiving spurious wakeups
> > itself.
>
> The "not receiving spurious wakeups" condition is wakeups < ios?
Yes, see new comment near PGAIO_WORKER_WAKEUP_RATIO_SATURATE.
> I think both 'wakeups" and "ios" are a bit too generically named. Based on the
> names I have no idea what this heuristic might be.
I have struggled to name them. Does wakeup_count and io_count help?
> > + * This heuristic tries to discover the useful wakeup
> > propagation
> > + * chain length when IOs are very fast and workers
> > wake up to find
> > + * that all IOs have already been taken.
> > + *
> > + * If we chose not to wake a worker when we ideally
> > should have,
> > + * the ratio will soon be corrected.
> > + */
> > + if (wakeups <= ios)
> > {
> > + queue_depth =
> > pgaio_worker_submission_queue_depth();
> > + if (queue_depth > 0)
> > + {
> > + worker =
> > pgaio_worker_choose_idle(MyIoWorkerId + 1);
>
> Is it a problem that we are passing an ID that's potentially bigger than the
> biggest legal worker ID? It's probably fine as long as MAX_WORKERS is 32 and
> the bitmap is a 64bit integer, but ...
Oof. Fixed.
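One way to make such a lookup safe for any starting ID, shown purely as
an illustration (hypothetical name and signature, and a plain loop
rather than bit tricks), is to never form a shift count outside the
valid range:

```c
#include <assert.h>
#include <stdint.h>

#define SKETCH_MAX_IO_WORKERS 32

/*
 * Return the lowest idle worker ID that is >= min_worker, or -1 if there is
 * none.  A min_worker past the last valid ID simply finds nothing, and the
 * shift count stays in the range 0..31.
 */
int
sketch_choose_idle(uint64_t idle_set, int min_worker)
{
	if (min_worker < 0)
		min_worker = 0;
	for (int worker = min_worker; worker < SKETCH_MAX_IO_WORKERS; worker++)
	{
		if (idle_set & (UINT64_C(1) << worker))
			return worker;
	}
	return -1;
}
```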
> > + /*
> > + * If there were no idle higher
> > numbered peers and there
> > + * are more than enough IOs queued
> > for me and all lower
> > + * numbered peers, then try to start
> > a new worker.
> > + */
> > + if (worker == -1 && queue_depth >
> > MyIoWorkerId)
> > + grow = true;
> > + }
>
> We probably shouldn't request growth when already at the cap? That could
> generate a *lot* of pmsignal traffic, I think?
No, we only set it if it isn't already set (like a latch), and only
send a pmsignal when we set it (like a latch), and the postmaster only
clears it if it can start a worker (unlike a latch). That applies in
general, not just when we hit the cap of io_max_workers: while the
postmaster is waiting for launch interval to expire, it will leave the
flag set, suppressed for 100ms or whatever, and then, in the special
case of io_max_workers, for as long as the count remains that high.
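A single-process sketch of those flag semantics, with hypothetical
names and a counter standing in for the pmsignal (the real flag lives
in shared memory and needs the memory barriers discussed earlier, which
this deliberately omits):

```c
#include <assert.h>
#include <stdbool.h>

typedef struct sketch_control
{
	bool		grow;			/* request flag seen by the postmaster */
	int			signals_sent;	/* stand-in for sending a pmsignal */
} sketch_control;

/* Worker side: set the flag and signal only on a false -> true change. */
bool
sketch_request_grow(sketch_control *ctl)
{
	if (ctl->grow)
		return false;			/* already pending: no new signal */
	ctl->grow = true;
	ctl->signals_sent++;
	return true;
}

/* Worker side: cancel a stale request, avoiding a write if already clear. */
void
sketch_cancel_grow(sketch_control *ctl)
{
	if (ctl->grow)
		ctl->grow = false;
}

/* Postmaster side: clear the flag only if a worker can actually be started. */
bool
sketch_pm_consume_grow(sketch_control *ctl, bool can_start_worker)
{
	if (!ctl->grow || !can_start_worker)
		return false;			/* leave any pending request in place */
	ctl->grow = false;
	return true;
}
```

While the flag stays set, repeated requests from workers are absorbed
without generating further signals.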
> I don't have an immediate intuitive understanding of why the submission queue
> depth is a good measure here.
>
> If there are 10 workers that are busy 100% of the time, and the submission
> queue is usually 6 deep with not-being-worked-on IOs, why do we not want to
> start more workers?
>
> It actually seems to work - but I don't actually understand why.
I should have made it clearer that that's a secondary condition. The
primary condition is: a worker wanted to wake another worker, but
found that none were idle. Unfortunately the whole system is a bit
too asynchronous for that to be a reliable cue on its own. So, I also
check if the queue appears to be (1) obviously growing: that's clearly
too long and must be introducing latency, or (2) varying "too much".
Which I detect in exactly the same way.
Imagine a histogram that looks like this:
LOG: depth 00: 7898
LOG: depth 01: 1630
LOG: depth 02: 308
LOG: depth 03: 93
LOG: depth 04: 40
LOG: depth 05: 19
LOG: depth 06: 6
LOG: depth 07: 4
LOG: depth 08: 0
LOG: depth 09: 1
LOG: depth 10: 1
LOG: depth 11: 0
LOG: depth 12: 0
LOG: depth 13: 0
If you're failing to find idle workers to wake up AND our magic
threshold is hit by something in that long tail, then it'll call for
backup. Of course I'm totally sidestepping a lot of queueing theory
maths and just saying "I'd better be able to find an idle worker when
I want to" and if not, "there had better not be any outliers that
reach this far".
I've written a longer explanation in a long comment. Including a
little challenge for someone to do better with real science and maths.
I hope it's a bit clearer at least.
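Reduced to its two conditions, the heuristic could be sketched as
follows. The names are hypothetical, and the threshold is passed in as
a parameter because its exact definition in the patch (it is derived
from the worker count) is not reproduced here.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/*
 * Primary condition: we wanted to wake a peer but found none idle.
 * Secondary condition: the queue depth exceeds the threshold.
 */
bool
sketch_should_request_grow(bool found_idle_peer, int queue_depth,
						   int threshold)
{
	if (found_idle_peer)
		return false;
	return queue_depth > threshold;
}

/* Count histogram samples whose queue depth exceeds the threshold. */
long
sketch_tail_samples(const long *histogram, size_t ndepths, int threshold)
{
	long		total = 0;

	for (size_t depth = 0; depth < ndepths; depth++)
	{
		if ((int) depth > threshold)
			total += histogram[depth];
	}
	return total;
}
```

With the histogram above and a hypothetical threshold of 8, only the
two samples at depths 9 and 10 would qualify, and only if no idle
worker could be found at that moment.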
> ninja install-test-files
> io_max_workers=32
> debug_io_direct=data
> effective_io_concurrency=16
> shared_buffers=5GB
>
> pgbench -i -q -s 100 --fillfactor=30
>
> CREATE EXTENSION IF NOT EXISTS test_aio;
> CREATE EXTENSION IF NOT EXISTS pg_buffercache;
> DROP TABLE IF EXISTS pattern_random_pgbench;
> CREATE TABLE pattern_random_pgbench AS SELECT ARRAY(SELECT random(0,
> pg_relation_size('pgbench_accounts')/8192 - 1)::int4 FROM generate_series(1,
> pg_relation_size('pgbench_accounts')/8192)) AS pattern;
>
> My test is:
>
> SET effective_io_concurrency = 20;
> SELECT pg_buffercache_evict_relation('pgbench_accounts');
> SELECT read_stream_for_blocks('pgbench_accounts', pattern) FROM
> pattern_random_pgbench LIMIT 1;
>
>
> We end up with ~24-28 workers, even though we never have more than 20 IOs in
> flight. Not entirely sure why. I guess it's just that after doing an IO the
> worker needs to mark itself idle etc?
Yep. It would be nice to make it a bit more accurate in later cycles.
It tends to overprovision rather than under, since it thinks all other
workers are busy. That information is a bit racy. In this version
I've made a small improvement: it uses nworkers directly, under the
big new comment, instead of an unnecessarily complicated
approximation.
> > if (io_index != -1)
> > {
> > PgAioHandle *ioh = NULL;
> >
> > + /* Cancel timeout and update wakeup:work ratio. */
> > + idle_timeout_abs = 0;
> > + if (++ios == PGAIO_WORKER_STATS_MAX)
> > + {
> > + wakeups /= 2;
> > + ios /= 2;
> > + }
>
>
> /* Saturation for counters used to estimate wakeup:work ratio. */
> #define PGAIO_WORKER_STATS_MAX 4
>
> STATS_MAX sounds like it's just about some reporting or such.
I have renamed it to PGAIO_WORKER_RATIO_MAX and written a big comment
at the top to explain what it's for.
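As a standalone sketch of the saturating wakeup:work counters, using
the value 4 from the quoted #define: the I/O side mirrors the quoted
hunk, while halving on the wakeup side is an assumption made here for
symmetry, and all names are hypothetical.

```c
#include <assert.h>
#include <stdbool.h>

#define SKETCH_RATIO_MAX 4		/* saturation point, as in the quoted hunk */

typedef struct sketch_ratio
{
	int			wakeup_count;
	int			io_count;
} sketch_ratio;

/* Halve both counters so that recent history dominates the ratio. */
static void
sketch_ratio_decay(sketch_ratio *r)
{
	r->wakeup_count /= 2;
	r->io_count /= 2;
}

/* Called when a wakeup yields an IO to work on. */
void
sketch_count_io(sketch_ratio *r)
{
	if (++r->io_count == SKETCH_RATIO_MAX)
		sketch_ratio_decay(r);
}

/* Called on every wakeup (assumption: saturates the same way as IOs). */
void
sketch_count_wakeup(sketch_ratio *r)
{
	if (++r->wakeup_count == SKETCH_RATIO_MAX)
		sketch_ratio_decay(r);
}

/* Only propagate wakeups if we aren't receiving spurious ones ourselves. */
bool
sketch_may_wake_peer(const sketch_ratio *r)
{
	return r->wakeup_count <= r->io_count;
}
```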
> > ioh = &pgaio_ctl->io_handles[io_index];
> > error_ioh = ioh;
> > errcallback.arg = ioh;
> > @@ -537,6 +789,14 @@ IoWorkerMain(const void *startup_data, size_t
> > startup_data_len)
> > }
> > #endif
> >
> > +#ifdef PGAIO_WORKER_SHOW_PS_INFO
> > + sprintf(cmd, "%d: [%s] %s",
> > + MyIoWorkerId,
> > + pgaio_io_get_op_name(ioh),
> > + pgaio_io_get_target_description(ioh));
> > + set_ps_display(cmd);
> > +#endif
>
> Note that this leaks memory. See the target_description comment:
>
> /*
> * Return a stringified description of the IO's target.
> *
> * The string is localized and allocated in the current memory context.
> */
Fixed.
> > /*
> > * We don't expect this to ever fail with ERROR or
> > FATAL, no need
> > * to keep error_ioh set to the IO.
> > @@ -550,8 +810,75 @@ IoWorkerMain(const void *startup_data, size_t
> > startup_data_len)
> > }
> > else
> > {
> > - WaitLatch(MyLatch, WL_LATCH_SET |
> > WL_EXIT_ON_PM_DEATH, -1,
> > - WAIT_EVENT_IO_WORKER_MAIN);
> > + int timeout_ms;
> > +
> > + /* Cancel new worker request if pending. */
> > + pgaio_worker_grow(false);
>
> That seems to happen very frequently.
Yeah, but it doesn't write to memory after someone else does it. This
again is part of the strategy for preventing excess workers from being
created when I've found the queue to be empty.
> > + /*
> > + * All workers maintain the absolute timeout
> > value, but only
> > + * the highest worker can actually time out
> > and only if
> > + * io_min_workers is satisfied. All others
> > wait only for
> > + * explicit wakeups caused by queue
> > insertion, wakeup
> > + * propagation, change of pool size (possibly
> > promoting one to
> > + * new highest) or GUC reload.
> > + */
> > + if (pgaio_worker_can_timeout())
> > + timeout_ms =
> > +
> > TimestampDifferenceMilliseconds(now,
> > +
> > idle_timeout_abs);
> > + else
> > + timeout_ms = -1;
>
>
> Hm. This way you get very rapid worker pool reductions. Configured
> io_worker_idle_timeout=1s, started a bunch of work and observed the worker
> count after the work finishes:
>
> Mon 06 Apr 2026 02:08:28 PM EDT (every 1s)
>
> count
> 32
> (1 row)
> Mon 06 Apr 2026 02:08:29 PM EDT (every 1s)
>
> count
> 32
> (1 row)
> Mon 06 Apr 2026 02:08:30 PM EDT (every 1s)
>
> count
> 1
> (1 row)
> Mon 06 Apr 2026 02:08:31 PM EDT (every 1s)
>
> count
> 1
> (1 row)
>
>
> Of course this is a ridiculously low setting, but it does seem like starting
> the timeout even when not the highest numbered worker will lead to a lot of
> quick yoyoing.
I have changed it so that after one worker times out, the next one
begins its timeout count from 0. (This is one of the reasons for that
"notify the whole pool when I exit" thing.)
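A rough sketch of the timeout rule and the restart behavior, with
hypothetical names and plain millisecond integers; how a worker learns
that it has become the highest-numbered one (the pool-change
notification) is outside the sketch.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Highest worker ID present in the set, or -1 if the set is empty. */
int
sketch_highest_worker(uint64_t worker_set)
{
	for (int worker = 63; worker >= 0; worker--)
	{
		if (worker_set & (UINT64_C(1) << worker))
			return worker;
	}
	return -1;
}

/* Only the highest-numbered worker may time out, and only above the minimum. */
bool
sketch_can_timeout(uint64_t worker_set, int nworkers, int min_workers,
				   int my_id)
{
	return nworkers > min_workers &&
		sketch_highest_worker(worker_set) == my_id;
}

/* Sleep time for the wait: -1 means wait for explicit wakeups only. */
int
sketch_wait_timeout_ms(bool can_timeout, int64_t now_ms,
					   int64_t idle_deadline_ms)
{
	if (!can_timeout)
		return -1;
	if (idle_deadline_ms <= now_ms)
		return 0;
	return (int) (idle_deadline_ms - now_ms);
}

/* On becoming the new highest worker, restart the idle count from zero. */
int64_t
sketch_restart_idle_deadline(int64_t now_ms, int64_t idle_timeout_ms)
{
	return now_ms + idle_timeout_ms;
}
```

Restarting the deadline when the previous highest worker exits means
the pool shrinks by at most one worker per idle timeout, rather than
collapsing all at once as in the 32-to-1 observation above.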
From 537a3df61bf2f2c258f71ea367c27e5550c9092c Mon Sep 17 00:00:00 2001
From: Thomas Munro <[email protected]>
Date: Sat, 22 Mar 2025 00:36:49 +1300
Subject: [PATCH v5] aio: Adjust I/O worker pool size automatically.
The size of the I/O worker pool used to implement io_method=worker was
previously controlled by the io_workers setting, defaulting to 3. It
was hard to know how to tune it effectively. It is now replaced with:
io_min_workers=2
io_max_workers=8 (up to 32)
io_worker_idle_timeout=60s
io_worker_launch_interval=100ms
The pool is automatically sized within the configured range according to
recent variation in demand. It grows when existing workers detect a
backlog, and shrinks when the highest numbered worker is idle for too
long. Work was already concentrated into low-numbered workers in
anticipation of this logic.
The logic for waking extra workers now also tries to measure and reduce
the number of spurious wakeups, though they are not entirely eliminated.
Reviewed-by: Dmitry Dolgov <[email protected]>
Reviewed-by: Andres Freund <[email protected]>
Discussion: https://postgr.es/m/CA%2BhUKG%2Bm4xV0LMoH2c%3DoRAdEXuCnh%2BtGBTWa7uFeFMGgTLAw%2BQ%40mail.gmail.com
---
doc/src/sgml/config.sgml | 69 +-
src/backend/postmaster/postmaster.c | 166 +++--
src/backend/storage/aio/method_worker.c | 592 +++++++++++++++---
.../utils/activity/wait_event_names.txt | 1 +
src/backend/utils/misc/guc_parameters.dat | 34 +-
src/backend/utils/misc/postgresql.conf.sample | 6 +-
src/include/storage/io_worker.h | 10 +-
src/include/storage/lwlocklist.h | 1 +
src/include/storage/pmsignal.h | 1 +
src/test/modules/test_aio/t/002_io_workers.pl | 15 +-
src/tools/pgindent/typedefs.list | 1 +
11 files changed, 751 insertions(+), 145 deletions(-)
diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml
index 3324d2d3c49..86899c4be68 100644
--- a/doc/src/sgml/config.sgml
+++ b/doc/src/sgml/config.sgml
@@ -2870,16 +2870,75 @@ include_dir 'conf.d'
</listitem>
</varlistentry>
- <varlistentry id="guc-io-workers" xreflabel="io_workers">
- <term><varname>io_workers</varname> (<type>integer</type>)
+ <varlistentry id="guc-io-min-workers" xreflabel="io_min_workers">
+ <term><varname>io_min_workers</varname> (<type>integer</type>)
<indexterm>
- <primary><varname>io_workers</varname> configuration parameter</primary>
+ <primary><varname>io_min_workers</varname> configuration parameter</primary>
</indexterm>
</term>
<listitem>
<para>
- Selects the number of I/O worker processes to use. The default is
- 3. This parameter can only be set in the
+ Sets the minimum number of I/O worker processes. The default is
+ 2. This parameter can only be set in the
+ <filename>postgresql.conf</filename> file or on the server command
+ line.
+ </para>
+ <para>
+ Only has an effect if <xref linkend="guc-io-method"/> is set to
+ <literal>worker</literal>.
+ </para>
+ </listitem>
+ </varlistentry>
+ <varlistentry id="guc-io-max-workers" xreflabel="io_max_workers">
+ <term><varname>io_max_workers</varname> (<type>integer</type>)
+ <indexterm>
+ <primary><varname>io_max_workers</varname> configuration parameter</primary>
+ </indexterm>
+ </term>
+ <listitem>
+ <para>
+ Sets the maximum number of I/O worker processes. The default is
+ 8. This parameter can only be set in the
+ <filename>postgresql.conf</filename> file or on the server command
+ line.
+ </para>
+ <para>
+ Only has an effect if <xref linkend="guc-io-method"/> is set to
+ <literal>worker</literal>.
+ </para>
+ </listitem>
+ </varlistentry>
+ <varlistentry id="guc-io-worker-idle-timeout" xreflabel="io_worker_idle_timeout">
+ <term><varname>io_worker_idle_timeout</varname> (<type>integer</type>)
+ <indexterm>
+ <primary><varname>io_worker_idle_timeout</varname> configuration parameter</primary>
+ </indexterm>
+ </term>
+ <listitem>
+ <para>
+ Sets the time after which entirely idle I/O worker processes exit, reducing the
+ size of the pool to match demand. The default is 1 minute. This
+ parameter can only be set in the
+ <filename>postgresql.conf</filename> file or on the server command
+ line.
+ </para>
+ <para>
+ Only has an effect if <xref linkend="guc-io-method"/> is set to
+ <literal>worker</literal>.
+ </para>
+ </listitem>
+ </varlistentry>
+ <varlistentry id="guc-io-worker-launch-interval" xreflabel="io_worker_launch_interval">
+ <term><varname>io_worker_launch_interval</varname> (<type>integer</type>)
+ <indexterm>
+ <primary><varname>io_worker_launch_interval</varname> configuration parameter</primary>
+ </indexterm>
+ </term>
+ <listitem>
+ <para>
+ Sets the minimum time before another I/O worker can be launched. This avoids
+ creating too many for an unsustained burst of activity. The default is 100ms.
+ This parameter can only be set in the
<filename>postgresql.conf</filename> file or on the server command
line.
</para>
diff --git a/src/backend/postmaster/postmaster.c b/src/backend/postmaster/postmaster.c
index 6f13e8f40a0..cb2ccd9900c 100644
--- a/src/backend/postmaster/postmaster.c
+++ b/src/backend/postmaster/postmaster.c
@@ -409,6 +409,7 @@ static DNSServiceRef bonjour_sdref = NULL;
#endif
/* State for IO worker management. */
+static TimestampTz io_worker_launch_next_time = 0;
static int io_worker_count = 0;
static PMChild *io_worker_children[MAX_IO_WORKERS];
@@ -447,7 +448,8 @@ static int CountChildren(BackendTypeMask targetMask);
static void LaunchMissingBackgroundProcesses(void);
static void maybe_start_bgworkers(void);
static bool maybe_reap_io_worker(int pid);
-static void maybe_adjust_io_workers(void);
+static void maybe_start_io_workers(void);
+static TimestampTz maybe_start_io_workers_scheduled_at(void);
static bool CreateOptsFile(int argc, char *argv[], char *fullprogname);
static PMChild *StartChildProcess(BackendType type);
static void StartSysLogger(void);
@@ -1391,7 +1393,7 @@ PostmasterMain(int argc, char *argv[])
UpdatePMState(PM_STARTUP);
/* Make sure we can perform I/O while starting up. */
- maybe_adjust_io_workers();
+ maybe_start_io_workers();
/* Start bgwriter and checkpointer so they can help with recovery */
if (CheckpointerPMChild == NULL)
@@ -1555,14 +1557,15 @@ checkControlFile(void)
static int
DetermineSleepTime(void)
{
- TimestampTz next_wakeup = 0;
+ TimestampTz next_wakeup;
/*
- * Normal case: either there are no background workers at all, or we're in
- * a shutdown sequence (during which we ignore bgworkers altogether).
+ * If in ImmediateShutdown with a SIGKILL timeout, ignore everything else
+ * and wait for that.
+ *
+ * XXX Shouldn't this also test FatalError?
*/
- if (Shutdown > NoShutdown ||
- (!StartWorkerNeeded && !HaveCrashedWorker))
+ if (Shutdown >= ImmediateShutdown)
{
if (AbortStartTime != 0)
{
@@ -1582,14 +1585,16 @@ DetermineSleepTime(void)
return seconds * 1000;
}
- else
- return 60 * 1000;
}
- if (StartWorkerNeeded)
+ /* Time of next maybe_start_io_workers() call, or 0 for none. */
+ next_wakeup = maybe_start_io_workers_scheduled_at();
+
+ /* Ignore bgworkers during shutdown. */
+ if (StartWorkerNeeded && Shutdown == NoShutdown)
return 0;
- if (HaveCrashedWorker)
+ if (HaveCrashedWorker && Shutdown == NoShutdown)
{
dlist_mutable_iter iter;
@@ -2542,7 +2547,17 @@ process_pm_child_exit(void)
if (!EXIT_STATUS_0(exitstatus) && !EXIT_STATUS_1(exitstatus))
HandleChildCrash(pid, exitstatus, _("io worker"));
- maybe_adjust_io_workers();
+ /*
+ * A worker that exited with an error might have brought the pool
+ * size below io_min_workers, or allowed the queue to grow to the
+ * point where another worker called for growth.
+ *
+ * In the common case that a worker timed out due to idleness, no
+ * replacement needs to be started. maybe_start_io_workers() will
+ * figure that out.
+ */
+ maybe_start_io_workers();
+
continue;
}
@@ -3262,7 +3277,7 @@ PostmasterStateMachine(void)
UpdatePMState(PM_STARTUP);
/* Make sure we can perform I/O while starting up. */
- maybe_adjust_io_workers();
+ maybe_start_io_workers();
StartupPMChild = StartChildProcess(B_STARTUP);
Assert(StartupPMChild != NULL);
@@ -3336,7 +3351,7 @@ LaunchMissingBackgroundProcesses(void)
* A config file change will always lead to this function being called, so
* we always will process the config change in a timely manner.
*/
- maybe_adjust_io_workers();
+ maybe_start_io_workers();
/*
* The checkpointer and the background writer are active from the start,
@@ -3797,6 +3812,16 @@ process_pm_pmsignal(void)
StartWorkerNeeded = true;
}
+ /* Process IO worker start requests. */
+ if (CheckPostmasterSignal(PMSIGNAL_IO_WORKER_GROW))
+ {
+ /*
+ * No local flag, as the state is exposed through pgaio_worker_*()
+ * functions. This signal is received on potentially actionable level
+ * changes, so that maybe_start_io_workers() will run.
+ */
+ }
+
/* Process background worker state changes. */
if (CheckPostmasterSignal(PMSIGNAL_BACKGROUND_WORKER_CHANGE))
{
@@ -4399,44 +4424,104 @@ maybe_reap_io_worker(int pid)
}
/*
- * Start or stop IO workers, to close the gap between the number of running
- * workers and the number of configured workers. Used to respond to change of
- * the io_workers GUC (by increasing and decreasing the number of workers), as
- * well as workers terminating in response to errors (by starting
- * "replacement" workers).
+ * Returns the next time at which maybe_start_io_workers() would start one or
+ * more I/O workers. Any time in the past means ASAP, and 0 means no worker
+ * is currently scheduled.
+ *
+ * This is called by DetermineSleepTime() and also maybe_start_io_workers()
+ * itself, to make sure that they agree.
*/
-static void
-maybe_adjust_io_workers(void)
+static TimestampTz
+maybe_start_io_workers_scheduled_at(void)
{
if (!pgaio_workers_enabled())
- return;
+ return 0;
/*
* If we're in final shutting down state, then we're just waiting for all
* processes to exit.
*/
if (pmState >= PM_WAIT_IO_WORKERS)
- return;
+ return 0;
/* Don't start new workers during an immediate shutdown either. */
if (Shutdown >= ImmediateShutdown)
- return;
+ return 0;
/*
* Don't start new workers if we're in the shutdown phase of a crash
* restart. But we *do* need to start if we're already starting up again.
*/
if (FatalError && pmState >= PM_STOP_BACKENDS)
- return;
+ return 0;
+
+ /*
+ * Don't start a worker if we're at or above the maximum. (Excess workers
+ * exit when the GUC is lowered, but the count can be temporarily too high
+ * until they are reaped.)
+ */
+ if (io_worker_count >= io_max_workers)
+ return 0;
+
+ /* If we're under the minimum, start a worker as soon as possible. */
+ if (io_worker_count < io_min_workers)
+ return TIMESTAMP_MINUS_INFINITY; /* start worker ASAP */
+
+ /* Only proceed if a "grow" request is pending from existing workers. */
+ if (!pgaio_worker_pm_test_grow())
+ return 0;
- Assert(pmState < PM_WAIT_IO_WORKERS);
+ /*
+ * maybe_start_io_workers() should start a new I/O worker after this time,
+ * or as soon as possible if it is already in the past.
+ */
+ return io_worker_launch_next_time;
+}
+
+/*
+ * Start I/O workers if required. Used at startup, to respond to change of
+ * the io_min_workers GUC, when asked to start a new one due to submission
+ * queue backlog, and after workers terminate in response to errors (by
+ * starting "replacement" workers).
+ */
+static void
+maybe_start_io_workers(void)
+{
+ TimestampTz scheduled_at;
- /* Not enough running? */
- while (io_worker_count < io_workers)
+ while ((scheduled_at = maybe_start_io_workers_scheduled_at()) != 0)
{
+ TimestampTz now = GetCurrentTimestamp();
PMChild *child;
int i;
+ Assert(pmState < PM_WAIT_IO_WORKERS);
+
+ /* Still waiting for the scheduled time? */
+ if (scheduled_at > now)
+ break;
+
+ /* Clear the grow request flag if it is set. */
+ pgaio_worker_pm_clear_grow();
+
+ /*
+ * Compute next launch time relative to the previous value, so that
+ * time spent on the postmaster's other duties doesn't result in an
+ * inaccurate launch interval.
+ */
+ io_worker_launch_next_time =
+ TimestampTzPlusMilliseconds(io_worker_launch_next_time,
+ io_worker_launch_interval);
+
+ /*
+ * If that's already in the past, the interval is either impossibly
+ * short or we received no requests for new workers for a period.
+ * Compute a new future time relative to the current time instead.
+ */
+ if (io_worker_launch_next_time <= now)
+ io_worker_launch_next_time =
+ TimestampTzPlusMilliseconds(now, io_worker_launch_interval);
+
/* find unused entry in io_worker_children array */
for (i = 0; i < MAX_IO_WORKERS; ++i)
{
@@ -4454,22 +4539,21 @@ maybe_adjust_io_workers(void)
++io_worker_count;
}
else
- break; /* try again next time */
- }
-
- /* Too many running? */
- if (io_worker_count > io_workers)
- {
- /* ask the IO worker in the highest slot to exit */
- for (int i = MAX_IO_WORKERS - 1; i >= 0; --i)
{
- if (io_worker_children[i] != NULL)
- {
- kill(io_worker_children[i]->pid, SIGUSR2);
- break;
- }
+ /*
+ * Fork failure: we'll try again after the launch interval
+ * expires, or be called again without delay if we don't yet have
+ * io_min_workers. Don't loop here though, the postmaster has
+ * other duties.
+ */
+ break;
}
}
+
+ /*
+ * Workers decide when to shut down by themselves, according to the
+ * io_max_workers and io_worker_idle_timeout GUCs.
+ */
}
diff --git a/src/backend/storage/aio/method_worker.c b/src/backend/storage/aio/method_worker.c
index eb686cede1a..fb7dca253c7 100644
--- a/src/backend/storage/aio/method_worker.c
+++ b/src/backend/storage/aio/method_worker.c
@@ -11,9 +11,8 @@
* infrastructure for reopening the file, and must processed synchronously by
* the client code when submitted.
*
- * So that the submitter can make just one system call when submitting a batch
- * of IOs, wakeups "fan out"; each woken IO worker can wake two more. XXX This
- * could be improved by using futexes instead of latches to wake N waiters.
+ * The pool tries to stabilize at a size that can handle recently seen
+ * variation in demand, within the configured limits.
*
* This method of AIO is available in all builds on all operating systems, and
* is the default.
@@ -29,6 +28,8 @@
#include "postgres.h"
+#include <limits.h>
+
#include "libpq/pqsignal.h"
#include "miscadmin.h"
#include "port/pg_bitutils.h"
@@ -40,6 +41,8 @@
#include "storage/io_worker.h"
#include "storage/ipc.h"
#include "storage/latch.h"
+#include "storage/lwlock.h"
+#include "storage/pmsignal.h"
#include "storage/proc.h"
#include "storage/shmem.h"
#include "tcop/tcopprot.h"
@@ -48,10 +51,22 @@
#include "utils/ps_status.h"
#include "utils/wait_event.h"
+/*
+ * Saturation for counters used to estimate wakeup:IO ratio.
+ *
+ * We maintain wakeup_count for wakeups received and io_count for IOs
+ * processed by each worker. When either counter reaches this saturation
+ * value, we divide both by two. The result is an exponentially decaying
+ * ratio of wakeups to IOs, with a very short memory.
+ *
+ * If a worker is itself experiencing useless wakeups, it assumes that
+ * higher-numbered workers would experience even more, so it should end the
+ * chain.
+ */
+#define PGAIO_WORKER_WAKEUP_RATIO_SATURATE 4
-/* How many workers should each worker wake up if needed? */
-#define IO_WORKER_WAKEUP_FANOUT 2
-
+/* Debugging support: show current IO and wakeups:ios statistics in ps. */
+/* #define PGAIO_WORKER_SHOW_PS_INFO */
typedef struct PgAioWorkerSubmissionQueue
{
@@ -63,13 +78,34 @@ typedef struct PgAioWorkerSubmissionQueue
typedef struct PgAioWorkerSlot
{
- Latch *latch;
- bool in_use;
+ ProcNumber proc_number;
} PgAioWorkerSlot;
+/*
+ * Sets of worker IDs are held in a simple 64-bit bitmap, accessed through
+ * functions that provide a more readable abstraction. If we wanted to support
+ * more workers than that, the contention on the single queue would surely get
+ * too high, so we might consider multiple pools instead of widening this.
+ */
+typedef uint64 PgAioWorkerSet;
+
+#define PGAIO_WORKERSET_BITS (sizeof(PgAioWorkerSet) * CHAR_BIT)
+
+static_assert(PGAIO_WORKERSET_BITS >= MAX_IO_WORKERS, "too small");
+
typedef struct PgAioWorkerControl
{
- uint64 idle_worker_mask;
+ /* Seen by postmaster */
+ bool grow;
+
+ /* Protected by AioWorkerSubmissionQueueLock. */
+ PgAioWorkerSet idle_workerset;
+
+ /* Protected by AioWorkerControlLock. */
+ PgAioWorkerSet workerset;
+ int nworkers;
+
+ /* Protected by AioWorkerControlLock. */
PgAioWorkerSlot workers[FLEXIBLE_ARRAY_MEMBER];
} PgAioWorkerControl;
@@ -91,15 +127,108 @@ const IoMethodOps pgaio_worker_ops = {
/* GUCs */
-int io_workers = 3;
+int io_min_workers = 2;
+int io_max_workers = 8;
+int io_worker_idle_timeout = 60000;
+int io_worker_launch_interval = 100;
static int io_worker_queue_size = 64;
-static int MyIoWorkerId;
+static int MyIoWorkerId = -1;
static PgAioWorkerSubmissionQueue *io_worker_submission_queue;
static PgAioWorkerControl *io_worker_control;
+static void
+pgaio_workerset_initialize(PgAioWorkerSet *set)
+{
+ *set = 0;
+}
+
+static bool
+pgaio_workerset_is_empty(PgAioWorkerSet *set)
+{
+ return *set == 0;
+}
+
+static PgAioWorkerSet
+pgaio_workerset_singleton(int worker)
+{
+ Assert(worker >= 0 && worker < MAX_IO_WORKERS);
+ return UINT64_C(1) << worker;
+}
+
+static void
+pgaio_workerset_all(PgAioWorkerSet *set)
+{
+ *set = UINT64_MAX >> (PGAIO_WORKERSET_BITS - MAX_IO_WORKERS);
+}
+
+static void
+pgaio_workerset_subtract(PgAioWorkerSet *set1, const PgAioWorkerSet *set2)
+{
+ *set1 &= ~*set2;
+}
+
+static void
+pgaio_workerset_insert(PgAioWorkerSet *set, int worker)
+{
+ Assert(worker >= 0 && worker < MAX_IO_WORKERS);
+ *set |= pgaio_workerset_singleton(worker);
+}
+
+static void
+pgaio_workerset_remove(PgAioWorkerSet *set, int worker)
+{
+ Assert(worker >= 0 && worker < MAX_IO_WORKERS);
+ *set &= ~pgaio_workerset_singleton(worker);
+}
+
+static void
+pgaio_workerset_remove_lte(PgAioWorkerSet *set, int worker)
+{
+ Assert(worker >= 0 && worker < MAX_IO_WORKERS);
+ *set &= (~(PgAioWorkerSet) 0) << (worker + 1);
+}
+
+static int
+pgaio_workerset_get_highest(PgAioWorkerSet *set)
+{
+ Assert(!pgaio_workerset_is_empty(set));
+ return pg_leftmost_one_pos64(*set);
+}
+
+static int
+pgaio_workerset_get_lowest(PgAioWorkerSet *set)
+{
+ Assert(!pgaio_workerset_is_empty(set));
+ return pg_rightmost_one_pos64(*set);
+}
+
+static int
+pgaio_workerset_pop_lowest(PgAioWorkerSet *set)
+{
+ int worker = pgaio_workerset_get_lowest(set);
+
+ pgaio_workerset_remove(set, worker);
+ return worker;
+}
+
+#ifdef USE_ASSERT_CHECKING
+static bool
+pgaio_workerset_contains(PgAioWorkerSet *set, int worker)
+{
+ Assert(worker >= 0 && worker < MAX_IO_WORKERS);
+ return (*set & pgaio_workerset_singleton(worker)) != 0;
+}
+
+static int
+pgaio_workerset_count(PgAioWorkerSet *set)
+{
+ return pg_popcount64(*set);
+}
+#endif
+
static void
pgaio_worker_shmem_request(void *arg)
{
@@ -133,37 +262,123 @@ pgaio_worker_shmem_init(void *arg)
io_worker_submission_queue->size = queue_size;
io_worker_submission_queue->head = 0;
io_worker_submission_queue->tail = 0;
+ io_worker_control->grow = false;
+ pgaio_workerset_initialize(&io_worker_control->workerset);
+ pgaio_workerset_initialize(&io_worker_control->idle_workerset);
- io_worker_control->idle_worker_mask = 0;
for (int i = 0; i < MAX_IO_WORKERS; ++i)
+ io_worker_control->workers[i].proc_number = INVALID_PROC_NUMBER;
+}
+
+/*
+ * Tell postmaster that we think a new worker is needed.
+ */
+static void
+pgaio_worker_request_grow(void)
+{
+ if (!io_worker_control->grow)
+ {
+ io_worker_control->grow = true;
+ pg_memory_barrier();
+ SendPostmasterSignal(PMSIGNAL_IO_WORKER_GROW);
+ }
+}
+
+/*
+ * Cancel any request for a new worker, after observing an empty queue.
+ */
+static void
+pgaio_worker_cancel_grow(void)
+{
+ if (io_worker_control->grow)
{
- io_worker_control->workers[i].latch = NULL;
- io_worker_control->workers[i].in_use = false;
+ io_worker_control->grow = false;
+ pg_memory_barrier();
}
}
+/*
+ * Called by the postmaster to check if a new worker is requested.
+ */
+bool
+pgaio_worker_pm_test_grow(void)
+{
+ pg_memory_barrier();
+ return io_worker_control && io_worker_control->grow;
+}
+
+/*
+ * Called by the postmaster to clear the request for a new worker.
+ */
+void
+pgaio_worker_pm_clear_grow(void)
+{
+ if (io_worker_control)
+ io_worker_control->grow = false;
+ pg_memory_barrier();
+}
+
static int
-pgaio_worker_choose_idle(void)
+pgaio_worker_choose_idle(int only_workers_above)
{
+ PgAioWorkerSet workerset;
int worker;
- if (io_worker_control->idle_worker_mask == 0)
+ Assert(LWLockHeldByMeInMode(AioWorkerSubmissionQueueLock, LW_EXCLUSIVE));
+
+ workerset = io_worker_control->idle_workerset;
+ if (only_workers_above >= 0)
+ pgaio_workerset_remove_lte(&workerset, only_workers_above);
+ if (pgaio_workerset_is_empty(&workerset))
return -1;
- /* Find the lowest bit position, and clear it. */
- worker = pg_rightmost_one_pos64(io_worker_control->idle_worker_mask);
- io_worker_control->idle_worker_mask &= ~(UINT64_C(1) << worker);
- Assert(io_worker_control->workers[worker].in_use);
+ /* Find the lowest numbered idle worker and mark it not idle. */
+ worker = pgaio_workerset_get_lowest(&workerset);
+ pgaio_workerset_remove(&io_worker_control->idle_workerset, worker);
return worker;
}
+/*
+ * Try to wake a worker by setting its latch, to tell it there are IOs to
+ * process in the submission queue.
+ */
+static void
+pgaio_worker_wake(int worker)
+{
+ ProcNumber proc_number;
+
+ /*
+ * If the selected worker is concurrently exiting, then pgaio_worker_die()
+ * had not yet removed it as of when we saw it in idle_workerset. That's
+ * OK, because it will wake all remaining workers to close wakeup-vs-exit
+ * races: *someone* will see the queued IO. If there are no workers
+ * running, the postmaster will start a new one.
+ */
+ proc_number = io_worker_control->workers[worker].proc_number;
+ if (proc_number != INVALID_PROC_NUMBER)
+ SetLatch(&GetPGProcByNumber(proc_number)->procLatch);
+}
+
+/*
+ * Try to wake a set of workers. Used on pool change, to close races
+ * described in the callers.
+ */
+static void
+pgaio_workerset_wake(PgAioWorkerSet workerset)
+{
+ while (!pgaio_workerset_is_empty(&workerset))
+ pgaio_worker_wake(pgaio_workerset_pop_lowest(&workerset));
+}
+
static bool
pgaio_worker_submission_queue_insert(PgAioHandle *ioh)
{
PgAioWorkerSubmissionQueue *queue;
uint32 new_head;
+ Assert(LWLockHeldByMeInMode(AioWorkerSubmissionQueueLock, LW_EXCLUSIVE));
+
queue = io_worker_submission_queue;
new_head = (queue->head + 1) & (queue->size - 1);
if (new_head == queue->tail)
@@ -185,6 +400,8 @@ pgaio_worker_submission_queue_consume(void)
PgAioWorkerSubmissionQueue *queue;
int result;
+ Assert(LWLockHeldByMeInMode(AioWorkerSubmissionQueueLock, LW_EXCLUSIVE));
+
queue = io_worker_submission_queue;
if (queue->tail == queue->head)
return -1; /* empty */
@@ -201,6 +418,8 @@ pgaio_worker_submission_queue_depth(void)
uint32 head;
uint32 tail;
+ Assert(LWLockHeldByMeInMode(AioWorkerSubmissionQueueLock, LW_EXCLUSIVE));
+
head = io_worker_submission_queue->head;
tail = io_worker_submission_queue->tail;
@@ -226,8 +445,7 @@ pgaio_worker_submit(uint16 num_staged_ios, PgAioHandle **staged_ios)
{
PgAioHandle **synchronous_ios = NULL;
int nsync = 0;
- Latch *wakeup = NULL;
- int worker;
+ int worker = -1;
Assert(num_staged_ios <= PGAIO_SUBMIT_BATCH_SIZE);
@@ -252,19 +470,15 @@ pgaio_worker_submit(uint16 num_staged_ios, PgAioHandle **staged_ios)
break;
}
- if (wakeup == NULL)
- {
- /* Choose an idle worker to wake up if we haven't already. */
- worker = pgaio_worker_choose_idle();
- if (worker >= 0)
- wakeup = io_worker_control->workers[worker].latch;
-
- pgaio_debug_io(DEBUG4, staged_ios[i],
- "choosing worker %d",
- worker);
- }
+ /* Choose one worker to wake for this batch. */
+ if (worker == -1)
+ worker = pgaio_worker_choose_idle(-1);
}
LWLockRelease(AioWorkerSubmissionQueueLock);
+
+ /* Wake up chosen worker. It will wake peers if necessary. */
+ if (worker != -1)
+ pgaio_worker_wake(worker);
}
else
{
@@ -273,9 +487,6 @@ pgaio_worker_submit(uint16 num_staged_ios, PgAioHandle **staged_ios)
nsync = num_staged_ios;
}
- if (wakeup)
- SetLatch(wakeup);
-
/* Run whatever is left synchronously. */
if (nsync > 0)
{
@@ -295,14 +506,30 @@ pgaio_worker_submit(uint16 num_staged_ios, PgAioHandle **staged_ios)
static void
pgaio_worker_die(int code, Datum arg)
{
- LWLockAcquire(AioWorkerSubmissionQueueLock, LW_EXCLUSIVE);
- Assert(io_worker_control->workers[MyIoWorkerId].in_use);
- Assert(io_worker_control->workers[MyIoWorkerId].latch == MyLatch);
+ PgAioWorkerSet notify_set;
- io_worker_control->idle_worker_mask &= ~(UINT64_C(1) << MyIoWorkerId);
- io_worker_control->workers[MyIoWorkerId].in_use = false;
- io_worker_control->workers[MyIoWorkerId].latch = NULL;
+ LWLockAcquire(AioWorkerSubmissionQueueLock, LW_EXCLUSIVE);
+ pgaio_workerset_remove(&io_worker_control->idle_workerset, MyIoWorkerId);
LWLockRelease(AioWorkerSubmissionQueueLock);
+
+ LWLockAcquire(AioWorkerControlLock, LW_EXCLUSIVE);
+ Assert(io_worker_control->workers[MyIoWorkerId].proc_number == MyProcNumber);
+ io_worker_control->workers[MyIoWorkerId].proc_number = INVALID_PROC_NUMBER;
+ Assert(pgaio_workerset_contains(&io_worker_control->workerset, MyIoWorkerId));
+ pgaio_workerset_remove(&io_worker_control->workerset, MyIoWorkerId);
+ notify_set = io_worker_control->workerset;
+ Assert(io_worker_control->nworkers > 0);
+ io_worker_control->nworkers--;
+ Assert(pgaio_workerset_count(&io_worker_control->workerset) ==
+ io_worker_control->nworkers);
+ LWLockRelease(AioWorkerControlLock);
+
+ /*
+ * Notify other workers on pool change. This allows the new highest
+ * worker to know that it is now the one that can time out, and closes a
+ * wakeup-loss race described in pgaio_worker_wake().
+ */
+ pgaio_workerset_wake(notify_set);
}
/*
@@ -312,33 +539,38 @@ pgaio_worker_die(int code, Datum arg)
static void
pgaio_worker_register(void)
{
+ PgAioWorkerSet free_workerset;
+ PgAioWorkerSet old_workerset;
+
MyIoWorkerId = -1;
- /*
- * XXX: This could do with more fine-grained locking. But it's also not
- * very common for the number of workers to change at the moment...
- */
- LWLockAcquire(AioWorkerSubmissionQueueLock, LW_EXCLUSIVE);
+ LWLockAcquire(AioWorkerControlLock, LW_EXCLUSIVE);
+ /* Find lowest unused worker ID. */
+ pgaio_workerset_all(&free_workerset);
+ pgaio_workerset_subtract(&free_workerset, &io_worker_control->workerset);
+ if (!pgaio_workerset_is_empty(&free_workerset))
+ MyIoWorkerId = pgaio_workerset_get_lowest(&free_workerset);
+ if (MyIoWorkerId == -1)
+ elog(ERROR, "couldn't find a free worker ID");
- for (int i = 0; i < MAX_IO_WORKERS; ++i)
- {
- if (!io_worker_control->workers[i].in_use)
- {
- Assert(io_worker_control->workers[i].latch == NULL);
- io_worker_control->workers[i].in_use = true;
- MyIoWorkerId = i;
- break;
- }
- else
- Assert(io_worker_control->workers[i].latch != NULL);
- }
+ Assert(io_worker_control->workers[MyIoWorkerId].proc_number ==
+ INVALID_PROC_NUMBER);
+ io_worker_control->workers[MyIoWorkerId].proc_number = MyProcNumber;
- if (MyIoWorkerId == -1)
- elog(ERROR, "couldn't find a free worker slot");
+ old_workerset = io_worker_control->workerset;
+ Assert(!pgaio_workerset_contains(&old_workerset, MyIoWorkerId));
+ pgaio_workerset_insert(&io_worker_control->workerset, MyIoWorkerId);
+ io_worker_control->nworkers++;
+ Assert(io_worker_control->nworkers <= MAX_IO_WORKERS);
+ Assert(pgaio_workerset_count(&io_worker_control->workerset) ==
+ io_worker_control->nworkers);
+ LWLockRelease(AioWorkerControlLock);
- io_worker_control->idle_worker_mask |= (UINT64_C(1) << MyIoWorkerId);
- io_worker_control->workers[MyIoWorkerId].latch = MyLatch;
- LWLockRelease(AioWorkerSubmissionQueueLock);
+ /*
+ * Notify other workers on pool change. If we are now the highest worker,
+ * this lets the previous highest worker know that it can no longer time out.
+ */
+ pgaio_workerset_wake(old_workerset);
on_shmem_exit(pgaio_worker_die, 0);
}
@@ -364,14 +596,48 @@ pgaio_worker_error_callback(void *arg)
errcontext("I/O worker executing I/O on behalf of process %d", owner_pid);
}
+/*
+ * Check if this backend is allowed to time out, and thus should use a
+ * non-infinite sleep time. Only the highest-numbered worker is allowed to
+ * time out, and only if the pool is above io_min_workers. Serializing
+ * timeouts keeps IDs in a range 0..N without gaps, and avoids undershooting
+ * io_min_workers.
+ *
+ * The result is only instantaneously true and may be temporarily inconsistent
+ * in different workers around transitions, but all workers are woken up on
+ * pool size or GUC changes, making the result eventually consistent.
+ */
+static bool
+pgaio_worker_can_timeout(void)
+{
+ PgAioWorkerSet workerset;
+
+ /* Serialize against pool size changes. */
+ LWLockAcquire(AioWorkerControlLock, LW_SHARED);
+ workerset = io_worker_control->workerset;
+ LWLockRelease(AioWorkerControlLock);
+
+ if (MyIoWorkerId != pgaio_workerset_get_highest(&workerset))
+ return false;
+
+ if (MyIoWorkerId < io_min_workers)
+ return false;
+
+ return true;
+}
+
void
IoWorkerMain(const void *startup_data, size_t startup_data_len)
{
sigjmp_buf local_sigjmp_buf;
+ TimestampTz idle_timeout_abs = 0;
+ int timeout_guc_used = 0;
PgAioHandle *volatile error_ioh = NULL;
ErrorContextCallback errcallback = {0};
volatile int error_errno = 0;
char cmd[128];
+ int io_count = 0;
+ int wakeup_count = 0;
AuxiliaryProcessMainCommon();
@@ -439,10 +705,9 @@ IoWorkerMain(const void *startup_data, size_t startup_data_len)
while (!ShutdownRequestPending)
{
uint32 io_index;
- Latch *latches[IO_WORKER_WAKEUP_FANOUT];
- int nlatches = 0;
- int nwakeups = 0;
- int worker;
+ int worker = -1;
+ int queue_depth = 0;
+ bool maybe_grow = false;
/*
* Try to get a job to do.
@@ -453,38 +718,106 @@ IoWorkerMain(const void *startup_data, size_t startup_data_len)
LWLockAcquire(AioWorkerSubmissionQueueLock, LW_EXCLUSIVE);
if ((io_index = pgaio_worker_submission_queue_consume()) == -1)
{
- /*
- * Nothing to do. Mark self idle.
- *
- * XXX: Invent some kind of back pressure to reduce useless
- * wakeups?
- */
- io_worker_control->idle_worker_mask |= (UINT64_C(1) << MyIoWorkerId);
+ /* Nothing to do. Mark self idle. */
+ pgaio_workerset_insert(&io_worker_control->idle_workerset,
+ MyIoWorkerId);
}
else
{
/* Got one. Clear idle flag. */
- io_worker_control->idle_worker_mask &= ~(UINT64_C(1) << MyIoWorkerId);
+ pgaio_workerset_remove(&io_worker_control->idle_workerset,
+ MyIoWorkerId);
- /* See if we can wake up some peers. */
- nwakeups = Min(pgaio_worker_submission_queue_depth(),
- IO_WORKER_WAKEUP_FANOUT);
- for (int i = 0; i < nwakeups; ++i)
+ /*
+ * See if we should wake up a higher numbered peer. Only do that
+ * if this worker is not receiving spurious wakeups itself. The
+ * intention is to create a frontier beyond which idle workers stay
+ * asleep.
+ *
+ * This heuristic tries to discover the useful wakeup propagation
+ * chain length when IOs are very fast and workers wake up to find
+ * that all IOs have already been taken.
+ *
+ * If we chose not to wake a worker when we ideally should have,
+ * then ios will soon exceed wakeups.
+ */
+ if (wakeup_count <= io_count)
{
- if ((worker = pgaio_worker_choose_idle()) < 0)
- break;
- latches[nlatches++] = io_worker_control->workers[worker].latch;
+ queue_depth = pgaio_worker_submission_queue_depth();
+ if (queue_depth > 0)
+ {
+ /* Choose a worker higher than me to wake. */
+ worker = pgaio_worker_choose_idle(MyIoWorkerId);
+ if (worker == -1)
+ maybe_grow = true;
+ }
}
}
LWLockRelease(AioWorkerSubmissionQueueLock);
- for (int i = 0; i < nlatches; ++i)
- SetLatch(latches[i]);
+ /* Propagate wakeups. */
+ if (worker != -1)
+ {
+ pgaio_worker_wake(worker);
+ }
+ else if (maybe_grow)
+ {
+ /*
+ * We know there was at least one more item in the queue, and we
+ * failed to find a higher-numbered idle worker to wake. Now we
+ * decide if we should try to start one more worker.
+ *
+ * We do this with a simple heuristic: is the queue depth greater
+ * than the current number of workers?
+ *
+ * Consider the following situations:
+ *
+ * 1. The queue depth is constantly increasing, because IOs are
+ * arriving faster than they can possibly be serviced. It doesn't
+ * matter much which threshold we choose, as we will surely hit
+ * it. Crossing the current worker count is a useful signal
+ * because it's clearly too deep to avoid queuing latency already,
+ * but still leaves a small window of opportunity to improve the
+ * situation before the queue overflows.
+ *
+ * 2. The worker pool is keeping up, no latency is being
+ * introduced and an extra worker would be a waste of resources.
+ * Queue depth distributions tend to be heavily skewed, with long
+ * tails of low probability spikes (due to submission clustering,
+ * scheduling, jitter, stalls, noisy neighbors, etc). We want a
+ * number that is very unlikely to be triggered by an outlier, and
+ * we bet that an exponential or similar distribution whose
+ * outliers never reach this threshold must be almost entirely
+ * concentrated at the low end. If we do see a spike as big as
+ * the worker count, we take it as a signal that the distribution
+ * is surely too wide.
+ *
+ * On its own, this is an extremely crude signal. When combined
+ * with the wakeup propagation test that precedes it and the
+ * io_worker_launch_interval, we can try each pool size until we find
+ * one that doesn't trigger further growth.
+ *
+ * XXX Ideas from queueing theory or control theory could surely
+ * do a much better job of this.
+ */
+
+ /* Read nworkers without lock for this heuristic purpose. */
+ if (queue_depth > io_worker_control->nworkers)
+ pgaio_worker_request_grow();
+ }
if (io_index != -1)
{
PgAioHandle *ioh = NULL;
+ /* Cancel timeout and update wakeup:work ratio. */
+ idle_timeout_abs = 0;
+ if (++io_count == PGAIO_WORKER_WAKEUP_RATIO_SATURATE)
+ {
+ wakeup_count /= 2;
+ io_count /= 2;
+ }
+
ioh = &pgaio_ctl->io_handles[io_index];
error_ioh = ioh;
errcallback.arg = ioh;
@@ -537,6 +870,19 @@ IoWorkerMain(const void *startup_data, size_t startup_data_len)
}
#endif
+#ifdef PGAIO_WORKER_SHOW_PS_INFO
+ {
+ char *description = pgaio_io_get_target_description(ioh);
+
+ snprintf(cmd, sizeof(cmd), "%d: [%s] %s",
+ MyIoWorkerId,
+ pgaio_io_get_op_name(ioh),
+ description);
+ pfree(description);
+ set_ps_display(cmd);
+ }
+#endif
+
/*
* We don't expect this to ever fail with ERROR or FATAL, no need
* to keep error_ioh set to the IO.
@@ -550,8 +896,76 @@ IoWorkerMain(const void *startup_data, size_t startup_data_len)
}
else
{
- WaitLatch(MyLatch, WL_LATCH_SET | WL_EXIT_ON_PM_DEATH, -1,
- WAIT_EVENT_IO_WORKER_MAIN);
+ int timeout_ms;
+
+ /* Cancel new worker request if pending. */
+ pgaio_worker_cancel_grow();
+
+ /* Compute the remaining allowed idle time. */
+ if (io_worker_idle_timeout == -1)
+ {
+ /* Never time out. */
+ timeout_ms = -1;
+ }
+ else
+ {
+ TimestampTz now = GetCurrentTimestamp();
+
+ /* If the GUC changes, reset timer. */
+ if (idle_timeout_abs != 0 &&
+ io_worker_idle_timeout != timeout_guc_used)
+ idle_timeout_abs = 0;
+
+ /* Only the highest-numbered worker can time out. */
+ if (pgaio_worker_can_timeout())
+ {
+ if (idle_timeout_abs == 0)
+ {
+ /*
+ * I have just been promoted to the timeout worker, or
+ * the GUC changed. Compute new absolute time from
+ * now.
+ */
+ idle_timeout_abs =
+ TimestampTzPlusMilliseconds(now,
+ io_worker_idle_timeout);
+ timeout_guc_used = io_worker_idle_timeout;
+ }
+ timeout_ms =
+ TimestampDifferenceMilliseconds(now, idle_timeout_abs);
+ }
+ else
+ {
+ /* No timeout for me. */
+ idle_timeout_abs = 0;
+ timeout_ms = -1;
+ }
+ }
+
+#ifdef PGAIO_WORKER_SHOW_PS_INFO
+ sprintf(cmd, "%d: idle, wakeups:ios = %d:%d",
+ MyIoWorkerId, wakeup_count, io_count);
+ set_ps_display(cmd);
+#endif
+
+ if (WaitLatch(MyLatch, WL_LATCH_SET | WL_EXIT_ON_PM_DEATH | WL_TIMEOUT,
+ timeout_ms,
+ WAIT_EVENT_IO_WORKER_MAIN) == WL_TIMEOUT)
+ {
+ /* WL_TIMEOUT */
+ if (pgaio_worker_can_timeout())
+ if (GetCurrentTimestamp() >= idle_timeout_abs)
+ break;
+ }
+ else
+ {
+ /* WL_LATCH_SET */
+ if (++wakeup_count == PGAIO_WORKER_WAKEUP_RATIO_SATURATE)
+ {
+ wakeup_count /= 2;
+ io_count /= 2;
+ }
+ }
ResetLatch(MyLatch);
}
@@ -561,6 +975,10 @@ IoWorkerMain(const void *startup_data, size_t startup_data_len)
{
ConfigReloadPending = false;
ProcessConfigFile(PGC_SIGHUP);
+
+ /* If io_max_workers has been decreased, exit highest first. */
+ if (MyIoWorkerId >= io_max_workers)
+ break;
}
}
diff --git a/src/backend/utils/activity/wait_event_names.txt b/src/backend/utils/activity/wait_event_names.txt
index 7bda5298558..560659f9568 100644
--- a/src/backend/utils/activity/wait_event_names.txt
+++ b/src/backend/utils/activity/wait_event_names.txt
@@ -369,6 +369,7 @@ AioWorkerSubmissionQueue "Waiting to access AIO worker submission queue."
WaitLSN "Waiting to read or update shared Wait-for-LSN state."
LogicalDecodingControl "Waiting to read or update logical decoding status information."
DataChecksumsWorker "Waiting for data checksums worker."
+AioWorkerControl "Waiting to update AIO worker information."
#
# END OF PREDEFINED LWLOCKS (DO NOT CHANGE THIS LINE)
diff --git a/src/backend/utils/misc/guc_parameters.dat b/src/backend/utils/misc/guc_parameters.dat
index fcb6ab80583..584ff79d0ba 100644
--- a/src/backend/utils/misc/guc_parameters.dat
+++ b/src/backend/utils/misc/guc_parameters.dat
@@ -1390,6 +1390,14 @@
check_hook => 'check_io_max_concurrency',
},
+{ name => 'io_max_workers', type => 'int', context => 'PGC_SIGHUP', group => 'RESOURCES_IO',
+ short_desc => 'Maximum number of I/O worker processes, for io_method=worker.',
+ variable => 'io_max_workers',
+ boot_val => '8',
+ min => '1',
+ max => 'MAX_IO_WORKERS',
+},
+
{ name => 'io_method', type => 'enum', context => 'PGC_POSTMASTER', group => 'RESOURCES_IO',
short_desc => 'Selects the method for executing asynchronous I/O.',
variable => 'io_method',
@@ -1398,14 +1406,32 @@
assign_hook => 'assign_io_method',
},
-{ name => 'io_workers', type => 'int', context => 'PGC_SIGHUP', group => 'RESOURCES_IO',
- short_desc => 'Number of IO worker processes, for io_method=worker.',
- variable => 'io_workers',
- boot_val => '3',
+{ name => 'io_min_workers', type => 'int', context => 'PGC_SIGHUP', group => 'RESOURCES_IO',
+ short_desc => 'Minimum number of I/O worker processes, for io_method=worker.',
+ variable => 'io_min_workers',
+ boot_val => '2',
min => '1',
max => 'MAX_IO_WORKERS',
},
+{ name => 'io_worker_idle_timeout', type => 'int', context => 'PGC_SIGHUP', group => 'RESOURCES_IO',
+ short_desc => 'Maximum time before idle I/O worker processes time out, for io_method=worker.',
+ variable => 'io_worker_idle_timeout',
+ flags => 'GUC_UNIT_MS',
+ boot_val => '60000',
+ min => '-1',
+ max => 'INT_MAX',
+},
+
+{ name => 'io_worker_launch_interval', type => 'int', context => 'PGC_SIGHUP', group => 'RESOURCES_IO',
+ short_desc => 'Minimum time before launching a new I/O worker process, for io_method=worker.',
+ variable => 'io_worker_launch_interval',
+ flags => 'GUC_UNIT_MS',
+ boot_val => '100',
+ min => '0',
+ max => 'INT_MAX',
+},
+
# Not for general use --- used by SET SESSION AUTHORIZATION and SET
# ROLE
{ name => 'is_superuser', type => 'bool', context => 'PGC_INTERNAL', group => 'UNGROUPED',
diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample
index e3e462f3efb..e28599f478e 100644
--- a/src/backend/utils/misc/postgresql.conf.sample
+++ b/src/backend/utils/misc/postgresql.conf.sample
@@ -218,7 +218,11 @@
# can execute simultaneously
# -1 sets based on shared_buffers
# (change requires restart)
-#io_workers = 3 # 1-32;
+
+#io_min_workers = 2 # 1-32 (change requires pg_reload_conf())
+#io_max_workers = 8 # 1-32
+#io_worker_idle_timeout = 60s
+#io_worker_launch_interval = 100ms
# - Worker Processes -
diff --git a/src/include/storage/io_worker.h b/src/include/storage/io_worker.h
index f7d5998a138..cffffd62fdd 100644
--- a/src/include/storage/io_worker.h
+++ b/src/include/storage/io_worker.h
@@ -17,6 +17,14 @@
pg_noreturn extern void IoWorkerMain(const void *startup_data, size_t startup_data_len);
-extern PGDLLIMPORT int io_workers;
+/* Public GUCs. */
+extern PGDLLIMPORT int io_min_workers;
+extern PGDLLIMPORT int io_max_workers;
+extern PGDLLIMPORT int io_worker_idle_timeout;
+extern PGDLLIMPORT int io_worker_launch_interval;
+
+/* Interfaces visible to the postmaster. */
+extern bool pgaio_worker_pm_test_grow(void);
+extern void pgaio_worker_pm_clear_grow(void);
#endif /* IO_WORKER_H */
diff --git a/src/include/storage/lwlocklist.h b/src/include/storage/lwlocklist.h
index af8553bcb6c..d7eb648bd27 100644
--- a/src/include/storage/lwlocklist.h
+++ b/src/include/storage/lwlocklist.h
@@ -88,6 +88,7 @@ PG_LWLOCK(53, AioWorkerSubmissionQueue)
PG_LWLOCK(54, WaitLSN)
PG_LWLOCK(55, LogicalDecodingControl)
PG_LWLOCK(56, DataChecksumsWorker)
+PG_LWLOCK(57, AioWorkerControl)
/*
* There also exist several built-in LWLock tranches. As with the predefined
diff --git a/src/include/storage/pmsignal.h b/src/include/storage/pmsignal.h
index 001e6eea61c..bcce4011790 100644
--- a/src/include/storage/pmsignal.h
+++ b/src/include/storage/pmsignal.h
@@ -38,6 +38,7 @@ typedef enum
PMSIGNAL_ROTATE_LOGFILE, /* send SIGUSR1 to syslogger to rotate logfile */
PMSIGNAL_START_AUTOVAC_LAUNCHER, /* start an autovacuum launcher */
PMSIGNAL_START_AUTOVAC_WORKER, /* start an autovacuum worker */
+ PMSIGNAL_IO_WORKER_GROW, /* I/O worker pool wants to grow */
PMSIGNAL_BACKGROUND_WORKER_CHANGE, /* background worker state change */
PMSIGNAL_START_WALRECEIVER, /* start a walreceiver */
PMSIGNAL_ADVANCE_STATE_MACHINE, /* advance postmaster's state machine */
diff --git a/src/test/modules/test_aio/t/002_io_workers.pl b/src/test/modules/test_aio/t/002_io_workers.pl
index 34bc132ea08..b9775811d4d 100644
--- a/src/test/modules/test_aio/t/002_io_workers.pl
+++ b/src/test/modules/test_aio/t/002_io_workers.pl
@@ -14,6 +14,9 @@ $node->init();
$node->append_conf(
'postgresql.conf', qq(
io_method=worker
+io_worker_idle_timeout=0ms
+io_worker_launch_interval=0ms
+io_max_workers=32
));
$node->start();
@@ -31,7 +34,7 @@ sub test_number_of_io_workers_dynamic
{
my $node = shift;
- my $prev_worker_count = $node->safe_psql('postgres', 'SHOW io_workers');
+ my $prev_worker_count = $node->safe_psql('postgres', 'SHOW io_min_workers');
# Verify that worker count can't be set to 0
change_number_of_io_workers($node, 0, $prev_worker_count, 1);
@@ -62,24 +65,24 @@ sub change_number_of_io_workers
my ($result, $stdout, $stderr);
($result, $stdout, $stderr) =
- $node->psql('postgres', "ALTER SYSTEM SET io_workers = $worker_count");
+ $node->psql('postgres', "ALTER SYSTEM SET io_min_workers = $worker_count");
$node->safe_psql('postgres', 'SELECT pg_reload_conf()');
if ($expect_failure)
{
like(
$stderr,
- qr/$worker_count is outside the valid range for parameter "io_workers"/,
- "updating number of io_workers to $worker_count failed, as expected"
+ qr/$worker_count is outside the valid range for parameter "io_min_workers"/,
+ "updating io_min_workers to $worker_count failed, as expected"
);
return $prev_worker_count;
}
else
{
- is( $node->safe_psql('postgres', 'SHOW io_workers'),
+ is( $node->safe_psql('postgres', 'SHOW io_min_workers'),
$worker_count,
- "updating number of io_workers from $prev_worker_count to $worker_count"
+ "updating io_min_workers from $prev_worker_count to $worker_count"
);
check_io_worker_count($node, $worker_count);
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index 9e6a39f5608..e411fe55254 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -2270,6 +2270,7 @@ PgAioUringCaps
PgAioUringContext
PgAioWaitRef
PgAioWorkerControl
+PgAioWorkerSet
PgAioWorkerSlot
PgAioWorkerSubmissionQueue
PgArchData
--
2.53.0
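
For anyone skimming the patch, here is a rough standalone model of the wakeup:work ratio bookkeeping in IoWorkerMain(). It is only a sketch: RATIO_SATURATE is a made-up stand-in for PGAIO_WORKER_WAKEUP_RATIO_SATURATE (whose real value is defined elsewhere in the patch), and the RatioCounters struct and ratio_* helper names are hypothetical, not part of the patch.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical stand-in for PGAIO_WORKER_WAKEUP_RATIO_SATURATE. */
#define RATIO_SATURATE 16

/* Hypothetical container for the two per-worker counters. */
typedef struct RatioCounters
{
	int			wakeup_count;	/* latch wakeups observed */
	int			io_count;		/* IOs actually processed */
} RatioCounters;

/*
 * Halve both counters.  The ratio is approximately preserved (integer
 * division rounds down) while older history carries less weight.
 */
static void
ratio_decay(RatioCounters *c)
{
	assert(c != NULL);
	c->wakeup_count /= 2;
	c->io_count /= 2;
}

/* Called when a worker dequeues and processes one IO. */
static void
ratio_note_io(RatioCounters *c)
{
	assert(c != NULL);
	if (++c->io_count == RATIO_SATURATE)
		ratio_decay(c);
}

/* Called when a worker is woken by its latch being set. */
static void
ratio_note_wakeup(RatioCounters *c)
{
	assert(c != NULL);
	if (++c->wakeup_count == RATIO_SATURATE)
		ratio_decay(c);
}
```

Whichever counter reaches the saturation point first triggers the halving of both, so neither can grow without bound and the ratio tracks recent behaviour more than old behaviour.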