Hi,
On 2025-08-14 19:36:49 -0400, Andres Freund wrote:
> On 2025-08-14 17:55:53 -0400, Peter Geoghegan wrote:
> > On Thu, Aug 14, 2025 at 5:06 PM Peter Geoghegan <[email protected]> wrote:
> > > > We can optimize that by deferring the StartBufferIO() if we're
> > > > encountering a buffer that is undergoing IO, at the cost of some
> > > > complexity. I'm not sure real-world queries will often encounter the
> > > > pattern of the same block being read in by a read stream multiple
> > > > times in close proximity sufficiently often to make that worth it.
> > >
> > > We definitely need to be prepared for duplicate prefetch requests in
> > > the context of index scans.
> >
> > Can you (or anybody else) think of a quick and dirty way of working
> > around the problem on the read stream side? I would like to prioritize
> > getting the patch into a state where its overall performance profile
> > "feels right". From there we can iterate on fixing the underlying
> > issues in more principled ways.
>
> I think I can see a way to fix the issue, below read stream. Basically,
> whenever AsyncReadBuffers() finds a buffer that has ongoing IO, instead of
> waiting, as we do today, copy the wref to the ReadBuffersOperation() and set a
> new flag indicating that we are waiting for an IO that was not started by the
> wref. Then, in WaitReadBuffers(), we wait for such foreign started IOs. That
> has to be somewhat different code from today, because we have to deal with the
> fact of the "foreign" IO potentially having failed.
>
> I'll try writing a prototype for that tomorrow. I think to actually get that
> into a committable shape we need a test harness (probably a read stream
> controlled by an SQL function that gets an array of buffers).
Attached is a prototype of this approach. It does seem to fix this issue.
New code disabled:
#### backwards sequential table ####
┌──────────────────────────────────────────────────────────────────────┐
│ QUERY PLAN │
├──────────────────────────────────────────────────────────────────────┤
│ Index Scan Backward using t_pk on t (actual rows=1048576.00 loops=1) │
│ Index Cond: ((a >= 16336) AND (a <= 49103)) │
│ Index Searches: 1 │
│ Buffers: shared hit=10291 read=49933 │
│ I/O Timings: shared read=213.277 │
│ Planning: │
│ Buffers: shared hit=91 read=19 │
│ I/O Timings: shared read=2.124 │
│ Planning Time: 3.269 ms │
│ Execution Time: 1023.279 ms │
└──────────────────────────────────────────────────────────────────────┘
(10 rows)
New code enabled:
#### backwards sequential table ####
┌──────────────────────────────────────────────────────────────────────┐
│ QUERY PLAN │
├──────────────────────────────────────────────────────────────────────┤
│ Index Scan Backward using t_pk on t (actual rows=1048576.00 loops=1) │
│ Index Cond: ((a >= 16336) AND (a <= 49103)) │
│ Index Searches: 1 │
│ Buffers: shared hit=10291 read=49933 │
│ I/O Timings: shared read=217.225 │
│ Planning: │
│ Buffers: shared hit=91 read=19 │
│ I/O Timings: shared read=2.009 │
│ Planning Time: 2.685 ms │
│ Execution Time: 602.987 ms │
└──────────────────────────────────────────────────────────────────────┘
(10 rows)
With the change enabled, the sequential query is faster than the random query:
#### backwards random table ####
┌────────────────────────────────────────────────────────────────────────────────────────────┐
│                                         QUERY PLAN                                         │
├────────────────────────────────────────────────────────────────────────────────────────────┤
│ Index Scan Backward using t_randomized_pk on t_randomized (actual rows=1048576.00 loops=1) │
│ Index Cond: ((a >= 16336) AND (a <= 49103))                                                │
│ Index Searches: 1                                                                          │
│ Buffers: shared hit=6085 read=77813                                                        │
│ I/O Timings: shared read=347.285                                                           │
│ Planning:                                                                                  │
│ Buffers: shared hit=127 read=5                                                             │
│ I/O Timings: shared read=1.001                                                             │
│ Planning Time: 1.751 ms                                                                    │
│ Execution Time: 820.544 ms                                                                 │
└────────────────────────────────────────────────────────────────────────────────────────────┘
(10 rows)
Greetings,
Andres Freund
From 433e82c94fd1c1b502a2b22e9c3874c1e766c05c Mon Sep 17 00:00:00 2001
From: Andres Freund <[email protected]>
Date: Fri, 15 Aug 2025 11:01:52 -0400
Subject: [PATCH v1] bufmgr: aio: Prototype for not waiting for already-in-progress IO
Author:
Reviewed-by:
Discussion: https://postgr.es/m/
Backpatch:
---
 src/include/storage/bufmgr.h        |   1 +
 src/backend/storage/buffer/bufmgr.c | 133 ++++++++++++++++++++++++++--
 2 files changed, 125 insertions(+), 9 deletions(-)
