On Wed, Jul 30, 2025 at 10:15 PM Dmitry Dolgov <[email protected]> wrote:
> As a side note, I was trying to experiment with this patch using
> dm-mapper's delay feature to introduce an arbitrary large io latency and
> see how the io queue is growing.
FWIW, here's what I came up with while experimenting with that sort of thing:
shared_preload_libraries=io_limit
io_limit.ios_per_second=6000
That differs from e.g. dm-mapper delays by making everything seem like
slow direct I/O, which seemed more interesting for this project. For
example if you run some continuous workload while you SET
io_limit.ios_per_second to various numbers, with
io_workers_idle_timeout set fairly low, you can monitor the pool
adjustments.
From 6ecfe2226c9068a82b7c54094db55354960a70bb Mon Sep 17 00:00:00 2001
From: Thomas Munro <[email protected]>
Date: Sat, 11 Apr 2026 17:31:13 +1200
Subject: [PATCH] contrib/io_limit: Simulation of slow storage.
Only affects IOs submitted to io_method=worker. Configured as:
shared_preload_libraries=io_limit
io_limit.ios_per_second=1000
io_limit.read_per_second=200MB
io_limit.write_per_second=100MB
Zero means no limit.
XXX Experimental hack
---
contrib/Makefile | 1 +
contrib/io_limit/Makefile | 20 ++
contrib/io_limit/io_limit.c | 275 ++++++++++++++++++++++++
contrib/io_limit/io_limit.control | 5 +
contrib/io_limit/meson.build | 28 +++
contrib/meson.build | 1 +
src/backend/storage/aio/method_worker.c | 13 ++
src/include/storage/io_worker.h | 5 +
8 files changed, 348 insertions(+)
create mode 100644 contrib/io_limit/Makefile
create mode 100644 contrib/io_limit/io_limit.c
create mode 100644 contrib/io_limit/io_limit.control
create mode 100644 contrib/io_limit/meson.build
diff --git a/contrib/Makefile b/contrib/Makefile
index 7d91fe77db3..48e82c53333 100644
--- a/contrib/Makefile
+++ b/contrib/Makefile
@@ -24,6 +24,7 @@ SUBDIRS = \
hstore \
intagg \
intarray \
+ io_limit \
isn \
lo \
ltree \
diff --git a/contrib/io_limit/Makefile b/contrib/io_limit/Makefile
new file mode 100644
index 00000000000..da176698a17
--- /dev/null
+++ b/contrib/io_limit/Makefile
@@ -0,0 +1,20 @@
+# contrib/io_limit/Makefile
+
+MODULE_big = io_limit
+OBJS = \
+	$(WIN32RES) \
+	io_limit.o
+
+EXTENSION = io_limit
+PGFILEDESC = "io_limit - artificially limit asynchronous I/O for testing"
+
+ifdef USE_PGXS
+PG_CONFIG = pg_config
+PGXS := $(shell $(PG_CONFIG) --pgxs)
+include $(PGXS)
+else
+subdir = contrib/io_limit
+top_builddir = ../..
+include $(top_builddir)/src/Makefile.global
+include $(top_srcdir)/contrib/contrib-global.mk
+endif
diff --git a/contrib/io_limit/io_limit.c b/contrib/io_limit/io_limit.c
new file mode 100644
index 00000000000..fa2ec6f1ff2
--- /dev/null
+++ b/contrib/io_limit/io_limit.c
@@ -0,0 +1,275 @@
+#include "postgres.h"
+
+#include "miscadmin.h"
+#include "port/atomics.h"
+#include "portability/instr_time.h"
+#include "storage/aio_internal.h"
+#include "storage/io_worker.h"
+#include "storage/lwlock.h"
+#include "storage/shmem.h"
+#include "utils/guc.h"
+
+/*
+ * GUC variables.  Each process has its own copy; the assign hooks convert
+ * them into delay intervals stored in shared memory (io_limit_control_data),
+ * and those intervals are what io_limit_on_perform() enforces.  The read and
+ * write limits are measured in blocks per second (GUC_UNIT_BLOCKS).  Zero
+ * means no limit.
+ */
+static int io_limit_ios_per_second = 0;
+static int io_limit_read_per_second = 0;
+static int io_limit_write_per_second = 0;
+
+/* Limiter state shared by all processes, allocated in shared memory. */
+typedef struct io_limit_control_data
+{
+ /*
+  * True if any of the intervals below is non-zero, so that
+  * io_limit_on_perform() can return early when nothing is limited.
+  */
+ bool enabled;
+
+ /*
+  * Absolute times (nanoseconds, as read via INSTR_TIME_SET_CURRENT) before
+  * which the next IO counted against each limit must not proceed.  Advanced
+  * without locking by io_limit_wait().
+  */
+ pg_atomic_uint64 op_next_ns;
+ pg_atomic_uint64 read_next_ns;
+ pg_atomic_uint64 write_next_ns;
+
+ /*
+  * Normally held exclusively by the assign hooks while they update the
+  * intervals below and "enabled", and in shared mode by the show hooks.
+  * io_limit_on_perform() reads the fields without it.
+  */
+ LWLock lock;
+
+ /*
+  * Limits expressed as delay intervals in nanoseconds: between any two IOs
+  * (op_ns), and per block read or written.  Zero means no limit.
+  */
+ int op_ns;
+ int read_block_ns;
+ int write_block_ns;
+} io_limit_control_data;
+
+/* Points at the shared control struct once shared memory is set up. */
+static io_limit_control_data * io_limit_control;
+
+/* Shared memory setup callbacks, registered from _PG_init(). */
+static void io_limit_shmem_request(void *arg);
+static void io_limit_shmem_init(void *arg);
+
+/* GUC hooks: assign hooks store into shared memory, show hooks read it. */
+static void assign_io_limit_ios_per_second(int newval, void *extra);
+static void assign_io_limit_read_per_second(int newval, void *extra);
+static void assign_io_limit_write_per_second(int newval, void *extra);
+static const char *show_io_limit_ios_per_second(void);
+static const char *show_io_limit_read_per_second(void);
+static const char *show_io_limit_write_per_second(void);
+
+/* Installed with pgaio_worker_set_on_perform_hook() in _PG_init(). */
+static void io_limit_on_perform(PgAioHandle *ioh);
+
+/* Passed to RegisterShmemCallbacks() in _PG_init(). */
+static const ShmemCallbacks io_limit_shmem_callbacks = {
+ .request_fn = io_limit_shmem_request,
+ .init_fn = io_limit_shmem_init,
+};
+
+/* Module identification for the dynamic loader. */
+PG_MODULE_MAGIC_EXT(
+ .name = "io_limit",
+ .version = PG_VERSION
+);
+
+/*
+ * Define one of the module's integer limit settings.  All limits share the
+ * same long description, default (0), range (0 .. INT_MAX) and context
+ * (PGC_USERSET); only the name, summary, unit flags and hooks differ.
+ */
+static void
+io_limit_define_limit(const char *name, const char *short_desc,
+					  int *valueAddr, int flags,
+					  GucIntAssignHook assign_hook, GucShowHook show_hook)
+{
+	DefineCustomIntVariable(name,
+							short_desc,
+							"If set to zero, there is no limit.",
+							valueAddr,
+							0,
+							0, INT_MAX,
+							PGC_USERSET,
+							flags,
+							NULL,
+							assign_hook,
+							show_hook);
+}
+
+/*
+ * Module load callback.
+ *
+ * Does nothing unless the library is loaded via shared_preload_libraries,
+ * because the limits live in shared memory.  Otherwise it:
+ * - defines the three io_limit.* GUCs;
+ * - reserves the "io_limit" prefix;
+ * - registers the shared memory callbacks;
+ * - installs the per-IO hook of the worker I/O method.
+ */
+void
+_PG_init(void)
+{
+	if (!process_shared_preload_libraries_in_progress)
+		return;
+
+	io_limit_define_limit("io_limit.ios_per_second",
+						  "Limits IOs per second.",
+						  &io_limit_ios_per_second, 0,
+						  assign_io_limit_ios_per_second,
+						  show_io_limit_ios_per_second);
+	io_limit_define_limit("io_limit.read_per_second",
+						  "Limits read bandwidth.",
+						  &io_limit_read_per_second, GUC_UNIT_BLOCKS,
+						  assign_io_limit_read_per_second,
+						  show_io_limit_read_per_second);
+	io_limit_define_limit("io_limit.write_per_second",
+						  "Limits write bandwidth.",
+						  &io_limit_write_per_second, GUC_UNIT_BLOCKS,
+						  assign_io_limit_write_per_second,
+						  show_io_limit_write_per_second);
+
+	MarkGUCPrefixReserved("io_limit");
+	RegisterShmemCallbacks(&io_limit_shmem_callbacks);
+	pgaio_worker_set_on_perform_hook(io_limit_on_perform);
+}
+
+/*
+ * Shared memory request callback: ask for one control struct named
+ * "io_limit", whose address is published through io_limit_control.
+ */
+static void
+io_limit_shmem_request(void *arg)
+{
+	ShmemRequestStruct(.name = "io_limit",
+					   .size = sizeof(*io_limit_control),
+					   .ptr = (void **) &io_limit_control);
+}
+
+/*
+ * Shared memory init callback.
+ *
+ * Zeroes the freshly allocated control struct, which means no limits and no
+ * pending deadlines.  It then sets up the atomics and the lock, and finally
+ * pushes the current GUC values into shared memory through the assign hooks.
+ */
+static void
+io_limit_shmem_init(void *arg)
+{
+	io_limit_control_data *ctl = io_limit_control;
+
+	memset(ctl, 0, sizeof(io_limit_control_data));
+
+	/* Deadlines start at 0, so the first IO of each kind does not wait. */
+	pg_atomic_init_u64(&ctl->op_next_ns, 0);
+	pg_atomic_init_u64(&ctl->read_next_ns, 0);
+	pg_atomic_init_u64(&ctl->write_next_ns, 0);
+
+	LWLockInitialize(&ctl->lock, LWLockNewTrancheId("io_limit"));
+
+	/* The assign hooks may take the lock, so this has to come last. */
+	assign_io_limit_ios_per_second(io_limit_ios_per_second, NULL);
+	assign_io_limit_read_per_second(io_limit_read_per_second, NULL);
+	assign_io_limit_write_per_second(io_limit_write_per_second, NULL);
+}
+
+/*
+ * Convert a per-second limit into the interval between consecutive units of
+ * work and store it in *wait_ns, which points into io_limit_control.  Then
+ * recompute io_limit_control->enabled.
+ *
+ * A per_second of zero means no limit.  Limits above NS_PER_S round down to
+ * an interval of zero, which likewise means no limit.
+ *
+ * The caller must have checked that io_limit_control is set.
+ *
+ * A process without a PGPROC cannot wait for an LWLock, and the postmaster
+ * runs assign hooks too: from io_limit_shmem_init(), and when it re-reads
+ * the configuration file.  So outside IsUnderPostmaster the fields are
+ * updated without the lock rather than risk having to wait for it.  In
+ * single-user mode there is no concurrency to guard against anyway.
+ */
+static void
+assign_io_limit(int *wait_ns, int per_second)
+{
+	bool		take_lock = IsUnderPostmaster;
+
+	if (take_lock)
+		LWLockAcquire(&io_limit_control->lock, LW_EXCLUSIVE);
+
+	*wait_ns = per_second == 0 ? 0 : NS_PER_S / per_second;
+	io_limit_control->enabled =
+		io_limit_control->op_ns > 0 ||
+		io_limit_control->read_block_ns > 0 ||
+		io_limit_control->write_block_ns > 0;
+
+	if (take_lock)
+		LWLockRelease(&io_limit_control->lock);
+}
+
+/*
+ * GUC assign hooks.
+ *
+ * They are also called while the GUCs are being defined in _PG_init(),
+ * before shared memory is attached.  Such calls are ignored, and
+ * io_limit_shmem_init() replays the values later.  The NULL test must come
+ * before the field address is formed: member access through a null pointer
+ * is undefined behavior.
+ */
+static void
+assign_io_limit_ios_per_second(int newval, void *extra)
+{
+	if (io_limit_control != NULL)
+		assign_io_limit(&io_limit_control->op_ns, newval);
+}
+
+static void
+assign_io_limit_read_per_second(int newval, void *extra)
+{
+	if (io_limit_control != NULL)
+		assign_io_limit(&io_limit_control->read_block_ns, newval);
+}
+
+static void
+assign_io_limit_write_per_second(int newval, void *extra)
+{
+	if (io_limit_control != NULL)
+		assign_io_limit(&io_limit_control->write_block_ns, newval);
+}
+
+/*
+ * Format the limit that is actually in force cluster-wide, converting the
+ * shared nanosecond interval back into a per-second rate (0 = no limit).
+ * Because the interval is an integer, the rate shown can differ slightly
+ * from the value that was SET.
+ *
+ * wait_ns is NULL when shared memory is not attached.  In that case, fall
+ * back to this process's own copy of the setting, local_value.
+ *
+ * Returns a static buffer, as GUC show hooks conventionally do; the GUC code
+ * copies the string.
+ */
+static const char *
+show_io_limit(const int *wait_ns, int local_value)
+{
+	static char buf[32];
+	int			per_second = local_value;
+
+	if (wait_ns != NULL)
+	{
+		LWLockAcquire(&io_limit_control->lock, LW_SHARED);
+		per_second = *wait_ns == 0 ? 0 : NS_PER_S / *wait_ns;
+		LWLockRelease(&io_limit_control->lock);
+	}
+
+	snprintf(buf, sizeof(buf), "%d", per_second);
+	return buf;
+}
+
+/*
+ * GUC show hooks.  The field address is only formed once io_limit_control is
+ * known to be non-NULL, avoiding a member access through a null pointer.
+ */
+static const char *
+show_io_limit_ios_per_second(void)
+{
+	return show_io_limit(io_limit_control ? &io_limit_control->op_ns : NULL,
+						 io_limit_ios_per_second);
+}
+
+static const char *
+show_io_limit_read_per_second(void)
+{
+	return show_io_limit(io_limit_control ? &io_limit_control->read_block_ns : NULL,
+						 io_limit_read_per_second);
+}
+
+static const char *
+show_io_limit_write_per_second(void)
+{
+	return show_io_limit(io_limit_control ? &io_limit_control->write_block_ns : NULL,
+						 io_limit_write_per_second);
+}
+
+/*
+ * Number of whole BLCKSZ blocks transferred by a vectored read or write.
+ * Other operations transfer no data blocks and count as 0.  A partial
+ * trailing block is not counted.
+ */
+static BlockNumber
+io_limit_get_block_count(PgAioHandle *ioh)
+{
+	struct iovec *vecs;
+	int			nvecs;
+	size_t		nbytes = 0;
+
+	if (ioh->op != PGAIO_OP_READV && ioh->op != PGAIO_OP_WRITEV)
+		return 0;
+
+	nvecs = pgaio_io_get_iovec_length(ioh, &vecs);
+	for (int v = 0; v < nvecs; v++)
+		nbytes += vecs[v].iov_len;
+
+	return nbytes / BLCKSZ;
+}
+
+/*
+ * Wait until the time stored in *next_ns_p, and push that time delay_ns
+ * further into the future.  Callers sharing one counter are therefore
+ * spaced delay_ns apart on average.
+ *
+ * delay_ns is 64-bit because a bandwidth delay is a block count multiplied
+ * by a per-block interval.  For low limits and large IOs that product does
+ * not fit in an int.
+ */
+static void
+io_limit_wait(pg_atomic_uint64 *next_ns_p, uint64 delay_ns)
+{
+	instr_time	now;
+	uint64		now_ns;
+	uint64		next_ns;
+
+	INSTR_TIME_SET_CURRENT(now);
+	now_ns = INSTR_TIME_GET_NANOSEC(now);
+	next_ns = pg_atomic_read_u64(next_ns_p);
+
+	for (;;)
+	{
+		if (next_ns > now_ns)
+		{
+			/*
+			 * Need to wait.  Claim the current slot by delaying the next op
+			 * further, then sleep until the claimed slot.
+			 */
+			next_ns = pg_atomic_fetch_add_u64(next_ns_p, (int64) delay_ns);
+
+			/* Average rate maintained even with low-res sleep or EINTR. */
+			if (next_ns > now_ns)
+				pg_usleep((long) ((next_ns - now_ns + 999) / 1000));
+			break;
+		}
+
+		/*
+		 * Don't need to wait.  The new next_ns is relative to now.  If the
+		 * CAS fails, next_ns has been refreshed with the current value, so
+		 * loop and re-decide.
+		 */
+		if (pg_atomic_compare_exchange_u64(next_ns_p, &next_ns,
+										   now_ns + delay_ns))
+			break;
+	}
+}
+
+/*
+ * Hook called by the worker I/O method for each IO it performs.
+ *
+ * Applies each configured limit independently: first the IOPS limit, then
+ * the bandwidth limit for the IO's direction.  A limit of zero is skipped
+ * entirely.
+ *
+ * The interval fields are read without the lock.  A concurrent change just
+ * means this IO is charged under the old or the new setting, which is
+ * presumably fine for a testing aid.
+ *
+ * NOTE(review): ioh is inspected, so this should run before the IO is
+ * performed.  Once a worker completes an IO, the issuing backend can
+ * reclaim and reuse the handle.
+ */
+static void
+io_limit_on_perform(PgAioHandle *ioh)
+{
+	int			op_ns;
+	int			read_block_ns;
+	int			write_block_ns;
+
+	if (!io_limit_control->enabled)
+		return;
+
+	op_ns = io_limit_control->op_ns;
+	if (op_ns > 0)
+		io_limit_wait(&io_limit_control->op_next_ns, op_ns);
+
+	if (ioh->op == PGAIO_OP_READV)
+	{
+		read_block_ns = io_limit_control->read_block_ns;
+		if (read_block_ns > 0)
+			io_limit_wait(&io_limit_control->read_next_ns,
+						  (uint64) io_limit_get_block_count(ioh) * read_block_ns);
+	}
+	else if (ioh->op == PGAIO_OP_WRITEV)
+	{
+		write_block_ns = io_limit_control->write_block_ns;
+		if (write_block_ns > 0)
+			io_limit_wait(&io_limit_control->write_next_ns,
+						  (uint64) io_limit_get_block_count(ioh) * write_block_ns);
+	}
+}
diff --git a/contrib/io_limit/io_limit.control b/contrib/io_limit/io_limit.control
new file mode 100644
index 00000000000..2f8f06c9e87
--- /dev/null
+++ b/contrib/io_limit/io_limit.control
@@ -0,0 +1,5 @@
+# io_limit extension
+comment = 'simulate slow storage by rate-limiting I/O performed by I/O workers'
+default_version = '1.0'
+module_pathname = '$libdir/io_limit'
+relocatable = true
diff --git a/contrib/io_limit/meson.build b/contrib/io_limit/meson.build
new file mode 100644
index 00000000000..1d26a08de83
--- /dev/null
+++ b/contrib/io_limit/meson.build
@@ -0,0 +1,28 @@
+# Copyright (c) 2022-2026, PostgreSQL Global Development Group
+
+io_limit_sources = files(
+  'io_limit.c',
+)
+
+if host_system == 'windows'
+  io_limit_sources += rc_lib_gen.process(win32ver_rc, extra_args: [
+    '--NAME', 'io_limit',
+    '--FILEDESC', 'io_limit - artificially limit asynchronous I/O for testing',])
+endif
+
+io_limit = shared_module('io_limit',
+  io_limit_sources,
+  kwargs: contrib_mod_args,
+)
+contrib_targets += io_limit
+
+install_data(
+  'io_limit.control',
+  kwargs: contrib_data_args,
+)
+
+tests += {
+  'name': 'io_limit',
+  'sd': meson.current_source_dir(),
+  'bd': meson.current_build_dir(),
+}
diff --git a/contrib/meson.build b/contrib/meson.build
index ebb7f83d8c5..398b0d704b5 100644
--- a/contrib/meson.build
+++ b/contrib/meson.build
@@ -34,6 +34,7 @@ subdir('hstore_plperl')
subdir('hstore_plpython')
subdir('intagg')
subdir('intarray')
+subdir('io_limit')
subdir('isn')
subdir('jsonb_plperl')
subdir('jsonb_plpython')
diff --git a/src/backend/storage/aio/method_worker.c b/src/backend/storage/aio/method_worker.c
index a5ccd506d8c..87afcf856e1 100644
--- a/src/backend/storage/aio/method_worker.c
+++ b/src/backend/storage/aio/method_worker.c
@@ -139,6 +139,7 @@ static int MyIoWorkerId = -1;
static PgAioWorkerSubmissionQueue *io_worker_submission_queue;
static PgAioWorkerControl *io_worker_control;
+static io_worker_on_perform_fn io_worker_on_perform_hook;
static void
pgaio_workerset_initialize(PgAioWorkerSet *set)
@@ -529,6 +530,9 @@ pgaio_worker_submit(uint16 num_staged_ios, PgAioHandle **staged_ios)
for (int i = 0; i < nsync; ++i)
{
+			/* Hook runs first, so it can delay the IO (see contrib/io_limit). */
+			if (io_worker_on_perform_hook)
+				io_worker_on_perform_hook(synchronous_ios[i]);
pgaio_io_perform_synchronously(synchronous_ios[i]);
}
}
@@ -929,6 +933,9 @@ IoWorkerMain(const void *startup_data, size_t startup_data_len)
*/
+			/* Hook must run before the IO: a completed handle may be reused. */
+			if (io_worker_on_perform_hook)
+				io_worker_on_perform_hook(ioh);
pgaio_io_perform_synchronously(ioh);
RESUME_INTERRUPTS();
errcallback.arg = NULL;
}
@@ -1024,6 +1031,12 @@ IoWorkerMain(const void *startup_data, size_t startup_data_len)
proc_exit(0);
}
+void
+pgaio_worker_set_on_perform_hook(io_worker_on_perform_fn fn)
+{
+	io_worker_on_perform_hook = fn;
+}
+
bool
pgaio_workers_enabled(void)
{
diff --git a/src/include/storage/io_worker.h b/src/include/storage/io_worker.h
index c852c9f3741..c9ef49a585d 100644
--- a/src/include/storage/io_worker.h
+++ b/src/include/storage/io_worker.h
@@ -28,4 +28,9 @@ extern bool pgaio_worker_pm_test_grow_signal_sent(void);
extern void pgaio_worker_pm_clear_grow_signal_sent(void);
extern bool pgaio_worker_pm_test_grow(void);
+/* Hook run for each IO performed by io_method=worker (contrib/io_limit). */
+typedef void (*io_worker_on_perform_fn) (PgAioHandle *handle);
+extern void pgaio_worker_set_on_perform_hook(io_worker_on_perform_fn fn);
+
+
#endif /* IO_WORKER_H */
--
2.53.0