Here's a really simple way to see the new unfairness at the end of a parallel scan:
drop table if exists t;
create table t (i int);
insert into t select generate_series(1, 100000);
alter table t set (parallel_workers = 2);
set parallel_setup_cost = 0;
set parallel_leader_participation = off;
explain (analyze, buffers, verbose) select count(*) from t;

On my machine, unpatched master shows:

  Worker 0:  actual time=0.036..12.452 rows=51076 loops=1
    Buffers: shared hit=226
  Worker 1:  actual time=0.037..12.003 rows=48924 loops=1
    Buffers: shared hit=217

The attached patch, which I'd like to push, is effectively what Alexander
tested (blocknums[16] -> blocknums[1]).  There's no point in using an
array of size 1, so I've turned it into a simple variable and deleted the
relevant comments.  My machine shows:

  Worker 0:  actual time=0.038..12.115 rows=49946 loops=1
    Buffers: shared hit=221
  Worker 1:  actual time=0.038..12.109 rows=50054 loops=1
    Buffers: shared hit=222

That difference may not seem huge, but other pre-existing problems in the
reported query magnify it pathologically (see my earlier analysis).  That
is an interesting problem that will require more study (my earlier
analysis missed a detail that I'll write about separately), but it
doesn't seem to be new or to have any easy fix, so it will have to wait
for later work.
From 07ff31ad30bf9e383e42336e28143852e3793c5b Mon Sep 17 00:00:00 2001
From: Thomas Munro <thomas.mu...@gmail.com>
Date: Tue, 27 Aug 2024 15:11:53 +1200
Subject: [PATCH] Fix unfairness in all-cached parallel seq scan.

Commit b5a9b18c introduced block streaming infrastructure with a special
fast path for all-cached blocks, and commit b7b0f3f2 connected the
infrastructure up to sequential scans.  One of the fast path
optimizations had an unintended consequence: it interfered with the
underlying parallel sequential scan's block allocator, which has its own
ramp-up and ramp-down algorithm.  A scan of a small all-cached table
could give more blocks to one worker.  In some plans (probably already
very bad plans, such as the one reported by Alexander), the unfairness
could be magnified.

Now all-cached scans will call the next-block-number callback just once
each time they want a new block, instead of trying to buffer 16 block
numbers at once.

Back-patch to 17.

Reported-by: Alexander Lakhin <exclus...@gmail.com>
Discussion: https://postgr.es/m/63a63690-dd92-c809-0b47-af05459e95d1%40gmail.com
---
 src/backend/storage/aio/read_stream.c | 82 ++++++++-------------------
 1 file changed, 24 insertions(+), 58 deletions(-)

diff --git a/src/backend/storage/aio/read_stream.c b/src/backend/storage/aio/read_stream.c
index a83c18c2a4b..57d9e93c001 100644
--- a/src/backend/storage/aio/read_stream.c
+++ b/src/backend/storage/aio/read_stream.c
@@ -117,13 +117,10 @@ struct ReadStream
 	bool		advice_enabled;
 
 	/*
-	 * Small buffer of block numbers, useful for 'ungetting' to resolve flow
-	 * control problems when I/Os are split.  Also useful for batch-loading
-	 * block numbers in the fast path.
+	 * One-block buffer to support 'ungetting' a block number, to resolve flow
+	 * control problems when I/Os are split.
 	 */
-	BlockNumber blocknums[16];
-	int16		blocknums_count;
-	int16		blocknums_next;
+	BlockNumber buffered_blocknum;
 
 	/*
 	 * The callback that will tell us which block numbers to read, and an
@@ -167,68 +164,39 @@ get_per_buffer_data(ReadStream *stream, int16 buffer_index)
 }
 
 /*
- * Ask the callback which block it would like us to read next, with a small
- * buffer in front to allow read_stream_unget_block() to work and to allow the
- * fast path to skip this function and work directly from the array.
+ * Ask the callback which block it would like us to read next, with a one-block
+ * buffer in front to allow read_stream_unget_block() to work.
  */
 static inline BlockNumber
 read_stream_get_block(ReadStream *stream, void *per_buffer_data)
 {
-	if (stream->blocknums_next < stream->blocknums_count)
-		return stream->blocknums[stream->blocknums_next++];
+	BlockNumber blocknum;
 
-	/*
-	 * We only bother to fetch one at a time here (but see the fast path which
-	 * uses more).
-	 */
-	return stream->callback(stream,
-							stream->callback_private_data,
-							per_buffer_data);
+	blocknum = stream->buffered_blocknum;
+	if (blocknum != InvalidBlockNumber)
+		stream->buffered_blocknum = InvalidBlockNumber;
+	else
+		blocknum = stream->callback(stream,
+									stream->callback_private_data,
+									per_buffer_data);
+
+	return blocknum;
 }
 
 /*
  * In order to deal with short reads in StartReadBuffers(), we sometimes need
- * to defer handling of a block until later.
+ * to defer handling of a block number we've already received from the callback
+ * until later.
  */
 static inline void
 read_stream_unget_block(ReadStream *stream, BlockNumber blocknum)
 {
-	if (stream->blocknums_next == stream->blocknums_count)
-	{
-		/* Never initialized or entirely consumed.  Re-initialize. */
-		stream->blocknums[0] = blocknum;
-		stream->blocknums_count = 1;
-		stream->blocknums_next = 0;
-	}
-	else
-	{
-		/* Must be the last value return from blocknums array. */
-		Assert(stream->blocknums_next > 0);
-		stream->blocknums_next--;
-		Assert(stream->blocknums[stream->blocknums_next] == blocknum);
-	}
+	/* We shouldn't ever unget more than one block. */
+	Assert(stream->buffered_blocknum == InvalidBlockNumber);
+	Assert(blocknum != InvalidBlockNumber);
+	stream->buffered_blocknum = blocknum;
 }
 
-#ifndef READ_STREAM_DISABLE_FAST_PATH
-static void
-read_stream_fill_blocknums(ReadStream *stream)
-{
-	BlockNumber blocknum;
-	int			i = 0;
-
-	do
-	{
-		blocknum = stream->callback(stream,
-									stream->callback_private_data,
-									NULL);
-		stream->blocknums[i++] = blocknum;
-	} while (i < lengthof(stream->blocknums) &&
-			 blocknum != InvalidBlockNumber);
-	stream->blocknums_count = i;
-	stream->blocknums_next = 0;
-}
-#endif
-
 static void
 read_stream_start_pending_read(ReadStream *stream, bool suppress_advice)
 {
@@ -530,6 +498,7 @@ read_stream_begin_impl(int flags,
 	stream->queue_size = queue_size;
 	stream->callback = callback;
 	stream->callback_private_data = callback_private_data;
+	stream->buffered_blocknum = InvalidBlockNumber;
 
 	/*
 	 * Skip the initial ramp-up phase if the caller says we're going to be
@@ -649,9 +618,7 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
 		Assert(buffer != InvalidBuffer);
 
 		/* Choose the next block to pin. */
-		if (unlikely(stream->blocknums_next == stream->blocknums_count))
-			read_stream_fill_blocknums(stream);
-		next_blocknum = stream->blocknums[stream->blocknums_next++];
+		next_blocknum = read_stream_get_block(stream, NULL);
 
 		if (likely(next_blocknum != InvalidBlockNumber))
 		{
@@ -828,8 +795,7 @@ read_stream_reset(ReadStream *stream)
 	stream->distance = 0;
 
 	/* Forget buffered block numbers and fast path state. */
-	stream->blocknums_next = 0;
-	stream->blocknums_count = 0;
+	stream->buffered_blocknum = InvalidBlockNumber;
 	stream->fast_path = false;
 
 	/* Unpin anything that wasn't consumed. */
-- 
2.46.0