diff --git a/src/include/storage/bufmgr.h b/src/include/storage/bufmgr.h
index 41fdc1e7693..7ddb867bc99 100644
--- a/src/include/storage/bufmgr.h
+++ b/src/include/storage/bufmgr.h
@@ -137,6 +137,7 @@ struct ReadBuffersOperation
 	int			flags;
 	int16		nblocks;
 	int16		nblocks_done;
+	bool		foreign_io;
 	PgAioWaitRef io_wref;
 	PgAioReturn io_return;
 };
diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c
index fd7e21d96d3..de755fd53ad 100644
--- a/src/backend/storage/buffer/bufmgr.c
+++ b/src/backend/storage/buffer/bufmgr.c
@@ -1557,6 +1557,41 @@ ReadBuffersCanStartIOOnce(Buffer buffer, bool nowait)
 		return StartBufferIO(GetBufferDescriptor(buffer - 1), true, nowait);
 }
 
+static inline bool
+ReadBuffersIOAlreadyInProgress(ReadBuffersOperation *operation, Buffer buffer)
+{
+	BufferDesc *desc;
+	uint32		buf_state;
+	PgAioWaitRef iow;
+
+	pgaio_wref_clear(&iow);
+
+	if (BufferIsLocal(buffer))
+	{
+		desc = GetLocalBufferDescriptor(-buffer - 1);
+		buf_state = pg_atomic_read_u32(&desc->state);
+		if (buf_state & BM_IO_IN_PROGRESS)
+			iow = desc->io_wref;
+	}
+	else
+	{
+		desc = GetBufferDescriptor(buffer - 1);
+		buf_state = LockBufHdr(desc);
+
+		if (buf_state & BM_IO_IN_PROGRESS)
+			iow = desc->io_wref;
+		UnlockBufHdr(desc, buf_state);
+	}
+
+	if (pgaio_wref_valid(&iow))
+	{
+		operation->io_wref = iow;
+		return true;
+	}
+
+	return false;
+}
+
 /*
  * Helper for AsyncReadBuffers that tries to get the buffer ready for IO.
  */
@@ -1689,7 +1724,7 @@ WaitReadBuffers(ReadBuffersOperation *operation)
 			 *
 			 * we first check if we already know the IO is complete.
 			 */
