Hi,
In the index prefetching thread we discovered that read stream performance
suffers rather substantially when a read stream is reading blocks multiple
times within the readahead distance.
The cause is that we currently wait synchronously for IO on a buffer when
AsyncReadBuffers() encounters a buffer already undergoing IO. If a block is
read twice, that means we won't actually have enough IOs in flight to get good
performance. What's worse, the wait is currently not attributed to IO wait
(since we're waiting in WaitIO, rather than waiting for IO).
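To make the effect a bit more tangible, here is a toy model of it. This is not PostgreSQL code: toy_stream_time(), its parameters and the fixed per-IO latency are all made up for illustration, and CPU cost is ignored. It only shows how stalling the lookahead on an in-flight block reduces the number of IOs that overlap.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdlib.h>

/*
 * Toy model (hypothetical, not PostgreSQL code): returns the simulated time
 * needed to consume all requested blocks, or -1 on allocation failure.
 *
 * distance:  how many requests the stream may start ahead of the consumer
 * latency:   fixed duration of every IO
 * sync_wait: if true, re-encountering a block whose IO is still in flight
 *            stalls the lookahead until that IO has finished
 */
long
toy_stream_time(const int *blocks, int nblocks, int distance,
                long latency, bool sync_wait)
{
    int   maxblock = 0;
    long *done_at;   /* per block: IO completion time, -1 if never started */
    long *ready_at;  /* per request: time at which its buffer is valid */
    long  now = 0;
    int   next_issue = 0;

    assert(nblocks > 0 && distance > 0 && latency > 0);

    for (int i = 0; i < nblocks; i++)
    {
        assert(blocks[i] >= 0);
        if (blocks[i] > maxblock)
            maxblock = blocks[i];
    }

    done_at = malloc(sizeof(long) * ((size_t) maxblock + 1));
    ready_at = malloc(sizeof(long) * (size_t) nblocks);
    if (done_at == NULL || ready_at == NULL)
    {
        free(done_at);
        free(ready_at);
        return -1;
    }
    for (int b = 0; b <= maxblock; b++)
        done_at[b] = -1;

    for (int next_consume = 0; next_consume < nblocks; next_consume++)
    {
        /* Look ahead: start reads for up to "distance" pending requests. */
        while (next_issue < nblocks && next_issue - next_consume < distance)
        {
            int b = blocks[next_issue];

            if (done_at[b] < 0)
                done_at[b] = now + latency;  /* first encounter: start IO */
            else if (done_at[b] > now && sync_wait)
                now = done_at[b];            /* in flight: lookahead stalls */

            ready_at[next_issue++] = done_at[b];
        }

        /* Consume the next buffer, waiting for its IO if necessary. */
        if (ready_at[next_consume] > now)
            now = ready_at[next_consume];
    }

    free(done_at);
    free(ready_at);
    return now;
}
```

Feeding it a block list with repeats close together and comparing sync_wait = true against false shows the stall: in the synchronous variant new IOs only get started after the earlier one has completed.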
This does not commonly occur with in-tree users of read streams, as users like
seqscans, bitmap heap scans, vacuum, ... will never try to read the same block
twice. However, with index prefetching that is a more common case.
It is possible to encounter a version of this in 18/master: If multiple scans
for the same table are in progress, they can end up waiting synchronously for
each other. However it's a much less severe issue, as the scan that is
"further ahead" will not be blocked.
To fix it, the attached patch has AsyncReadBuffers() check if the "target"
buffer already has IO in progress. If so, it assigns the buffer's IO wait
reference to the ReadBuffersOperation. That allows WaitReadBuffers() to wait
for the IO. To make that work correctly, the buffer stats etc have to be
updated in that case in WaitReadBuffers().
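As a rough sketch of the control flow described above (not the patch itself): ToyBuffer, ToyReadOp, ToyStats and the toy_* functions below are hypothetical stand-ins, the real structures in bufmgr.c look different. The point is only that the start step never blocks when it finds IO already in progress, and that the accounting for such a "joined" IO moves into the wait step.

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical stand-in for a shared buffer header. */
typedef struct ToyBuffer
{
    bool io_in_progress;
    int  io_wref;        /* id of the IO reading this buffer, 0 = none */
} ToyBuffer;

/* Hypothetical stand-in for ReadBuffersOperation. */
typedef struct ToyReadOp
{
    int  io_wref;        /* IO the wait step has to wait for */
    bool foreign_io;     /* true if we joined an IO that already existed */
} ToyReadOp;

/* Hypothetical counters, standing in for "buffer stats etc". */
typedef struct ToyStats
{
    int started;         /* reads this operation started itself */
    int joined;          /* reads where existing IO was joined */
} ToyStats;

static int toy_next_io_id = 1;

/* Start step: never blocks, even if the buffer already has IO in progress. */
void
toy_start_read(ToyBuffer *buf, ToyReadOp *op)
{
    if (buf->io_in_progress)
    {
        /* Remember the existing IO instead of waiting for it right here. */
        op->io_wref = buf->io_wref;
        op->foreign_io = true;
        return;
    }

    buf->io_in_progress = true;
    buf->io_wref = toy_next_io_id++;
    op->io_wref = buf->io_wref;
    op->foreign_io = false;
}

/* Wait step: the only place that waits; also does the accounting. */
void
toy_wait_read(ToyBuffer *buf, ToyReadOp *op, ToyStats *stats)
{
    assert(op->io_wref != 0);

    /* Stand-in for actually waiting on the IO identified by op->io_wref. */
    if (buf->io_wref == op->io_wref)
    {
        buf->io_in_progress = false;
        buf->io_wref = 0;
    }

    if (op->foreign_io)
        stats->joined++;
    else
        stats->started++;
}
```

Starting two operations on the same buffer back to back leaves both pointing at the same IO, and neither of them has waited until toy_wait_read() is called.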
I did not feel like I was sufficiently confident in making this work without
tests. However, it's not exactly trivial to test some versions of this, due to
the way multiple processes need to be coordinated. It took way way longer to
write tests than to make the code actually work :/.
The attached tests add a new read_stream_for_blocks() function to test_aio. I
found it also rather useful to reproduce the performance issue without the
index prefetching patch applied. To test the cross process case the injection
point infrastructure in test_aio had to be extended a bit.
Attached are three patches:
0001: Introduces a TestAio package and splits some existing tests out of
001_aio.pl
0002: Add read_stream test infrastructure & tests
Note that the tests don't test that we don't unnecessarily wait, as
described above, just that IO works correctly.
0003: Improve performance of read stream when re-encountering blocks
To reproduce the issue, the read_stream_for_blocks() function added to
test_aio can be used, in combination with debug_io_direct=data (it's also
possible without DIO, it'd just be more work).
prep:
CREATE EXTENSION test_aio;
CREATE TABLE large AS SELECT i, repeat(random()::text, 5) FROM
generate_series(1, 1000000) g(i);
test:
SELECT pg_buffercache_evict_relation('large');
EXPLAIN ANALYZE SELECT * FROM read_stream_for_blocks('large', ARRAY(SELECT i +
generate_series(0, 3) FROM generate_series(1, 10000) g(i)));
Without 0003 applied:
                                                                   QUERY PLAN
-------------------------------------------------------------------------------------------------------------------------------------------------
 Function Scan on read_stream_for_blocks  (cost=975.00..985.00 rows=1000 width=12) (actual time=673.647..675.254 rows=40000.00 loops=1)
   Buffers: shared hit=29997 read=10003
   I/O Timings: shared read=16.116
   InitPlan 1
     ->  Result  (cost=0.00..975.00 rows=40000 width=4) (actual time=0.556..7.575 rows=40000.00 loops=1)
           ->  ProjectSet  (cost=0.00..375.00 rows=40000 width=8) (actual time=0.556..4.804 rows=40000.00 loops=1)
                 ->  Function Scan on generate_series g  (cost=0.00..100.00 rows=10000 width=4) (actual time=0.554..0.988 rows=10000.00 loops=1)
 Planning Time: 0.060 ms
 Execution Time: 676.436 ms
(9 rows)
Time: 676.753 ms
With 0003:
                                                                   QUERY PLAN
-------------------------------------------------------------------------------------------------------------------------------------------------
 Function Scan on read_stream_for_blocks  (cost=975.00..985.00 rows=1000 width=12) (actual time=87.730..89.453 rows=40000.00 loops=1)
   Buffers: shared hit=29997 read=10003
   I/O Timings: shared read=50.467
   InitPlan 1
     ->  Result  (cost=0.00..975.00 rows=40000 width=4) (actual time=0.541..7.496 rows=40000.00 loops=1)
           ->  ProjectSet  (cost=0.00..375.00 rows=40000 width=8) (actual time=0.540..4.772 rows=40000.00 loops=1)
                 ->  Function Scan on generate_series g  (cost=0.00..100.00 rows=10000 width=4) (actual time=0.539..0.965 rows=10000.00 loops=1)
 Planning Time: 0.046 ms
 Execution Time: 90.661 ms
(9 rows)
Time: 90.887 ms
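That is roughly a 7.5x difference in execution time (676.436 ms vs 90.661 ms). As a sanity check on the buffer counts, the block list produced by "i + generate_series(0, 3)" for i in 1..10000 can be recomputed outside the database. The helper below is purely illustrative (count_pattern() and PatternCounts are made-up names, nothing like it is part of the patches); it counts how many requests hit a block for the first time and how many re-encounter one.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdlib.h>

/* Illustrative only: summary of the block list used in the repro query. */
typedef struct PatternCounts
{
    long requests;   /* total number of block requests */
    long distinct;   /* requests that see a block for the first time */
    long repeats;    /* requests for a block that was requested before */
} PatternCounts;

/*
 * Count requests for the pattern "i + o" with i in 1..nstart and
 * o in 0..noffsets-1, in the order the query generates them.
 */
PatternCounts
count_pattern(int nstart, int noffsets)
{
    PatternCounts c = {0, 0, 0};
    int   maxblock;
    bool *seen;

    assert(nstart > 0 && noffsets > 0);

    maxblock = nstart + noffsets - 1;
    seen = calloc((size_t) maxblock + 1, sizeof(bool));
    if (seen == NULL)
        return c;    /* all zeroes signals allocation failure */

    for (int i = 1; i <= nstart; i++)
    {
        for (int o = 0; o < noffsets; o++)
        {
            int b = i + o;

            c.requests++;
            if (!seen[b])
            {
                seen[b] = true;
                c.distinct++;
            }
        }
    }

    c.repeats = c.requests - c.distinct;
    free(seen);
    return c;
}
```

count_pattern(10000, 4) gives 40000 requests over 10003 distinct blocks, i.e. 29997 repeats, which is consistent with the "hit=29997 read=10003" shown in both plans: each repeated block sits only a few positions after its first occurrence, well within the readahead distance.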
Greetings,
Andres Freund
From ee54c736f08eaaeaeb83a49c12fbdcff1acab681 Mon Sep 17 00:00:00 2001
From: Andres Freund <[email protected]>
Date: Tue, 9 Sep 2025 10:14:34 -0400
Subject: [PATCH v2 1/3] aio: Refactor tests in preparation for more tests
In a future commit more AIO related tests are due to be introduced. However
001_aio.pl already is fairly large.
This commit introduces a new TestAio package with helpers for writing AIO
related tests. Then it uses the new helpers to simplify the existing
001_aio.pl by iterating over all supported io_methods. This will be
particularly helpful because additional methods already have been submitted.
Additionally this commit splits out testing of initdb using a non-default
method into its own test. While that test is somewhat important, it's fairly
slow and doesn't break that often. For development velocity it's helpful for
001_aio.pl to be faster.
While particularly the latter could benefit from being its own commit, it
seems to introduce more back-and-forth than it's worth.
Author:
Reviewed-by:
Discussion: https://postgr.es/m/
Backpatch:
---
src/test/modules/test_aio/meson.build | 1 +
src/test/modules/test_aio/t/001_aio.pl | 144 +++++++---------------
src/test/modules/test_aio/t/003_initdb.pl | 71 +++++++++++
src/test/modules/test_aio/t/TestAio.pm | 90 ++++++++++++++
4 files changed, 205 insertions(+), 101 deletions(-)
create mode 100644 src/test/modules/test_aio/t/003_initdb.pl
create mode 100644 src/test/modules/test_aio/t/TestAio.pm
diff --git a/src/test/modules/test_aio/meson.build b/src/test/modules/test_aio/meson.build
index 73d2fd68eaa..044149d02b8 100644
--- a/src/test/modules/test_aio/meson.build
+++ b/src/test/modules/test_aio/meson.build
@@ -32,6 +32,7 @@ tests += {
'tests': [
't/001_aio.pl',
't/002_io_workers.pl',
+ 't/003_initdb.pl',
],
},
}
diff --git a/src/test/modules/test_aio/t/001_aio.pl b/src/test/modules/test_aio/t/001_aio.pl
index 3f0453619e8..e7f3358f2d1 100644
--- a/src/test/modules/test_aio/t/001_aio.pl
+++ b/src/test/modules/test_aio/t/001_aio.pl
@@ -7,53 +7,47 @@ use PostgreSQL::Test::Cluster;
use PostgreSQL::Test::Utils;
use Test::More;
+use FindBin;
+use lib $FindBin::RealBin;
-###
-# Test io_method=worker
-###
-my $node_worker = create_node('worker');
-$node_worker->start();
-
-test_generic('worker', $node_worker);
-SKIP:
-{
- skip 'Injection points not supported by this build', 1
- unless $ENV{enable_injection_points} eq 'yes';
- test_inject_worker('worker', $node_worker);
-}
+use TestAio;
-$node_worker->stop();
+my %nodes;
###
-# Test io_method=io_uring
+# Create and configure one instance for each io_method
###
-if (have_io_uring())
+foreach my $method (TestAio::supported_io_methods())
{
- my $node_uring = create_node('io_uring');
- $node_uring->start();
- test_generic('io_uring', $node_uring);
- $node_uring->stop();
-}
+ my $node = PostgreSQL::Test::Cluster->new($method);
+ $nodes{$method} = $node;
+ $node->init();
+ $node->append_conf('postgresql.conf', "io_method=$method");
+ TestAio::configure($node);
+}
-###
-# Test io_method=sync
-###
-
-my $node_sync = create_node('sync');
-
-# just to have one test not use the default auto-tuning
-
-$node_sync->append_conf(
+# Just to have one test not use the default auto-tuning
+$nodes{'sync'}->append_conf(
'postgresql.conf', qq(
-io_max_concurrency=4
+ io_max_concurrency=4
));
-$node_sync->start();
-test_generic('sync', $node_sync);
-$node_sync->stop();
+
+###
+# Execute the tests for each io_method
+###
+
+foreach my $method (TestAio::supported_io_methods())
+{
+ my $node = $nodes{$method};
+
+ $node->start();
+ test_io_method($method, $node);
+ $node->stop();
+}
done_testing();
@@ -62,71 +56,6 @@ done_testing();
# Test Helpers
###
-sub create_node
-{
- local $Test::Builder::Level = $Test::Builder::Level + 1;
-
- my $io_method = shift;
-
- my $node = PostgreSQL::Test::Cluster->new($io_method);
-
- # Want to test initdb for each IO method, otherwise we could just reuse
- # the cluster.
- #
- # Unfortunately Cluster::init() puts PG_TEST_INITDB_EXTRA_OPTS after the
- # options specified by ->extra, if somebody puts -c io_method=xyz in
- # PG_TEST_INITDB_EXTRA_OPTS it would break this test. Fix that up if we
- # detect it.
- local $ENV{PG_TEST_INITDB_EXTRA_OPTS} = $ENV{PG_TEST_INITDB_EXTRA_OPTS};
- if (defined $ENV{PG_TEST_INITDB_EXTRA_OPTS}
- && $ENV{PG_TEST_INITDB_EXTRA_OPTS} =~ m/io_method=/)
- {
- $ENV{PG_TEST_INITDB_EXTRA_OPTS} .= " -c io_method=$io_method";
- }
-
- $node->init(extra => [ '-c', "io_method=$io_method" ]);
-
- $node->append_conf(
- 'postgresql.conf', qq(
-shared_preload_libraries=test_aio
-log_min_messages = 'DEBUG3'
-log_statement=all
-log_error_verbosity=default
-restart_after_crash=false
-temp_buffers=100
-));
-
- # Even though we used -c io_method=... above, if TEMP_CONFIG sets
- # io_method, it'd override the setting persisted at initdb time. While
- # using (and later verifying) the setting from initdb provides some
- # verification of having used the io_method during initdb, it's probably
- # not worth the complication of only appending if the variable is set in
- # in TEMP_CONFIG.
- $node->append_conf(
- 'postgresql.conf', qq(
-io_method=$io_method
-));
-
- ok(1, "$io_method: initdb");
-
- return $node;
-}
-
-sub have_io_uring
-{
- # To detect if io_uring is supported, we look at the error message for
- # assigning an invalid value to an enum GUC, which lists all the valid
- # options. We need to use -C to deal with running as administrator on
- # windows, the superuser check is omitted if -C is used.
- my ($stdout, $stderr) =
- run_command [qw(postgres -C invalid -c io_method=invalid)];
- die "can't determine supported io_method values"
- unless $stderr =~ m/Available values: ([^\.]+)\./;
- my $methods = $1;
- note "supported io_method values are: $methods";
-
- return ($methods =~ m/io_uring/) ? 1 : 0;
-}
sub psql_like
{
@@ -1490,8 +1419,8 @@ SELECT read_rel_block_ll('tbl_cs_fail', 3, nblocks=>1, zero_on_error=>true);),
}
-# Run all tests that are supported for all io_methods
-sub test_generic
+# Run all tests for the specified node / io_method
+sub test_io_method
{
my $io_method = shift;
my $node = shift;
@@ -1526,10 +1455,23 @@ CHECKPOINT;
test_ignore_checksum($io_method, $node);
test_checksum_createdb($io_method, $node);
+ # generic injection tests
SKIP:
{
skip 'Injection points not supported by this build', 1
unless $ENV{enable_injection_points} eq 'yes';
test_inject($io_method, $node);
}
+
+ # worker specific injection tests
+ if ($io_method eq 'worker')
+ {
+ SKIP:
+ {
+ skip 'Injection points not supported by this build', 1
+ unless $ENV{enable_injection_points} eq 'yes';
+
+ test_inject_worker($io_method, $node);
+ }
+ }
}
diff --git a/src/test/modules/test_aio/t/003_initdb.pl b/src/test/modules/test_aio/t/003_initdb.pl
new file mode 100644
index 00000000000..c03ae58d00a
--- /dev/null
+++ b/src/test/modules/test_aio/t/003_initdb.pl
@@ -0,0 +1,71 @@
+# Copyright (c) 2024-2025, PostgreSQL Global Development Group
+#
+# Test initdb for each IO method. This is done separately from 001_aio.pl, as
+# it isn't fast. This way the more commonly failing / hacked-on 001_aio.pl can
+# be iterated on more quickly.
+
+use strict;
+use warnings FATAL => 'all';
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+use FindBin;
+use lib $FindBin::RealBin;
+
+use TestAio;
+
+
+foreach my $method (TestAio::supported_io_methods())
+{
+ test_create_node($method);
+}
+
+done_testing();
+
+
+sub test_create_node
+{
+ local $Test::Builder::Level = $Test::Builder::Level + 1;
+
+ my $io_method = shift;
+
+ my $node = PostgreSQL::Test::Cluster->new($io_method);
+
+ # Want to test initdb for each IO method, otherwise we could just reuse
+ # the cluster.
+ #
+ # Unfortunately Cluster::init() puts PG_TEST_INITDB_EXTRA_OPTS after the
+ # options specified by ->extra, if somebody puts -c io_method=xyz in
+ # PG_TEST_INITDB_EXTRA_OPTS it would break this test. Fix that up if we
+ # detect it.
+ local $ENV{PG_TEST_INITDB_EXTRA_OPTS} = $ENV{PG_TEST_INITDB_EXTRA_OPTS};
+ if (defined $ENV{PG_TEST_INITDB_EXTRA_OPTS}
+ && $ENV{PG_TEST_INITDB_EXTRA_OPTS} =~ m/io_method=/)
+ {
+ $ENV{PG_TEST_INITDB_EXTRA_OPTS} .= " -c io_method=$io_method";
+ }
+
+ $node->init(extra => [ '-c', "io_method=$io_method" ]);
+
+ TestAio::configure($node);
+
+ # Even though we used -c io_method=... above, if TEMP_CONFIG sets
+ # io_method, it'd override the setting persisted at initdb time. While
+ # using (and later verifying) the setting from initdb provides some
+ # verification of having used the io_method during initdb, it's probably
+ # not worth the complication of only appending if the variable is set in
+ # TEMP_CONFIG.
+ $node->append_conf(
+ 'postgresql.conf', qq(
+io_method=$io_method
+));
+
+ ok(1, "$io_method: initdb");
+
+ $node->start();
+ $node->stop();
+ ok(1, "$io_method: start & stop");
+
+ return $node;
+}
diff --git a/src/test/modules/test_aio/t/TestAio.pm b/src/test/modules/test_aio/t/TestAio.pm
new file mode 100644
index 00000000000..5bc80a9b130
--- /dev/null
+++ b/src/test/modules/test_aio/t/TestAio.pm
@@ -0,0 +1,90 @@
+# Copyright (c) 2024-2025, PostgreSQL Global Development Group
+
+=pod
+
+=head1 NAME
+
+TestAio - helpers for writing AIO related tests
+
+=cut
+
+package TestAio;
+
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+
+=pod
+
+=head1 METHODS
+
+=over
+
+=item TestAio::supported_io_methods()
+
+Return an array of all the supported values for the io_method GUC
+
+=cut
+
+sub supported_io_methods()
+{
+ my @io_methods = ('worker');
+
+ push(@io_methods, "io_uring") if have_io_uring();
+
+ # Return sync last, as it will least commonly fail
+ push(@io_methods, "sync");
+
+ return @io_methods;
+}
+
+
+=item TestAio::configure()
+
+Prepare a cluster for AIO test
+
+=cut
+
+sub configure
+{
+ my $node = shift;
+
+ $node->append_conf(
+ 'postgresql.conf', qq(
+shared_preload_libraries=test_aio
+log_min_messages = 'DEBUG3'
+log_statement=all
+log_error_verbosity=default
+restart_after_crash=false
+temp_buffers=100
+));
+
+}
+
+
+=pod
+
+=item TestAio::have_io_uring()
+
+Return whether io_uring is supported
+
+=cut
+
+sub have_io_uring
+{
+ # To detect if io_uring is supported, we look at the error message for
+ # assigning an invalid value to an enum GUC, which lists all the valid
+ # options. We need to use -C to deal with running as administrator on
+ # windows, the superuser check is omitted if -C is used.
+ my ($stdout, $stderr) =
+ run_command [qw(postgres -C invalid -c io_method=invalid)];
+ die "can't determine supported io_method values"
+ unless $stderr =~ m/Available values: ([^\.]+)\./;
+ my $methods = $1;
+ note "supported io_method values are: $methods";
+
+ return ($methods =~ m/io_uring/) ? 1 : 0;
+}
+
+1;
--
2.48.1.76.g4e746b1a31.dirty
From 7598d135a84b6e901645e35be41005a721d23de0 Mon Sep 17 00:00:00 2001
From: Andres Freund <[email protected]>
Date: Wed, 10 Sep 2025 14:00:02 -0400
Subject: [PATCH v2 2/3] test_aio: Add read_stream test infrastructure & tests
Author:
Reviewed-by:
Discussion: https://postgr.es/m/
Backpatch:
---
src/test/modules/test_aio/meson.build | 1 +
.../modules/test_aio/t/004_read_stream.pl | 282 +++++++++++++++
src/test/modules/test_aio/test_aio--1.0.sql | 26 +-
src/test/modules/test_aio/test_aio.c | 342 +++++++++++++++---
src/tools/pgindent/typedefs.list | 1 +
5 files changed, 601 insertions(+), 51 deletions(-)
create mode 100644 src/test/modules/test_aio/t/004_read_stream.pl
diff --git a/src/test/modules/test_aio/meson.build b/src/test/modules/test_aio/meson.build
index 044149d02b8..d571d9da00d 100644
--- a/src/test/modules/test_aio/meson.build
+++ b/src/test/modules/test_aio/meson.build
@@ -33,6 +33,7 @@ tests += {
't/001_aio.pl',
't/002_io_workers.pl',
't/003_initdb.pl',
+ 't/004_read_stream.pl',
],
},
}
diff --git a/src/test/modules/test_aio/t/004_read_stream.pl b/src/test/modules/test_aio/t/004_read_stream.pl
new file mode 100644
index 00000000000..89cfabbb1d3
--- /dev/null
+++ b/src/test/modules/test_aio/t/004_read_stream.pl
@@ -0,0 +1,282 @@
+# Copyright (c) 2025, PostgreSQL Global Development Group
+
+use strict;
+use warnings FATAL => 'all';
+
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+use FindBin;
+use lib $FindBin::RealBin;
+
+use TestAio;
+
+
+my $node = PostgreSQL::Test::Cluster->new('test');
+$node->init();
+
+$node->append_conf(
+ 'postgresql.conf', qq(
+shared_preload_libraries=test_aio
+log_min_messages = 'DEBUG3'
+log_statement=all
+log_error_verbosity=default
+restart_after_crash=false
+temp_buffers=100
+max_connections=8
+io_method=worker
+));
+
+$node->start();
+test_setup($node);
+$node->stop();
+
+
+foreach my $method (TestAio::supported_io_methods())
+{
+ $node->adjust_conf('postgresql.conf', 'io_method', $method);
+ $node->start();
+ test_io_method($method, $node);
+ $node->stop();
+}
+
+done_testing();
+
+
+sub test_setup
+{
+ my $node = shift;
+
+ $node->safe_psql(
+ 'postgres', qq(
+CREATE EXTENSION test_aio;
+
+CREATE TABLE largeish(k int not null) WITH (FILLFACTOR=10);
+INSERT INTO largeish(k) SELECT generate_series(1, 10000);
+));
+ ok(1, "setup");
+}
+
+
+sub test_repeated_blocks
+{
+ my $io_method = shift;
+ my $node = shift;
+
+ my $psql = $node->background_psql('postgres', on_error_stop => 0);
+
+ # Preventing larger reads makes testing easier
+ $psql->query_safe(
+ qq/
+SET io_combine_limit = 1;
+/);
+
+ # test miss of the same block twice in a row
+ $psql->query_safe(
+ qq/
+SELECT evict_rel('largeish');
+/);
+ $psql->query_safe(
+ qq/
+SELECT * FROM read_stream_for_blocks('largeish', ARRAY[0, 2, 2, 4, 4]);
+/);
+ ok(1, "$io_method: stream missing the same block repeatedly");
+
+ $psql->query_safe(
+ qq/
+SELECT * FROM read_stream_for_blocks('largeish', ARRAY[0, 2, 2, 4, 4]);
+/);
+ ok(1, "$io_method: stream hitting the same block repeatedly");
+
+ # test hit of the same block twice in a row
+ $psql->query_safe(
+ qq/
+SELECT evict_rel('largeish');
+/);
+ $psql->query_safe(
+ qq/
+SELECT * FROM read_stream_for_blocks('largeish', ARRAY[0, 1, 2, 3, 4, 5, 6, 5, 4, 3, 2, 1, 0]);
+/);
+ ok(1, "$io_method: stream accessing same block");
+
+ $psql->quit();
+}
+
+
+sub test_inject_foreign
+{
+ my $io_method = shift;
+ my $node = shift;
+
+ my $psql_a = $node->background_psql('postgres', on_error_stop => 0);
+ my $psql_b = $node->background_psql('postgres', on_error_stop => 0);
+
+ my $pid_a = $psql_a->query_safe(qq/SELECT pg_backend_pid();/);
+
+
+ ###
+ # Test read stream encountering buffers undergoing IO in another backend,
+ # with the other backend's reads succeeding.
+ ###
+ $psql_a->query_safe(
+ qq/
+SELECT evict_rel('largeish');
+/);
+
+ $psql_b->query_safe(
+ qq/
+SELECT inj_io_completion_wait(pid=>pg_backend_pid(), relfilenode=>pg_relation_filenode('largeish'));
+/);
+
+ $psql_b->{stdin} .= qq/
+SELECT read_rel_block_ll('largeish', blockno=>5, nblocks=>1);
+/;
+ $psql_b->{run}->pump_nb();
+
+ $node->poll_query_until(
+ 'postgres', qq/
+SELECT wait_event FROM pg_stat_activity WHERE wait_event = 'completion_wait';
+/,
+ 'completion_wait');
+
+ $psql_a->{stdin} .= qq/
+SELECT array_agg(blocknum) FROM read_stream_for_blocks('largeish', ARRAY[0, 2, 5, 7]);
+/;
+ $psql_a->{run}->pump_nb();
+
+ $node->poll_query_until('postgres',
+ qq(SELECT wait_event FROM pg_stat_activity WHERE pid = $pid_a),
+ 'AioIoCompletion');
+
+ $node->safe_psql('postgres', qq/SELECT inj_io_completion_continue()/);
+
+ pump_until(
+ $psql_a->{run}, $psql_a->{timeout},
+ \$psql_a->{stdout}, qr/\{0,2,5,7\}/);
+
+ ok(1,
+ qq/$io_method: read stream encounters succeeding IO by another backend/
+ );
+
+
+ ###
+ # Test read stream encountering buffers undergoing IO in another backend,
+ # with the other backend's reads failing.
+ ###
+ $psql_a->query_safe(
+ qq/
+SELECT evict_rel('largeish');
+/);
+
+ $psql_b->query_safe(
+ qq/
+SELECT inj_io_completion_wait(pid=>pg_backend_pid(), relfilenode=>pg_relation_filenode('largeish'));
+/);
+
+ $psql_b->query_safe(
+ qq/
+SELECT inj_io_short_read_attach(-errno_from_string('EIO'), pid=>pg_backend_pid(), relfilenode=>pg_relation_filenode('largeish'));
+/);
+
+ $psql_b->{stdin} .= qq/
+SELECT read_rel_block_ll('largeish', blockno=>5, nblocks=>1);
+/;
+ $psql_b->{run}->pump_nb();
+
+ $node->poll_query_until(
+ 'postgres', qq/
+SELECT wait_event FROM pg_stat_activity WHERE wait_event = 'completion_wait';
+/,
+ 'completion_wait');
+
+ $psql_a->{stdin} .= qq/
+SELECT array_agg(blocknum) FROM read_stream_for_blocks('largeish', ARRAY[0, 2, 5, 7]);
+/;
+ $psql_a->{run}->pump_nb();
+
+ $node->poll_query_until('postgres',
+ qq(SELECT wait_event FROM pg_stat_activity WHERE pid = $pid_a),
+ 'AioIoCompletion');
+
+ $node->safe_psql('postgres', qq/SELECT inj_io_completion_continue()/);
+
+ pump_until(
+ $psql_a->{run}, $psql_a->{timeout},
+ \$psql_a->{stdout}, qr/\{0,2,5,7\}/);
+
+ $psql_b->{run}->pump_nb();
+ like(
+ $psql_b->{stderr},
+ qr/.*ERROR.*could not read blocks 5..5.*$/,
+ "$io_method: injected error occurred");
+ $psql_b->{stderr} = '';
+ $psql_b->query_safe(qq/SELECT inj_io_short_read_detach();/);
+
+
+ ok(1,
+ qq/$io_method: read stream encounters failing IO by another backend/);
+
+
+ ###
+ # Test read stream encountering two buffers that are undergoing the same
+ # IO, started by another backend
+ ###
+ $psql_a->query_safe(
+ qq/
+SELECT evict_rel('largeish');
+/);
+
+ $psql_b->query_safe(
+ qq/
+SELECT inj_io_completion_wait(pid=>pg_backend_pid(), relfilenode=>pg_relation_filenode('largeish'));
+/);
+
+ $psql_b->{stdin} .= qq/
+SELECT read_rel_block_ll('largeish', blockno=>2, nblocks=>3);
+/;
+ $psql_b->{run}->pump_nb();
+
+ $node->poll_query_until(
+ 'postgres', qq/
+SELECT wait_event FROM pg_stat_activity WHERE wait_event = 'completion_wait';
+/,
+ 'completion_wait');
+
+ $psql_a->{stdin} .= qq/
+SELECT array_agg(blocknum) FROM read_stream_for_blocks('largeish', ARRAY[0, 2, 4]);
+/;
+ $psql_a->{run}->pump_nb();
+
+ $node->poll_query_until('postgres',
+ qq(SELECT wait_event FROM pg_stat_activity WHERE pid = $pid_a),
+ 'AioIoCompletion');
+
+ $node->safe_psql('postgres', qq/SELECT inj_io_completion_continue()/);
+
+ pump_until(
+ $psql_a->{run}, $psql_a->{timeout},
+ \$psql_a->{stdout}, qr/\{0,2,4\}/);
+
+ ok(1, qq/$io_method: read stream encounters two buffers read in one IO/);
+
+
+ $psql_a->quit();
+ $psql_b->quit();
+}
+
+
+sub test_io_method
+{
+ my $io_method = shift;
+ my $node = shift;
+
+ test_repeated_blocks($io_method, $node);
+
+ SKIP:
+ {
+ skip 'Injection points not supported by this build', 1
+ unless $ENV{enable_injection_points} eq 'yes';
+ test_inject_foreign($io_method, $node);
+ }
+}
diff --git a/src/test/modules/test_aio/test_aio--1.0.sql b/src/test/modules/test_aio/test_aio--1.0.sql
index e495481c41e..da7cc03829a 100644
--- a/src/test/modules/test_aio/test_aio--1.0.sql
+++ b/src/test/modules/test_aio/test_aio--1.0.sql
@@ -33,6 +33,10 @@ CREATE FUNCTION read_rel_block_ll(
RETURNS pg_catalog.void STRICT
AS 'MODULE_PATHNAME' LANGUAGE C;
+CREATE FUNCTION evict_rel(rel regclass)
+RETURNS pg_catalog.void STRICT
+AS 'MODULE_PATHNAME' LANGUAGE C;
+
CREATE FUNCTION invalidate_rel_block(rel regclass, blockno int)
RETURNS pg_catalog.void STRICT
AS 'MODULE_PATHNAME' LANGUAGE C;
@@ -41,7 +45,7 @@ CREATE FUNCTION buffer_create_toy(rel regclass, blockno int4)
RETURNS pg_catalog.int4 STRICT
AS 'MODULE_PATHNAME' LANGUAGE C;
-CREATE FUNCTION buffer_call_start_io(buffer int, for_input bool, nowait bool)
+CREATE FUNCTION buffer_call_start_io(buffer int, for_input bool, nowait bool, assign_io bool DEFAULT false)
RETURNS pg_catalog.bool STRICT
AS 'MODULE_PATHNAME' LANGUAGE C;
@@ -50,6 +54,14 @@ RETURNS pg_catalog.void STRICT
AS 'MODULE_PATHNAME' LANGUAGE C;
+/*
+ * Read stream related functions
+ */
+CREATE FUNCTION read_stream_for_blocks(rel regclass, blocks int4[], OUT blockoff int4, OUT blocknum int4, OUT buf int4)
+RETURNS SETOF record STRICT
+AS 'MODULE_PATHNAME' LANGUAGE C;
+
+
/*
* Handle related functions
@@ -91,8 +103,16 @@ AS 'MODULE_PATHNAME' LANGUAGE C;
/*
* Injection point related functions
*/
-CREATE FUNCTION inj_io_short_read_attach(result int)
-RETURNS pg_catalog.void STRICT
+CREATE FUNCTION inj_io_completion_wait(pid int DEFAULT NULL, relfilenode oid DEFAULT 0)
+RETURNS pg_catalog.void
+AS 'MODULE_PATHNAME' LANGUAGE C;
+
+CREATE FUNCTION inj_io_completion_continue()
+RETURNS pg_catalog.void
+AS 'MODULE_PATHNAME' LANGUAGE C;
+
+CREATE FUNCTION inj_io_short_read_attach(result int, pid int DEFAULT NULL, relfilenode oid DEFAULT 0)
+RETURNS pg_catalog.void
AS 'MODULE_PATHNAME' LANGUAGE C;
CREATE FUNCTION inj_io_short_read_detach()
diff --git a/src/test/modules/test_aio/test_aio.c b/src/test/modules/test_aio/test_aio.c
index c55cf6c0aac..1831e535b28 100644
--- a/src/test/modules/test_aio/test_aio.c
+++ b/src/test/modules/test_aio/test_aio.c
@@ -20,16 +20,23 @@
#include "access/relation.h"
#include "fmgr.h"
+#include "funcapi.h"
#include "storage/aio.h"
#include "storage/aio_internal.h"
#include "storage/buf_internals.h"
#include "storage/bufmgr.h"
#include "storage/checksum.h"
+#include "storage/condition_variable.h"
#include "storage/ipc.h"
#include "storage/lwlock.h"
+#include "storage/proc.h"
+#include "storage/procnumber.h"
+#include "storage/read_stream.h"
+#include "utils/array.h"
#include "utils/builtins.h"
#include "utils/injection_point.h"
#include "utils/rel.h"
+#include "utils/wait_event.h"
PG_MODULE_MAGIC;
@@ -37,13 +44,30 @@ PG_MODULE_MAGIC;
typedef struct InjIoErrorState
{
+ ConditionVariable cv;
+
bool enabled_short_read;
bool enabled_reopen;
+ bool enabled_completion_wait;
+ Oid completion_wait_relfilenode;
+ pid_t completion_wait_pid;
+ uint32 completion_wait_event;
+
bool short_read_result_set;
+ Oid short_read_relfilenode;
+ pid_t short_read_pid;
int short_read_result;
} InjIoErrorState;
+typedef struct BlocksReadStreamData
+{
+ int nblocks;
+ int curblock;
+ uint32 *blocks;
+} BlocksReadStreamData;
+
+
static InjIoErrorState *inj_io_error_state;
/* Shared memory init callbacks */
@@ -85,10 +109,13 @@ test_aio_shmem_startup(void)
inj_io_error_state->enabled_short_read = false;
inj_io_error_state->enabled_reopen = false;
+ ConditionVariableInit(&inj_io_error_state->cv);
+ inj_io_error_state->completion_wait_event = WaitEventInjectionPointNew("completion_wait");
+
#ifdef USE_INJECTION_POINTS
InjectionPointAttach("aio-process-completion-before-shared",
"test_aio",
- "inj_io_short_read",
+ "inj_io_completion_hook",
NULL,
0);
InjectionPointLoad("aio-process-completion-before-shared");
@@ -378,7 +405,7 @@ read_rel_block_ll(PG_FUNCTION_ARGS)
if (nblocks <= 0 || nblocks > PG_IOV_MAX)
elog(ERROR, "nblocks is out of range");
- rel = relation_open(relid, AccessExclusiveLock);
+ rel = relation_open(relid, AccessShareLock);
for (int i = 0; i < nblocks; i++)
{
@@ -452,6 +479,27 @@ read_rel_block_ll(PG_FUNCTION_ARGS)
PG_RETURN_VOID();
}
+PG_FUNCTION_INFO_V1(evict_rel);
+Datum
+evict_rel(PG_FUNCTION_ARGS)
+{
+ Oid relid = PG_GETARG_OID(0);
+ Relation rel;
+ int32 buffers_evicted,
+ buffers_flushed,
+ buffers_skipped;
+
+ rel = relation_open(relid, AccessExclusiveLock);
+
+ EvictRelUnpinnedBuffers(rel, &buffers_evicted, &buffers_flushed,
+ &buffers_skipped);
+
+ relation_close(rel, AccessExclusiveLock);
+
+
+ PG_RETURN_VOID();
+}
+
PG_FUNCTION_INFO_V1(invalidate_rel_block);
Datum
invalidate_rel_block(PG_FUNCTION_ARGS)
@@ -604,6 +652,86 @@ buffer_call_terminate_io(PG_FUNCTION_ARGS)
PG_RETURN_VOID();
}
+
+static BlockNumber
+read_stream_for_blocks_cb(ReadStream *stream,
+ void *callback_private_data,
+ void *per_buffer_data)
+{
+ BlocksReadStreamData *stream_data = callback_private_data;
+
+ if (stream_data->curblock >= stream_data->nblocks)
+ return InvalidBlockNumber;
+ return stream_data->blocks[stream_data->curblock++];
+}
+
+PG_FUNCTION_INFO_V1(read_stream_for_blocks);
+Datum
+read_stream_for_blocks(PG_FUNCTION_ARGS)
+{
+ Oid relid = PG_GETARG_OID(0);
+ ArrayType *blocksarray = PG_GETARG_ARRAYTYPE_P(1);
+ ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
+ Relation rel;
+ BlocksReadStreamData stream_data;
+ ReadStream *stream;
+
+ InitMaterializedSRF(fcinfo, 0);
+
+ /*
+ * We expect the input to be an N-element int4 array; verify that. We
+ * don't need to use deconstruct_array() since the array data is just
+ * going to look like a C array of N int4 values.
+ */
+ if (ARR_NDIM(blocksarray) != 1 ||
+ ARR_HASNULL(blocksarray) ||
+ ARR_ELEMTYPE(blocksarray) != INT4OID)
+ elog(ERROR, "expected 1 dimensional int4 array");
+
+ stream_data.curblock = 0;
+ stream_data.nblocks = ARR_DIMS(blocksarray)[0];
+ stream_data.blocks = (uint32 *) ARR_DATA_PTR(blocksarray);
+
+ rel = relation_open(relid, AccessShareLock);
+
+ stream = read_stream_begin_relation(READ_STREAM_FULL,
+ NULL,
+ rel,
+ MAIN_FORKNUM,
+ read_stream_for_blocks_cb,
+ &stream_data,
+ 0);
+
+ for (int i = 0; i < stream_data.nblocks; i++)
+ {
+ Buffer buf = read_stream_next_buffer(stream, NULL);
+ Datum values[3] = {0};
+ bool nulls[3] = {0};
+
+ if (!BufferIsValid(buf))
+ elog(ERROR, "read_stream_next_buffer() call %d is unexpectedly invalid", i);
+
+ values[0] = Int32GetDatum(i);
+ values[1] = UInt32GetDatum(stream_data.blocks[i]);
+ values[2] = UInt32GetDatum(buf);
+
+ tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc, values, nulls);
+
+ ReleaseBuffer(buf);
+ }
+
+ if (read_stream_next_buffer(stream, NULL) != InvalidBuffer)
+ elog(ERROR, "read_stream_next_buffer() call %d is unexpectedly valid",
+ stream_data.nblocks + 1);
+
+ read_stream_end(stream);
+
+ relation_close(rel, NoLock);
+
+ return (Datum) 0;
+}
+
+
PG_FUNCTION_INFO_V1(handle_get);
Datum
handle_get(PG_FUNCTION_ARGS)
@@ -674,15 +802,98 @@ batch_end(PG_FUNCTION_ARGS)
}
#ifdef USE_INJECTION_POINTS
-extern PGDLLEXPORT void inj_io_short_read(const char *name,
- const void *private_data,
- void *arg);
+extern PGDLLEXPORT void inj_io_completion_hook(const char *name,
+ const void *private_data,
+ void *arg);
extern PGDLLEXPORT void inj_io_reopen(const char *name,
const void *private_data,
void *arg);
-void
-inj_io_short_read(const char *name, const void *private_data, void *arg)
+static bool
+inj_io_short_read_matches(PgAioHandle *ioh)
+{
+ PGPROC *owner_proc;
+ int32 owner_pid;
+ PgAioTargetData *td;
+
+ if (!inj_io_error_state->enabled_short_read)
+ return false;
+
+ if (!inj_io_error_state->short_read_result_set)
+ return false;
+
+ owner_proc = GetPGProcByNumber(pgaio_io_get_owner(ioh));
+ owner_pid = owner_proc->pid;
+
+ if (inj_io_error_state->short_read_pid != 0 &&
+ inj_io_error_state->short_read_pid != owner_pid)
+ return false;
+
+ td = pgaio_io_get_target_data(ioh);
+
+ if (inj_io_error_state->short_read_relfilenode != InvalidOid &&
+ td->smgr.rlocator.relNumber != inj_io_error_state->short_read_relfilenode)
+ return false;
+
+ /*
+ * Only shorten reads that are actually longer than the target size,
+ * otherwise we can trigger over-reads.
+ */
+ if (inj_io_error_state->short_read_result >= ioh->result)
+ return false;
+
+ return true;
+}
+
+static bool
+inj_io_completion_wait_matches(PgAioHandle *ioh)
+{
+ PGPROC *owner_proc;
+ int32 owner_pid;
+ PgAioTargetData *td;
+
+ if (!inj_io_error_state->enabled_completion_wait)
+ return false;
+
+ owner_proc = GetPGProcByNumber(pgaio_io_get_owner(ioh));
+ owner_pid = owner_proc->pid;
+
+ if (inj_io_error_state->completion_wait_pid != owner_pid)
+ return false;
+
+ td = pgaio_io_get_target_data(ioh);
+
+ if (inj_io_error_state->completion_wait_relfilenode != InvalidOid &&
+ td->smgr.rlocator.relNumber != inj_io_error_state->completion_wait_relfilenode)
+ return false;
+
+ return true;
+}
+
+static void
+inj_io_completion_wait_hook(const char *name, const void *private_data, void *arg)
+{
+ PgAioHandle *ioh = (PgAioHandle *) arg;
+
+ if (!inj_io_completion_wait_matches(ioh))
+ return;
+
+ ConditionVariablePrepareToSleep(&inj_io_error_state->cv);
+
+ while (true)
+ {
+ if (!inj_io_completion_wait_matches(ioh))
+ break;
+
+ ConditionVariableSleep(&inj_io_error_state->cv,
+ inj_io_error_state->completion_wait_event);
+ }
+
+ ConditionVariableCancelSleep();
+}
+
+static void
+inj_io_short_read_hook(const char *name, const void *private_data, void *arg)
{
PgAioHandle *ioh = (PgAioHandle *) arg;
@@ -691,58 +902,56 @@ inj_io_short_read(const char *name, const void *private_data, void *arg)
inj_io_error_state->enabled_reopen),
errhidestmt(true), errhidecontext(true));
- if (inj_io_error_state->enabled_short_read)
+ if (inj_io_short_read_matches(ioh))
{
+ struct iovec *iov = &pgaio_ctl->iovecs[ioh->iovec_off];
+ int32 old_result = ioh->result;
+ int32 new_result = inj_io_error_state->short_read_result;
+ int32 processed = 0;
+
+ ereport(LOG,
+ errmsg("short read inject point, changing result from %d to %d",
+ old_result, new_result),
+ errhidestmt(true), errhidecontext(true));
+
/*
- * Only shorten reads that are actually longer than the target size,
- * otherwise we can trigger over-reads.
+ * The underlying IO actually completed OK, and thus the "invalid"
+ * portion of the IOV actually contains valid data. That can hide a
+ * lot of problems, e.g. if we were to wrongly mark as valid a buffer
+ * that, according to the shortened read, was not read, its contents
+ * would look valid and we might miss a bug.
+ *
+ * To avoid that, iterate through the IOV and zero out the "failed"
+ * portion of the IO.
*/
- if (inj_io_error_state->short_read_result_set
- && ioh->op == PGAIO_OP_READV
- && inj_io_error_state->short_read_result <= ioh->result)
+ for (int i = 0; i < ioh->op_data.read.iov_length; i++)
{
- struct iovec *iov = &pgaio_ctl->iovecs[ioh->iovec_off];
- int32 old_result = ioh->result;
- int32 new_result = inj_io_error_state->short_read_result;
- int32 processed = 0;
-
- ereport(LOG,
- errmsg("short read inject point, changing result from %d to %d",
- old_result, new_result),
- errhidestmt(true), errhidecontext(true));
-
- /*
- * The underlying IO actually completed OK, and thus the "invalid"
- * portion of the IOV actually contains valid data. That can hide
- * a lot of problems, e.g. if we were to wrongly mark a buffer,
- * that wasn't read according to the shortened-read, IO as valid,
- * the contents would look valid and we might miss a bug.
- *
- * To avoid that, iterate through the IOV and zero out the
- * "failed" portion of the IO.
- */
- for (int i = 0; i < ioh->op_data.read.iov_length; i++)
+ if (processed + iov[i].iov_len <= new_result)
+ processed += iov[i].iov_len;
+ else if (processed <= new_result)
{
- if (processed + iov[i].iov_len <= new_result)
- processed += iov[i].iov_len;
- else if (processed <= new_result)
- {
- uint32 ok_part = new_result - processed;
+ uint32 ok_part = new_result - processed;
- memset((char *) iov[i].iov_base + ok_part, 0, iov[i].iov_len - ok_part);
- processed += iov[i].iov_len;
- }
- else
- {
- memset((char *) iov[i].iov_base, 0, iov[i].iov_len);
- }
+ memset((char *) iov[i].iov_base + ok_part, 0, iov[i].iov_len - ok_part);
+ processed += iov[i].iov_len;
+ }
+ else
+ {
+ memset((char *) iov[i].iov_base, 0, iov[i].iov_len);
}
-
- ioh->result = new_result;
}
+
+ ioh->result = new_result;
}
}
+void
+inj_io_completion_hook(const char *name, const void *private_data, void *arg)
+{
+ inj_io_completion_wait_hook(name, private_data, arg);
+ inj_io_short_read_hook(name, private_data, arg);
+}
+
void
inj_io_reopen(const char *name, const void *private_data, void *arg)
{
@@ -756,6 +965,39 @@ inj_io_reopen(const char *name, const void *private_data, void *arg)
}
#endif
+PG_FUNCTION_INFO_V1(inj_io_completion_wait);
+Datum
+inj_io_completion_wait(PG_FUNCTION_ARGS)
+{
+#ifdef USE_INJECTION_POINTS
+ inj_io_error_state->enabled_completion_wait = true;
+ inj_io_error_state->completion_wait_pid =
+ PG_ARGISNULL(0) ? 0 : PG_GETARG_INT32(0);
+ inj_io_error_state->completion_wait_relfilenode =
+ PG_ARGISNULL(1) ? InvalidOid : PG_GETARG_OID(1);
+#else
+ elog(ERROR, "injection points not supported");
+#endif
+
+ PG_RETURN_VOID();
+}
+
+PG_FUNCTION_INFO_V1(inj_io_completion_continue);
+Datum
+inj_io_completion_continue(PG_FUNCTION_ARGS)
+{
+#ifdef USE_INJECTION_POINTS
+ inj_io_error_state->enabled_completion_wait = false;
+ inj_io_error_state->completion_wait_pid = 0;
+ inj_io_error_state->completion_wait_relfilenode = InvalidOid;
+ ConditionVariableBroadcast(&inj_io_error_state->cv);
+#else
+ elog(ERROR, "injection points not supported");
+#endif
+
+ PG_RETURN_VOID();
+}
+
PG_FUNCTION_INFO_V1(inj_io_short_read_attach);
Datum
inj_io_short_read_attach(PG_FUNCTION_ARGS)
@@ -765,6 +1007,10 @@ inj_io_short_read_attach(PG_FUNCTION_ARGS)
inj_io_error_state->short_read_result_set = !PG_ARGISNULL(0);
if (inj_io_error_state->short_read_result_set)
inj_io_error_state->short_read_result = PG_GETARG_INT32(0);
+ inj_io_error_state->short_read_pid =
+ PG_ARGISNULL(1) ? 0 : PG_GETARG_INT32(1);
+ inj_io_error_state->short_read_relfilenode =
+ PG_ARGISNULL(2) ? InvalidOid : PG_GETARG_OID(2);
#else
elog(ERROR, "injection points not supported");
#endif
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index a13e8162890..7c99bec3e2e 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -293,6 +293,7 @@ BlockSampler
BlockSamplerData
BlockedProcData
BlockedProcsData
+BlocksReadStreamData
BlocktableEntry
BloomBuildState
BloomFilter
--
2.48.1.76.g4e746b1a31.dirty
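
For readers skimming the short-read hook above: the zeroing loop can be restated outside of PostgreSQL as the following minimal sketch. It assumes a POSIX `struct iovec`; the function name `zero_short_read_tail` is hypothetical and not part of the patch, and the loop is a slightly simplified restatement of the one in inj_io_short_read_hook(), not a copy of it.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>
#include <sys/uio.h>

/*
 * Hypothetical standalone helper (not in the patch): pretend that only the
 * first new_result bytes of a vectored read arrived, and zero everything
 * after that point so stale-but-valid-looking data cannot hide a bug.
 */
void
zero_short_read_tail(struct iovec *iov, int iovcnt, size_t new_result)
{
	size_t		processed = 0;	/* bytes covered by earlier iovec entries */

	assert(iov != NULL || iovcnt == 0);

	for (int i = 0; i < iovcnt; i++)
	{
		if (processed + iov[i].iov_len <= new_result)
		{
			/* entry lies entirely within the "successfully read" prefix */
		}
		else if (processed < new_result)
		{
			/* entry straddles the cut-off: keep the head, zero the tail */
			size_t		ok_part = new_result - processed;

			memset((char *) iov[i].iov_base + ok_part, 0,
				   iov[i].iov_len - ok_part);
		}
		else
		{
			/* entry lies entirely past the cut-off */
			memset(iov[i].iov_base, 0, iov[i].iov_len);
		}

		processed += iov[i].iov_len;
	}
}
```

With three 4-byte entries and new_result = 6, the first entry is untouched, the second keeps its first two bytes, and the third is zeroed completely.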
From 917c654520a6d34c0da391ef74bf430cc2e0102f Mon Sep 17 00:00:00 2001
From: Andres Freund <[email protected]>
Date: Fri, 15 Aug 2025 11:01:52 -0400
Subject: [PATCH v2 3/3] bufmgr: aio: Prototype for not waiting for
already-in-progress IO
Author:
Reviewed-by:
Discussion: https://postgr.es/m/
Backpatch:
---
src/include/storage/bufmgr.h | 1 +
src/backend/storage/buffer/bufmgr.c | 150 ++++++++++++++++++++++++++--
2 files changed, 142 insertions(+), 9 deletions(-)
diff --git a/src/include/storage/bufmgr.h b/src/include/storage/bufmgr.h
index 41fdc1e7693..7ddb867bc99 100644
--- a/src/include/storage/bufmgr.h
+++ b/src/include/storage/bufmgr.h
@@ -137,6 +137,7 @@ struct ReadBuffersOperation
int flags;
int16 nblocks;
int16 nblocks_done;
+ bool foreign_io;
PgAioWaitRef io_wref;
PgAioReturn io_return;
};
diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c
index fe470de63f2..7b245daaf4e 100644
--- a/src/backend/storage/buffer/bufmgr.c
+++ b/src/backend/storage/buffer/bufmgr.c
@@ -1557,6 +1557,46 @@ ReadBuffersCanStartIOOnce(Buffer buffer, bool nowait)
return StartBufferIO(GetBufferDescriptor(buffer - 1), true, nowait);
}
+/*
+ * Check if the buffer is already undergoing read AIO. If it is, assign the
+ * IO's wait reference to operation->io_wref, thereby allowing the caller to
+ * wait for that IO.
+ */
+static inline bool
+ReadBuffersIOAlreadyInProgress(ReadBuffersOperation *operation, Buffer buffer)
+{
+ BufferDesc *desc;
+ uint32 buf_state;
+ PgAioWaitRef iow;
+
+ pgaio_wref_clear(&iow);
+
+ if (BufferIsLocal(buffer))
+ {
+ desc = GetLocalBufferDescriptor(-buffer - 1);
+ buf_state = pg_atomic_read_u32(&desc->state);
+ if ((buf_state & BM_IO_IN_PROGRESS) && !(buf_state & BM_VALID))
+ iow = desc->io_wref;
+ }
+ else
+ {
+ desc = GetBufferDescriptor(buffer - 1);
+ buf_state = LockBufHdr(desc);
+
+ if ((buf_state & BM_IO_IN_PROGRESS) && !(buf_state & BM_VALID))
+ iow = desc->io_wref;
+ UnlockBufHdr(desc, buf_state);
+ }
+
+ if (pgaio_wref_valid(&iow))
+ {
+ operation->io_wref = iow;
+ return true;
+ }
+
+ return false;
+}
+
/*
* Helper for AsyncReadBuffers that tries to get the buffer ready for IO.
*/
@@ -1689,7 +1729,7 @@ WaitReadBuffers(ReadBuffersOperation *operation)
*
* we first check if we already know the IO is complete.
*/
- if (aio_ret->result.status == PGAIO_RS_UNKNOWN &&
+ if ((operation->foreign_io || aio_ret->result.status == PGAIO_RS_UNKNOWN) &&
!pgaio_wref_check_done(&operation->io_wref))
{
instr_time io_start = pgstat_prepare_io_time(track_io_timing);
@@ -1708,11 +1748,66 @@ WaitReadBuffers(ReadBuffersOperation *operation)
Assert(pgaio_wref_check_done(&operation->io_wref));
}
- /*
- * We now are sure the IO completed. Check the results. This
- * includes reporting on errors if there were any.
- */
- ProcessReadBuffersResult(operation);
+ if (unlikely(operation->foreign_io))
+ {
+ Buffer buffer = operation->buffers[operation->nblocks_done];
+ BufferDesc *desc;
+ uint32 buf_state;
+
+ if (BufferIsLocal(buffer))
+ {
+ desc = GetLocalBufferDescriptor(-buffer - 1);
+ buf_state = pg_atomic_read_u32(&desc->state);
+ }
+ else
+ {
+ desc = GetBufferDescriptor(buffer - 1);
+ buf_state = LockBufHdr(desc);
+ UnlockBufHdr(desc, buf_state);
+ }
+
+ if (buf_state & BM_VALID)
+ {
+ operation->nblocks_done += 1;
+ Assert(operation->nblocks_done <= operation->nblocks);
+
+ /*
+ * Report and track this as a 'hit' for this backend, even
+ * though it must have started out as a miss in
+ * PinBufferForBlock(). The other backend (or ourselves,
+ * as part of a read started earlier) will track this as a
+ * 'read'.
+ */
+ TRACE_POSTGRESQL_BUFFER_READ_DONE(operation->forknum,
+ operation->blocknum + operation->nblocks_done - 1,
+ operation->smgr->smgr_rlocator.locator.spcOid,
+ operation->smgr->smgr_rlocator.locator.dbOid,
+ operation->smgr->smgr_rlocator.locator.relNumber,
+ operation->smgr->smgr_rlocator.backend,
+ true);
+
+ if (BufferIsLocal(buffer))
+ pgBufferUsage.local_blks_hit += 1;
+ else
+ pgBufferUsage.shared_blks_hit += 1;
+
+ if (operation->rel)
+ pgstat_count_buffer_hit(operation->rel);
+
+ pgstat_count_io_op(io_object, io_context, IOOP_HIT, 1, 0);
+
+ if (VacuumCostActive)
+ VacuumCostBalance += VacuumCostPageHit;
+ }
+ }
+ else
+ {
+ /*
+ * We now are sure the IO completed. Check the results. This
+ * includes reporting on errors if there were any.
+ */
+ ProcessReadBuffersResult(operation);
+ }
}
/*
@@ -1798,6 +1893,43 @@ AsyncReadBuffers(ReadBuffersOperation *operation, int *nblocks_progress)
io_object = IOOBJECT_RELATION;
}
+ /*
+ * If AIO is in progress, be it in this backend or another backend, we
+ * just associate the wait reference with the operation and wait in
+ * WaitReadBuffers(). This turns out to be important for performance in
+ * two workloads:
+ *
+ * 1) A read stream that has to read the same block multiple times within
+ * the readahead distance. This can happen e.g. for the table accesses of
+ * an index scan.
+ *
+ * 2) Concurrent scans by multiple backends on the same relation.
+ *
+ * If we were to synchronously wait for the in-progress IO, we'd not be
+ * able to keep enough I/O in flight.
+ *
+ * If we do find there is ongoing I/O for the buffer, we set up a 1-block
+ * ReadBuffersOperation that WaitReadBuffers then can wait on.
+ *
+ * It's possible that another backend starts IO on the buffer between this
+ * check and the ReadBuffersCanStartIO(nowait = false) below. In that case
+ * we will synchronously wait for the IO below, but the window for that is
+ * small enough that it won't happen often enough to have a significant
+ * performance impact.
+ */
+ if (ReadBuffersIOAlreadyInProgress(operation, buffers[nblocks_done]))
+ {
+ *nblocks_progress = 1;
+ operation->foreign_io = true;
+
+ CheckReadBuffersOperation(operation, false);
+
+
+ return true;
+ }
+
+ operation->foreign_io = false;
+
/*
* If zero_damaged_pages is enabled, add the READ_BUFFERS_ZERO_ON_ERROR
* flag. The reason for that is that, hopefully, zero_damaged_pages isn't
@@ -1855,9 +1987,9 @@ AsyncReadBuffers(ReadBuffersOperation *operation, int *nblocks_progress)
/*
* Check if we can start IO on the first to-be-read buffer.
*
- * If an I/O is already in progress in another backend, we want to wait
- * for the outcome: either done, or something went wrong and we will
- * retry.
+ * If a synchronous I/O is in progress in another backend (it can't be
+ * this backend), we want to wait for the outcome: either done, or
+ * something went wrong and we will retry.
*/
if (!ReadBuffersCanStartIO(buffers[nblocks_done], false))
{
--
2.48.1.76.g4e746b1a31.dirty
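
To make the control flow of patch 3 easier to follow, here is a toy model of the "foreign IO" handoff, written as a minimal sketch. Everything in it is an assumption for illustration: the `Toy*` types, the flag values, and the use of a plain int as a stand-in for PgAioWaitRef are hypothetical and do not exist in PostgreSQL. In the real patch the foreign_io flag is set by AsyncReadBuffers() rather than by the helper, and a buffer that is still not valid after the wait is presumably retried by the surrounding loop.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Toy stand-ins for BM_IO_IN_PROGRESS / BM_VALID (values are arbitrary) */
#define TOY_IO_IN_PROGRESS	(1u << 0)
#define TOY_VALID			(1u << 1)

typedef struct ToyBuffer
{
	uint32_t	state;			/* flag bits */
	int			io_wref;		/* 0 means "no wait reference" */
} ToyBuffer;

typedef struct ToyReadOp
{
	bool		foreign_io;		/* waiting on an IO somebody else started? */
	int			io_wref;		/* wait reference to wait on later */
	int			nblocks_done;
} ToyReadOp;

/*
 * If a read is already in flight for the buffer, borrow its wait reference
 * instead of blocking right away. Returns true if the caller should skip
 * starting its own IO.
 */
bool
toy_io_already_in_progress(ToyReadOp *op, const ToyBuffer *buf)
{
	assert(op != NULL && buf != NULL);

	if ((buf->state & TOY_IO_IN_PROGRESS) &&
		!(buf->state & TOY_VALID) &&
		buf->io_wref != 0)
	{
		op->io_wref = buf->io_wref;
		op->foreign_io = true;
		return true;
	}

	op->foreign_io = false;
	return false;
}

/*
 * Called once the borrowed IO has completed. Counts the block as done only
 * if the buffer ended up valid; otherwise the caller has to try again.
 */
bool
toy_finish_foreign_io(ToyReadOp *op, const ToyBuffer *buf)
{
	assert(op != NULL && buf != NULL);

	if (buf->state & TOY_VALID)
	{
		op->nblocks_done += 1;
		return true;
	}

	return false;
}
```

The point of the split is that the first function never blocks, so a read stream can keep issuing further IOs and only pay for the wait later, when it actually needs the buffer.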