-			if (aio_ret->result.status == PGAIO_RS_UNKNOWN &&
+			if ((operation->foreign_io || aio_ret->result.status == PGAIO_RS_UNKNOWN) &&
 				!pgaio_wref_check_done(&operation->io_wref))
 			{
 				instr_time	io_start = pgstat_prepare_io_time(track_io_timing);
@@ -1708,11 +1743,38 @@ WaitReadBuffers(ReadBuffersOperation *operation)
 				Assert(pgaio_wref_check_done(&operation->io_wref));
 			}
 
-			/*
-			 * We now are sure the IO completed. Check the results. This
-			 * includes reporting on errors if there were any.
-			 */
-			ProcessReadBuffersResult(operation);
+			if (operation->foreign_io)
+			{
+				Buffer		buffer = operation->buffers[operation->nblocks_done];
+				BufferDesc *desc;
+				uint32		buf_state;
+
+				if (BufferIsLocal(buffer))
+				{
+					desc = GetLocalBufferDescriptor(-buffer - 1);
+					buf_state = pg_atomic_read_u32(&desc->state);
+				}
+				else
+				{
+					desc = GetBufferDescriptor(buffer - 1);
+					buf_state = LockBufHdr(desc);
+					UnlockBufHdr(desc, buf_state);
+				}
+
+				if (buf_state & BM_VALID)
+				{
+					operation->nblocks_done += 1;
+					Assert(operation->nblocks_done <= operation->nblocks);
+				}
+			}
+			else
+			{
+				/*
+				 * We now are sure the IO completed. Check the results. This
+				 * includes reporting on errors if there were any.
+				 */
+				ProcessReadBuffersResult(operation);
+			}
 		}
 
 		/*
@@ -1798,6 +1860,56 @@ AsyncReadBuffers(ReadBuffersOperation *operation, int *nblocks_progress)
 		io_object = IOOBJECT_RELATION;
 	}
 
+	/*
+	 * If AIO is in progress, be it in this backend or another backend, we
+	 * just associate the wait reference with the operation and wait in
+	 * WaitReadBuffers(). This turns out to be important for performance in
+	 * two workloads:
+	 *
+	 * 1) A read stream that has to read the same block multiple times within
+	 *    the readahead distance. This can happen e.g. for the table accesses
+	 *    of an index scan.
+	 *
+	 * 2) Concurrent scans by multiple backends on the same relation.
+	 *
+	 * If we were to synchronously wait for the in-progress IO, we'd not be
+	 * able to keep enough I/O in flight.
+	 *
+	 *
+	 * If we do find there is ongoing I/O for the buffer, we set up a 1-block
+	 * ReadBuffersOperation that WaitReadBuffers then can wait on.
+	 */
+	if (1 && ReadBuffersIOAlreadyInProgress(operation, buffers[nblocks_done]))
+	{
+		/* FIXME: probably need to wait if io_method == sync? */
+
+		*nblocks_progress = 1;
+		did_start_io = true;
+		operation->foreign_io = true;
+
+		if (0)
+			elog(LOG, "using foreign IO path");
+
+		/* FIXME: trace point */
+
+		/*
+		 * FIXME: how should this be accounted for in stats? Account as a hit
+		 * for now, quite likely *we* started this IO.
+		 */
+		if (persistence == RELPERSISTENCE_TEMP)
+			pgBufferUsage.local_blks_hit += 1;
+		else
+			pgBufferUsage.shared_blks_hit += 1;
+
+		if (operation->rel)
+			pgstat_count_buffer_hit(operation->rel);
+		pgstat_count_io_op(io_object, io_context, IOOP_HIT, 1, 0);
+		if (VacuumCostActive)
+			VacuumCostBalance += VacuumCostPageHit;
+
+		return true;
+	}
+
 	/*
 	 * If zero_damaged_pages is enabled, add the READ_BUFFERS_ZERO_ON_ERROR
 	 * flag. The reason for that is that, hopefully, zero_damaged_pages isn't
@@ -1855,9 +1967,9 @@ AsyncReadBuffers(ReadBuffersOperation *operation, int *nblocks_progress)
 	/*
 	 * Check if we can start IO on the first to-be-read buffer.
 	 *
-	 * If an I/O is already in progress in another backend, we want to wait
-	 * for the outcome: either done, or something went wrong and we will
-	 * retry.
+	 * If a synchronous I/O is in progress in another backend (it can't be
+	 * this backend), we want to wait for the outcome: either done, or
+	 * something went wrong and we will retry.
 	 */
 	if (!ReadBuffersCanStartIO(buffers[nblocks_done], false))
 	{
@@ -1970,6 +2082,7 @@ AsyncReadBuffers(ReadBuffersOperation *operation, int *nblocks_progress)
 		if (VacuumCostActive)
 			VacuumCostBalance += VacuumCostPageMiss * io_buffers_len;
 
+		operation->foreign_io = false;
 		*nblocks_progress = io_buffers_len;
 		did_start_io = true;
 	}
@@ -5986,6 +6099,8 @@ WaitIO(BufferDesc *buf)
 		 */
 		if (pgaio_wref_valid(&iow))
 		{
+			if (0)
+				elog(LOG, "foreign wait");
 			pgaio_wref_wait(&iow);
 
 			/*
--
2.48.1.76.g4e746b1a31.dirty