On Fri, Mar 18, 2022 at 9:59 AM Thomas Munro <thomas.mu...@gmail.com> wrote: > I'll push 0001 today to let the build farm chew on it for a few days > before moving to 0002.
Clearly 018_wal_optimize.pl is flapping and causing recoveryCheck to fail occasionally, but that predates the above commit. I didn't follow the existing discussion on that, so I'll try to look into that tomorrow. Here's a rebase of the 0002 patch, now called 0001
From 3ac04122e635b98c50d6e48677fe74535d631388 Mon Sep 17 00:00:00 2001 From: Thomas Munro <thomas.mu...@gmail.com> Date: Sun, 20 Mar 2022 16:56:12 +1300 Subject: [PATCH v24] Prefetch referenced data in recovery, take II. Introduce a new GUC recovery_prefetch, disabled by default. When enabled, look ahead in the WAL and try to initiate asynchronous reading of referenced data blocks that are not yet cached in our buffer pool. For now, this is done with posix_fadvise(), which has several caveats. Since not all OSes have that system call, "try" is provided so that it can be enabled on operating systems where it is available, and that is used in 027_stream_regress.pl so that we effectively exercise on and off behaviors in the build farm. Better mechanisms will follow in later work on the I/O subsystem. The GUC maintenance_io_concurrency is used to limit the number of concurrent I/Os we allow ourselves to initiate, based on pessimistic heuristics used to infer that I/Os have begun and completed. The GUC wal_decode_buffer_size limits the maximum distance we are prepared to read ahead in the WAL to find uncached blocks. Reviewed-by: Tomas Vondra <tomas.von...@2ndquadrant.com> Reviewed-by: Alvaro Herrera <alvhe...@2ndquadrant.com> (earlier version) Reviewed-by: Andres Freund <and...@anarazel.de> (earlier version) Reviewed-by: Justin Pryzby <pry...@telsasoft.com> (earlier version) Tested-by: Tomas Vondra <tomas.von...@2ndquadrant.com> (earlier version) Tested-by: Jakub Wartak <jakub.war...@tomtom.com> (earlier version) Tested-by: Dmitry Dolgov <9erthali...@gmail.com> (earlier version) Tested-by: Sait Talha Nisanci <sait.nisa...@microsoft.com> (earlier version) Discussion: https://postgr.es/m/CA%2BhUKGJ4VJN8ttxScUFM8dOKX0BrBiboo5uz1cq%3DAovOddfHpA%40mail.gmail.com --- doc/src/sgml/config.sgml | 64 ++ doc/src/sgml/monitoring.sgml | 77 +- doc/src/sgml/wal.sgml | 12 + src/backend/access/transam/Makefile | 1 + src/backend/access/transam/xlog.c | 2 + src/backend/access/transam/xlogprefetcher.c | 968 ++++++++++++++++++ src/backend/access/transam/xlogreader.c | 13 + src/backend/access/transam/xlogrecovery.c | 160 ++- src/backend/access/transam/xlogutils.c | 27 +- src/backend/catalog/system_views.sql | 13 + src/backend/storage/freespace/freespace.c | 3 +- src/backend/storage/ipc/ipci.c | 3 + src/backend/utils/misc/guc.c | 53 +- src/backend/utils/misc/postgresql.conf.sample | 5 + src/include/access/xlog.h | 1 + src/include/access/xlogprefetcher.h | 51 + src/include/access/xlogreader.h | 8 + src/include/access/xlogutils.h | 3 +- src/include/catalog/pg_proc.dat | 8 + src/include/utils/guc.h | 4 + src/test/recovery/t/027_stream_regress.pl | 3 + src/test/regress/expected/rules.out | 10 + src/tools/pgindent/typedefs.list | 7 + 23 files changed, 1434 insertions(+), 62 deletions(-) create mode 100644 src/backend/access/transam/xlogprefetcher.c create mode 100644 src/include/access/xlogprefetcher.h diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml index 7a48973b3c..ce84f379a8 100644 --- a/doc/src/sgml/config.sgml +++ b/doc/src/sgml/config.sgml @@ -3644,6 +3644,70 @@ include_dir 'conf.d' </variablelist> </sect2> + <sect2 id="runtime-config-wal-recovery"> + + <title>Recovery</title> + + <indexterm> + <primary>configuration</primary> + <secondary>of recovery</secondary> + <tertiary>general settings</tertiary> + </indexterm> + + <para> + This section describes the settings that apply to recovery in general, + affecting crash recovery, streaming replication and archive-based + replication. + </para> + + + <variablelist> + <varlistentry id="guc-recovery-prefetch" xreflabel="recovery_prefetch"> + <term><varname>recovery_prefetch</varname> (<type>enum</type>) + <indexterm> + <primary><varname>recovery_prefetch</varname> configuration parameter</primary> + </indexterm> + </term> + <listitem> + <para> + Whether to try to prefetch blocks that are referenced in the WAL that + are not yet in the buffer pool, during recovery. Valid values are + <literal>off</literal> (the default), <literal>on</literal> and + <literal>try</literal>. The setting <literal>try</literal> enables + prefetching only if the operating system provides the + <function>posix_fadvise</function> function, which is currently used + to implement prefetching. Note that some operating systems provide the + function, but don't actually perform any prefetching. + </para> + <para> + Prefetching blocks that will soon be needed can reduce I/O wait times + during recovery with some workloads. + See also the <xref linkend="guc-wal-decode-buffer-size"/> and + <xref linkend="guc-maintenance-io-concurrency"/> settings, which limit + prefetching activity. + </para> + </listitem> + </varlistentry> + + <varlistentry id="guc-wal-decode-buffer-size" xreflabel="wal_decode_buffer_size"> + <term><varname>wal_decode_buffer_size</varname> (<type>integer</type>) + <indexterm> + <primary><varname>wal_decode_buffer_size</varname> configuration parameter</primary> + </indexterm> + </term> + <listitem> + <para> + A limit on how far ahead the server can look in the WAL, to find + blocks to prefetch. If this value is specified without units, it is + taken as bytes. + The default is 512kB. + </para> + </listitem> + </varlistentry> + + </variablelist> + </sect2> + <sect2 id="runtime-config-wal-archive-recovery"> <title>Archive Recovery</title> diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml index 35b2923c5e..b78081e6d7 100644 --- a/doc/src/sgml/monitoring.sgml +++ b/doc/src/sgml/monitoring.sgml @@ -328,6 +328,13 @@ postgres 27093 0.0 0.0 30096 2752 ? Ss 11:34 0:00 postgres: ser </entry> </row> + <row> + <entry><structname>pg_stat_prefetch_recovery</structname><indexterm><primary>pg_stat_prefetch_recovery</primary></indexterm></entry> + <entry>Only one row, showing statistics about blocks prefetched during recovery. + See <xref linkend="pg-stat-prefetch-recovery-view"/> for details. + </entry> + </row> + <row> <entry><structname>pg_stat_subscription</structname><indexterm><primary>pg_stat_subscription</primary></indexterm></entry> <entry>At least one row per subscription, showing information about @@ -2967,6 +2974,69 @@ SELECT pid, wait_event_type, wait_event FROM pg_stat_activity WHERE wait_event i copy of the subscribed tables. </para> + <table id="pg-stat-prefetch-recovery-view" xreflabel="pg_stat_prefetch_recovery"> + <title><structname>pg_stat_prefetch_recovery</structname> View</title> + <tgroup cols="3"> + <thead> + <row> + <entry>Column</entry> + <entry>Type</entry> + <entry>Description</entry> + </row> + </thead> + + <tbody> + <row> + <entry><structfield>prefetch</structfield></entry> + <entry><type>bigint</type></entry> + <entry>Number of blocks prefetched because they were not in the buffer pool</entry> + </row> + <row> + <entry><structfield>hit</structfield></entry> + <entry><type>bigint</type></entry> + <entry>Number of blocks not prefetched because they were already in the buffer pool</entry> + </row> + <row> + <entry><structfield>skip_init</structfield></entry> + <entry><type>bigint</type></entry> + <entry>Number of blocks not prefetched because they would be zero-initialized</entry> + </row> + <row> + <entry><structfield>skip_new</structfield></entry> + <entry><type>bigint</type></entry> + <entry>Number of blocks not prefetched because they didn't exist yet</entry> + </row> + <row> + <entry><structfield>skip_fpw</structfield></entry> + <entry><type>bigint</type></entry> + <entry>Number of blocks not prefetched because a full page image was included in the WAL</entry> + </row> + <row> + <entry><structfield>wal_distance</structfield></entry> + <entry><type>integer</type></entry> + <entry>How many bytes ahead the prefetcher is looking</entry> + </row> + <row> + <entry><structfield>block_distance</structfield></entry> + <entry><type>integer</type></entry> + <entry>How many blocks ahead the prefetcher is looking</entry> + </row> + <row> + <entry><structfield>io_depth</structfield></entry> + <entry><type>integer</type></entry> + <entry>How many prefetches have been initiated but are not yet known to have completed</entry> + </row> + </tbody> + </tgroup> + </table> + + <para> + The <structname>pg_stat_prefetch_recovery</structname> view will contain only + one row. It is filled with nulls if recovery is not running or WAL + prefetching is not enabled. See <xref linkend="guc-recovery-prefetch"/> + for more information. + </para> + <table id="pg-stat-subscription" xreflabel="pg_stat_subscription"> <title><structname>pg_stat_subscription</structname> View</title> <tgroup cols="1"> @@ -5186,8 +5256,11 @@ SELECT pid, wait_event_type, wait_event FROM pg_stat_activity WHERE wait_event i all the counters shown in the <structname>pg_stat_bgwriter</structname> view, <literal>archiver</literal> to reset all the counters shown in - the <structname>pg_stat_archiver</structname> view or <literal>wal</literal> - to reset all the counters shown in the <structname>pg_stat_wal</structname> view. + the <structname>pg_stat_archiver</structname> view, + <literal>wal</literal> to reset all the counters shown in the + <structname>pg_stat_wal</structname> view or + <literal>prefetch_recovery</literal> to reset all the counters shown + in the <structname>pg_stat_prefetch_recovery</structname> view. </para> <para> This function is restricted to superusers by default, but other users diff --git a/doc/src/sgml/wal.sgml b/doc/src/sgml/wal.sgml index 2bb27a8468..8566f297d3 100644 --- a/doc/src/sgml/wal.sgml +++ b/doc/src/sgml/wal.sgml @@ -803,6 +803,18 @@ counted as <literal>wal_write</literal> and <literal>wal_sync</literal> in <structname>pg_stat_wal</structname>, respectively. </para> + + <para> + The <xref linkend="guc-recovery-prefetch"/> parameter can + be used to improve I/O performance during recovery by instructing + <productname>PostgreSQL</productname> to initiate reads + of disk blocks that will soon be needed but are not currently in + <productname>PostgreSQL</productname>'s buffer pool. + The <xref linkend="guc-maintenance-io-concurrency"/> and + <xref linkend="guc-wal-decode-buffer-size"/> settings limit prefetching + concurrency and distance, respectively. + By default, prefetching in recovery is disabled. + </para> </sect1> <sect1 id="wal-internals"> diff --git a/src/backend/access/transam/Makefile b/src/backend/access/transam/Makefile index 79314c69ab..8c17c88dfc 100644 --- a/src/backend/access/transam/Makefile +++ b/src/backend/access/transam/Makefile @@ -31,6 +31,7 @@ OBJS = \ xlogarchive.o \ xlogfuncs.o \ xloginsert.o \ + xlogprefetcher.o \ xlogreader.o \ xlogrecovery.o \ xlogutils.o diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index 4ac3871c74..a1544c052e 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -59,6 +59,7 @@ #include "access/xlog_internal.h" #include "access/xlogarchive.h" #include "access/xloginsert.h" +#include "access/xlogprefetcher.h" #include "access/xlogreader.h" #include "access/xlogrecovery.h" #include "access/xlogutils.h" @@ -133,6 +134,7 @@ int CommitDelay = 0; /* precommit delay in microseconds */ int CommitSiblings = 5; /* # concurrent xacts needed to sleep */ int wal_retrieve_retry_interval = 5000; int max_slot_wal_keep_size_mb = -1; +int wal_decode_buffer_size = 512 * 1024; bool track_wal_io_timing = false; #ifdef WAL_DEBUG diff --git a/src/backend/access/transam/xlogprefetcher.c b/src/backend/access/transam/xlogprefetcher.c new file mode 100644 index 0000000000..537b0b192a --- /dev/null +++ b/src/backend/access/transam/xlogprefetcher.c @@ -0,0 +1,968 @@ +/*------------------------------------------------------------------------- + * + * xlogprefetcher.c + * Prefetching support for recovery. + * + * Portions Copyright (c) 2022, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * + * IDENTIFICATION + * src/backend/access/transam/xlogprefetcher.c + * + * This module provides a drop-in replacement for an XLogReader that tries to + * minimize I/O stalls by looking up future blocks in the buffer cache, and + * initiating I/Os that might complete before the caller eventually needs the + * data. XLogRecBufferForRedo() cooperates uses information stored in the + * decoded record to find buffers efficiently. + * + *------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#include "access/xlog.h" +#include "access/xlogprefetcher.h" +#include "access/xlogreader.h" +#include "access/xlogutils.h" +#include "catalog/pg_class.h" +#include "catalog/pg_control.h" +#include "catalog/storage_xlog.h" +#include "commands/dbcommands_xlog.h" +#include "utils/fmgrprotos.h" +#include "utils/timestamp.h" +#include "funcapi.h" +#include "pgstat.h" +#include "miscadmin.h" +#include "port/atomics.h" +#include "storage/bufmgr.h" +#include "storage/shmem.h" +#include "storage/smgr.h" +#include "utils/guc.h" +#include "utils/hsearch.h" + +/* Every time we process this much WAL, we update dynamic values in shm. */ +#define XLOGPREFETCHER_STATS_SHM_DISTANCE BLCKSZ + +/* GUCs */ +int recovery_prefetch = RECOVERY_PREFETCH_OFF; + +#ifdef USE_PREFETCH +#define RecoveryPrefetchEnabled() (recovery_prefetch != RECOVERY_PREFETCH_OFF) +#else +#define RecoveryPrefetchEnabled() false +#endif + +static int XLogPrefetchReconfigureCount = 0; + +/* + * Enum used to report whether an IO should be started. + */ +typedef enum +{ + LRQ_NEXT_NO_IO, + LRQ_NEXT_IO, + LRQ_NEXT_AGAIN +} LsnReadQueueNextStatus; + +/* + * Type of callback that can decide which block to prefetch next. For now + * there is only one. + */ +typedef LsnReadQueueNextStatus (*LsnReadQueueNextFun) (uintptr_t lrq_private, + XLogRecPtr *lsn); + +/* + * A simple circular queue of LSNs, using to control the number of + * (potentially) inflight IOs. This stands in for a later more general IO + * control mechanism, which is why it has the apparently unnecessary + * indirection through a function pointer. + */ +typedef struct LsnReadQueue +{ + LsnReadQueueNextFun next; + uintptr_t lrq_private; + uint32 max_inflight; + uint32 inflight; + uint32 completed; + uint32 head; + uint32 tail; + uint32 size; + struct + { + bool io; + XLogRecPtr lsn; + } queue[FLEXIBLE_ARRAY_MEMBER]; +} LsnReadQueue; + +/* + * A prefetcher. This is a mechanism that wraps an XLogReader, prefetching + * blocks that will be soon be referenced, to try to avoid IO stalls. + */ +struct XLogPrefetcher +{ + /* WAL reader and current reading state. */ + XLogReaderState *reader; + DecodedXLogRecord *record; + int next_block_id; + + /* When to publish stats. */ + XLogRecPtr next_stats_shm_lsn; + + /* Book-keeping required to avoid accessing non-existing blocks. */ + HTAB *filter_table; + dlist_head filter_queue; + + /* Book-keeping for readahead barriers. */ + XLogRecPtr no_readahead_until; + + /* IO depth manager. */ + LsnReadQueue *streaming_read; + + XLogRecPtr begin_ptr; + + int reconfigure_count; +}; + +/* + * A temporary filter used to track block ranges that haven't been created + * yet, whole relations that haven't been created yet, and whole relations + * that (we assume) have already been dropped, or will be created by bulk WAL + * operators. + */ +typedef struct XLogPrefetcherFilter +{ + RelFileNode rnode; + XLogRecPtr filter_until_replayed; + BlockNumber filter_from_block; + dlist_node link; +} XLogPrefetcherFilter; + +/* + * Counters exposed in shared memory for pg_stat_prefetch_recovery. + */ +typedef struct XLogPrefetchStats +{ + pg_atomic_uint64 reset_time; /* Time of last reset. */ + pg_atomic_uint64 prefetch; /* Prefetches initiated. */ + pg_atomic_uint64 hit; /* Blocks already in cache. */ + pg_atomic_uint64 skip_init; /* Zero-inited blocks skipped. */ + pg_atomic_uint64 skip_new; /* New/missing blocks filtered. */ + pg_atomic_uint64 skip_fpw; /* FPWs skipped. */ + + /* Reset counters */ + pg_atomic_uint32 reset_request; + uint32 reset_handled; + + /* Dynamic values */ + int wal_distance; /* Number of WAL bytes ahead. */ + int block_distance; /* Number of block references ahead. */ + int io_depth; /* Number of I/Os in progress. */ +} XLogPrefetchStats; + +static inline void XLogPrefetcherAddFilter(XLogPrefetcher *prefetcher, + RelFileNode rnode, + BlockNumber blockno, + XLogRecPtr lsn); +static inline bool XLogPrefetcherIsFiltered(XLogPrefetcher *prefetcher, + RelFileNode rnode, + BlockNumber blockno); +static inline void XLogPrefetcherCompleteFilters(XLogPrefetcher *prefetcher, + XLogRecPtr replaying_lsn); +static LsnReadQueueNextStatus XLogPrefetcherNextBlock(uintptr_t pgsr_private, + XLogRecPtr *lsn); + +static XLogPrefetchStats *SharedStats; + +static inline LsnReadQueue * +lrq_alloc(uint32 max_distance, + uint32 max_inflight, + uintptr_t lrq_private, + LsnReadQueueNextFun next) +{ + LsnReadQueue *lrq; + uint32 size; + + Assert(max_distance >= max_inflight); + + size = max_distance + 1; /* full ring buffer has a gap */ + lrq = palloc(offsetof(LsnReadQueue, queue) + sizeof(lrq->queue[0]) * size); + lrq->lrq_private = lrq_private; + lrq->max_inflight = max_inflight; + lrq->size = size; + lrq->next = next; + lrq->head = 0; + lrq->tail = 0; + lrq->inflight = 0; + lrq->completed = 0; + + return lrq; +} + +static inline void +lrq_free(LsnReadQueue *lrq) +{ + pfree(lrq); +} + +static inline uint32 +lrq_inflight(LsnReadQueue *lrq) +{ + return lrq->inflight; +} + +static inline uint32 +lrq_completed(LsnReadQueue *lrq) +{ + return lrq->completed; +} + +static inline void +lrq_prefetch(LsnReadQueue *lrq) +{ + /* Try to start as many IOs as we can within our limits. */ + while (lrq->inflight < lrq->max_inflight && + lrq->inflight + lrq->completed < lrq->size - 1) + { + Assert(((lrq->head + 1) % lrq->size) != lrq->tail); + switch (lrq->next(lrq->lrq_private, &lrq->queue[lrq->head].lsn)) + { + case LRQ_NEXT_AGAIN: + return; + case LRQ_NEXT_IO: + lrq->queue[lrq->head].io = true; + lrq->inflight++; + break; + case LRQ_NEXT_NO_IO: + lrq->queue[lrq->head].io = false; + lrq->completed++; + break; + } + lrq->head++; + if (lrq->head == lrq->size) + lrq->head = 0; + } +} + +static inline void +lrq_complete_lsn(LsnReadQueue *lrq, XLogRecPtr lsn) +{ + /* + * We know that LSNs before 'lsn' have been replayed, so we can now assume + * that any IOs that were started before then have finished. + */ + while (lrq->tail != lrq->head && + lrq->queue[lrq->tail].lsn < lsn) + { + if (lrq->queue[lrq->tail].io) + lrq->inflight--; + else + lrq->completed--; + lrq->tail++; + if (lrq->tail == lrq->size) + lrq->tail = 0; + } + if (RecoveryPrefetchEnabled()) + lrq_prefetch(lrq); +} + +size_t +XLogPrefetchShmemSize(void) +{ + return sizeof(XLogPrefetchStats); +} + +static void +XLogPrefetchResetStats(void) +{ + pg_atomic_write_u64(&SharedStats->reset_time, GetCurrentTimestamp()); + pg_atomic_write_u64(&SharedStats->prefetch, 0); + pg_atomic_write_u64(&SharedStats->hit, 0); + pg_atomic_write_u64(&SharedStats->skip_init, 0); + pg_atomic_write_u64(&SharedStats->skip_new, 0); + pg_atomic_write_u64(&SharedStats->skip_fpw, 0); +} + +void +XLogPrefetchShmemInit(void) +{ + bool found; + + SharedStats = (XLogPrefetchStats *) + ShmemInitStruct("XLogPrefetchStats", + sizeof(XLogPrefetchStats), + &found); + + if (!found) + { + pg_atomic_init_u32(&SharedStats->reset_request, 0); + SharedStats->reset_handled = 0; + + pg_atomic_init_u64(&SharedStats->reset_time, GetCurrentTimestamp()); + pg_atomic_init_u64(&SharedStats->prefetch, 0); + pg_atomic_init_u64(&SharedStats->hit, 0); + pg_atomic_init_u64(&SharedStats->skip_init, 0); + pg_atomic_init_u64(&SharedStats->skip_new, 0); + pg_atomic_init_u64(&SharedStats->skip_fpw, 0); + } +} + +/* + * Called when any GUC is changed that affects prefetching. + */ +void +XLogPrefetchReconfigure(void) +{ + XLogPrefetchReconfigureCount++; +} + +/* + * Called by any backend to request that the stats be reset. + */ +void +XLogPrefetchRequestResetStats(void) +{ + pg_atomic_fetch_add_u32(&SharedStats->reset_request, 1); +} + +/* + * Increment a counter in shared memory. This is equivalent to *counter++ on a + * plain uint64 without any memory barrier or locking, except on platforms + * where readers can't read uint64 without possibly observing a torn value. + */ +static inline void +XLogPrefetchIncrement(pg_atomic_uint64 *counter) +{ + Assert(AmStartupProcess() || !IsUnderPostmaster); + pg_atomic_write_u64(counter, pg_atomic_read_u64(counter) + 1); +} + +/* + * Create a prefetcher that is ready to begin prefetching blocks referenced by + * WAL records. + */ +XLogPrefetcher * +XLogPrefetcherAllocate(XLogReaderState *reader) +{ + XLogPrefetcher *prefetcher; + static HASHCTL hash_table_ctl = { + .keysize = sizeof(RelFileNode), + .entrysize = sizeof(XLogPrefetcherFilter) + }; + + prefetcher = palloc0(sizeof(XLogPrefetcher)); + + prefetcher->reader = reader; + prefetcher->filter_table = hash_create("XLogPrefetcherFilterTable", 1024, + &hash_table_ctl, + HASH_ELEM | HASH_BLOBS); + dlist_init(&prefetcher->filter_queue); + + SharedStats->wal_distance = 0; + SharedStats->block_distance = 0; + SharedStats->io_depth = 0; + + /* First usage will cause streaming_read to be allocated. */ + prefetcher->reconfigure_count = XLogPrefetchReconfigureCount - 1; + + return prefetcher; +} + +/* + * Destroy a prefetcher and release all resources. + */ +void +XLogPrefetcherFree(XLogPrefetcher *prefetcher) +{ + lrq_free(prefetcher->streaming_read); + hash_destroy(prefetcher->filter_table); + pfree(prefetcher); +} + +/* + * Provide access to the reader. + */ +XLogReaderState * +XLogPrefetcherReader(XLogPrefetcher *prefetcher) +{ + return prefetcher->reader; +} + +static void +XLogPrefetcherComputeStats(XLogPrefetcher *prefetcher, XLogRecPtr lsn) +{ + uint32 io_depth; + uint32 completed; + uint32 reset_request; + int64 wal_distance; + + + /* How far ahead of replay are we now? */ + if (prefetcher->record) + wal_distance = prefetcher->record->lsn - prefetcher->reader->record->lsn; + else + wal_distance = 0; + + /* How many IOs are currently in flight and completed? */ + io_depth = lrq_inflight(prefetcher->streaming_read); + completed = lrq_completed(prefetcher->streaming_read); + + /* Update the instantaneous stats visible in pg_stat_prefetch_recovery. */ + SharedStats->io_depth = io_depth; + SharedStats->block_distance = io_depth + completed; + SharedStats->wal_distance = wal_distance; + + /* + * Have we been asked to reset our stats counters? This is checked with + * an unsynchronized memory read, but we'll see it eventually and we'll be + * accessing that cache line anyway. + */ + reset_request = pg_atomic_read_u32(&SharedStats->reset_request); + if (reset_request != SharedStats->reset_handled) + { + XLogPrefetchResetStats(); + SharedStats->reset_handled = reset_request; + } + + prefetcher->next_stats_shm_lsn = lsn + XLOGPREFETCHER_STATS_SHM_DISTANCE; +} + +/* + * A callback that reads ahead in the WAL and tries to initiate one IO. + */ +static LsnReadQueueNextStatus +XLogPrefetcherNextBlock(uintptr_t pgsr_private, XLogRecPtr *lsn) +{ + XLogPrefetcher *prefetcher = (XLogPrefetcher *) pgsr_private; + XLogReaderState *reader = prefetcher->reader; + XLogRecPtr replaying_lsn = reader->ReadRecPtr; + + /* + * We keep track of the record and block we're up to between calls with + * prefetcher->record and prefetcher->next_block_id. + */ + for (;;) + { + DecodedXLogRecord *record; + + /* Try to read a new future record, if we don't already have one. */ + if (prefetcher->record == NULL) + { + bool nonblocking; + + /* + * If there are already records or an error queued up that could + * be replayed, we don't want to block here. Otherwise, it's OK + * to block waiting for more data: presumably the caller has + * nothing else to do. + */ + nonblocking = XLogReaderHasQueuedRecordOrError(reader); + + /* Certain records act as barriers for all readahead. */ + if (nonblocking && replaying_lsn < prefetcher->no_readahead_until) + return LRQ_NEXT_AGAIN; + + record = XLogReadAhead(prefetcher->reader, nonblocking); + if (record == NULL) + { + /* + * We can't read any more, due to an error or lack of data in + * nonblocking mode. + */ + return LRQ_NEXT_AGAIN; + } + + /* + * If prefetching is disabled, we don't need to analyze the record + * or issue any prefetches. We just need to cause one record to + * be decoded. + */ + if (!RecoveryPrefetchEnabled()) + { + *lsn = InvalidXLogRecPtr; + return LRQ_NEXT_NO_IO; + } + + /* We have a new record to process. */ + prefetcher->record = record; + prefetcher->next_block_id = 0; + } + else + { + /* Continue to process from last call, or last loop. */ + record = prefetcher->record; + } + + /* + * Check for operations that require us to filter out block ranges, or + * stop readahead completely. + * + * XXX Perhaps this information could be derived automatically if we + * had some standardized header flags and fields for these fields, + * instead of special logic. + * + * XXX Are there other operations that need this treatment? + */ + if (replaying_lsn < record->lsn) + { + uint8 rmid = record->header.xl_rmid; + uint8 record_type = record->header.xl_info & ~XLR_INFO_MASK; + + if (rmid == RM_XLOG_ID) + { + if (record_type == XLOG_CHECKPOINT_SHUTDOWN || + record_type == XLOG_END_OF_RECOVERY) + { + /* + * These records might change the TLI. Avoid potential + * bugs if we were to allow "read TLI" and "replay TLI" to + * differ without more analysis. + */ + prefetcher->no_readahead_until = record->lsn; + } + } + else if (rmid == RM_DBASE_ID) + { + if (record_type == XLOG_DBASE_CREATE) + { + xl_dbase_create_rec *xlrec = (xl_dbase_create_rec *) + record->main_data; + RelFileNode rnode = {InvalidOid, xlrec->db_id, InvalidOid}; + + /* + * Don't try to prefetch anything in this database until + * it has been created, or we might confuse blocks on OID + * wraparound. (We could use XLOG_DBASE_DROP instead, but + * there shouldn't be any reference to blocks in a + * database between DROP and CREATE for the same OID, and + * doing it on CREATE avoids the more expensive + * ENOENT-handling if we didn't treat CREATE as a + * barrier). + */ + XLogPrefetcherAddFilter(prefetcher, rnode, 0, record->lsn); + } + } + else if (rmid == RM_SMGR_ID) + { + if (record_type == XLOG_SMGR_CREATE) + { + xl_smgr_create *xlrec = (xl_smgr_create *) + record->main_data; + + /* + * Don't prefetch anything for this whole relation until + * it has been created, or we might confuse blocks on OID + * wraparound. + */ + XLogPrefetcherAddFilter(prefetcher, xlrec->rnode, 0, + record->lsn); + } + else if (record_type == XLOG_SMGR_TRUNCATE) + { + xl_smgr_truncate *xlrec = (xl_smgr_truncate *) + record->main_data; + + /* + * Don't prefetch anything in the truncated range until + * the truncation has been performed. + */ + XLogPrefetcherAddFilter(prefetcher, xlrec->rnode, + xlrec->blkno, + record->lsn); + } + } + } + + /* Scan the block references, starting where we left off last time. */ + while (prefetcher->next_block_id <= record->max_block_id) + { + int block_id = prefetcher->next_block_id++; + DecodedBkpBlock *block = &record->blocks[block_id]; + SMgrRelation reln; + PrefetchBufferResult result; + + if (!block->in_use) + continue; + + Assert(!BufferIsValid(block->prefetch_buffer));; + + /* + * Record the LSN of this record. When it's replayed, + * LsnReadQueue will consider any IOs submitted for earlier LSNs + * to be finished. + */ + *lsn = record->lsn; + + /* We don't try to prefetch anything but the main fork for now. */ + if (block->forknum != MAIN_FORKNUM) + { + return LRQ_NEXT_NO_IO; + } + + /* + * If there is a full page image attached, we won't be reading the + * page, so don't both trying to prefetch. + */ + if (block->has_image) + { + XLogPrefetchIncrement(&SharedStats->skip_fpw); + return LRQ_NEXT_NO_IO; + } + + /* There is no point in reading a page that will be zeroed. */ + if (block->flags & BKPBLOCK_WILL_INIT) + { + XLogPrefetchIncrement(&SharedStats->skip_init); + return LRQ_NEXT_NO_IO; + } + + /* Should we skip prefetching this block due to a filter? */ + if (XLogPrefetcherIsFiltered(prefetcher, block->rnode, block->blkno)) + { + XLogPrefetchIncrement(&SharedStats->skip_new); + return LRQ_NEXT_NO_IO; + } + + /* + * We could try to have a fast path for repeated references to the + * same relation (with some scheme to handle invalidations + * safely), but for now we'll call smgropen() every time. + */ + reln = smgropen(block->rnode, InvalidBackendId); + + /* + * If the block is past the end of the relation, filter out + * further accesses until this record is replayed. + */ + if (block->blkno >= smgrnblocks(reln, block->forknum)) + { + XLogPrefetcherAddFilter(prefetcher, block->rnode, block->blkno, + record->lsn); + XLogPrefetchIncrement(&SharedStats->skip_new); + return LRQ_NEXT_NO_IO; + } + + /* Try to initiate prefetching. */ + result = PrefetchSharedBuffer(reln, block->forknum, block->blkno); + if (BufferIsValid(result.recent_buffer)) + { + /* Cache hit, nothing to do. */ + XLogPrefetchIncrement(&SharedStats->hit); + block->prefetch_buffer = result.recent_buffer; + return LRQ_NEXT_NO_IO; + } + else if (result.initiated_io) + { + /* Cache miss, I/O (presumably) started. */ + XLogPrefetchIncrement(&SharedStats->prefetch); + block->prefetch_buffer = InvalidBuffer; + return LRQ_NEXT_IO; + } + else + { + /* + * Neither cached nor initiated. The underlying segment file + * doesn't exist. (ENOENT) + * + * It might be missing becaused it was unlinked, we crashed, + * and now we're replaying WAL. Recovery will correct this + * problem or complain if something is wrong. + */ + XLogPrefetcherAddFilter(prefetcher, block->rnode, 0, + record->lsn); + XLogPrefetchIncrement(&SharedStats->skip_new); + return LRQ_NEXT_NO_IO; + } + } + + /* + * Several callsites need to be able to read exactly one record + * without any internal readahead. Examples: xlog.c reading + * checkpoint records with emode set to PANIC, which might otherwise + * cause XLogPageRead() to panic on some future page, and xlog.c + * determining where to start writing WAL next, which depends on the + * contents of the reader's internal buffer after reading one record. + * Therefore, don't even think about prefetching until the first + * record after XLogPrefetcherBeginRead() has been consumed. + */ + if (prefetcher->reader->decode_queue_tail && + prefetcher->reader->decode_queue_tail->lsn == prefetcher->begin_ptr) + return LRQ_NEXT_AGAIN; + + /* Advance to the next record. */ + prefetcher->record = NULL; + } + pg_unreachable(); +} + +/* + * Expose statistics about recovery prefetching. + */ +Datum +pg_stat_get_prefetch_recovery(PG_FUNCTION_ARGS) +{ +#define PG_STAT_GET_PREFETCH_RECOVERY_COLS 10 + ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo; + TupleDesc tupdesc; + Tuplestorestate *tupstore; + MemoryContext per_query_ctx; + MemoryContext oldcontext; + Datum values[PG_STAT_GET_PREFETCH_RECOVERY_COLS]; + bool nulls[PG_STAT_GET_PREFETCH_RECOVERY_COLS]; + + if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo)) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("set-valued function called in context that cannot accept a set"))); + if (!(rsinfo->allowedModes & SFRM_Materialize)) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("materialize mod required, but it is not allowed in this context"))); + + if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE) + elog(ERROR, "return type must be a row type"); + + per_query_ctx = rsinfo->econtext->ecxt_per_query_memory; + oldcontext = MemoryContextSwitchTo(per_query_ctx); + + tupstore = tuplestore_begin_heap(true, false, work_mem); + rsinfo->returnMode = SFRM_Materialize; + rsinfo->setResult = tupstore; + rsinfo->setDesc = tupdesc; + + MemoryContextSwitchTo(oldcontext); + + if (pg_atomic_read_u32(&SharedStats->reset_request) != SharedStats->reset_handled) + { + /* There's an unhandled reset request, so just show NULLs */ + for (int i = 0; i < PG_STAT_GET_PREFETCH_RECOVERY_COLS; ++i) + nulls[i] = true; + } + else + { + for (int i = 0; i < PG_STAT_GET_PREFETCH_RECOVERY_COLS; ++i) + nulls[i] = false; + } + + values[0] = TimestampTzGetDatum(pg_atomic_read_u64(&SharedStats->reset_time)); + values[1] = Int64GetDatum(pg_atomic_read_u64(&SharedStats->prefetch)); + values[2] = Int64GetDatum(pg_atomic_read_u64(&SharedStats->hit)); + values[3] = Int64GetDatum(pg_atomic_read_u64(&SharedStats->skip_init)); + values[4] = Int64GetDatum(pg_atomic_read_u64(&SharedStats->skip_new)); + values[5] = Int64GetDatum(pg_atomic_read_u64(&SharedStats->skip_fpw)); + values[6] = Int32GetDatum(SharedStats->wal_distance); + values[7] = Int32GetDatum(SharedStats->block_distance); + values[8] = Int32GetDatum(SharedStats->io_depth); + tuplestore_putvalues(tupstore, tupdesc, values, nulls); + tuplestore_donestoring(tupstore); + + return (Datum) 0; +} + +/* + * Don't prefetch any blocks >= 'blockno' from a given 'rnode', until 'lsn' + * has been replayed. + */ +static inline void +XLogPrefetcherAddFilter(XLogPrefetcher *prefetcher, RelFileNode rnode, + BlockNumber blockno, XLogRecPtr lsn) +{ + XLogPrefetcherFilter *filter; + bool found; + + filter = hash_search(prefetcher->filter_table, &rnode, HASH_ENTER, &found); + if (!found) + { + /* + * Don't allow any prefetching of this block or higher until replayed. + */ + filter->filter_until_replayed = lsn; + filter->filter_from_block = blockno; + dlist_push_head(&prefetcher->filter_queue, &filter->link); + } + else + { + /* + * We were already filtering this rnode. Extend the filter's lifetime + * to cover this WAL record, but leave the lower of the block numbers + * there because we don't want to have to track individual blocks. + */ + filter->filter_until_replayed = lsn; + dlist_delete(&filter->link); + dlist_push_head(&prefetcher->filter_queue, &filter->link); + filter->filter_from_block = Min(filter->filter_from_block, blockno); + } +} + +/* + * Have we replayed any records that caused us to begin filtering a block + * range? That means that relations should have been created, extended or + * dropped as required, so we can stop filtering out accesses to a given + * relfilenode. + */ +static inline void +XLogPrefetcherCompleteFilters(XLogPrefetcher *prefetcher, XLogRecPtr replaying_lsn) +{ + while (unlikely(!dlist_is_empty(&prefetcher->filter_queue))) + { + XLogPrefetcherFilter *filter = dlist_tail_element(XLogPrefetcherFilter, + link, + &prefetcher->filter_queue); + + if (filter->filter_until_replayed >= replaying_lsn) + break; + + dlist_delete(&filter->link); + hash_search(prefetcher->filter_table, filter, HASH_REMOVE, NULL); + } +} + +/* + * Check if a given block should be skipped due to a filter. + */ +static inline bool +XLogPrefetcherIsFiltered(XLogPrefetcher *prefetcher, RelFileNode rnode, + BlockNumber blockno) +{ + /* + * Test for empty queue first, because we expect it to be empty most of + * the time and we can avoid the hash table lookup in that case. + */ + if (unlikely(!dlist_is_empty(&prefetcher->filter_queue))) + { + XLogPrefetcherFilter *filter; + + /* See if the block range is filtered. */ + filter = hash_search(prefetcher->filter_table, &rnode, HASH_FIND, NULL); + if (filter && filter->filter_from_block <= blockno) + return true; + + /* See if the whole database is filtered. */ + rnode.relNode = InvalidOid; + filter = hash_search(prefetcher->filter_table, &rnode, HASH_FIND, NULL); + if (filter && filter->filter_from_block <= blockno) + return true; + } + + return false; +} + +/* + * A wrapper for XLogBeginRead() that also resets the prefetcher. + */ +void +XLogPrefetcherBeginRead(XLogPrefetcher *prefetcher, XLogRecPtr recPtr) +{ + /* This will forget about any in-flight IO. */ + prefetcher->reconfigure_count--; + + /* Book-keeping to avoid readahead on first read. */ + prefetcher->begin_ptr = recPtr; + + prefetcher->no_readahead_until = 0; + + /* This will forget about any queued up records in the decoder. */ + XLogBeginRead(prefetcher->reader, recPtr); +} + +/* + * A wrapper for XLogReadRecord() that provides the same interface, but also + * tries to initiate I/O for blocks referenced in future WAL records. + */ +XLogRecord * +XLogPrefetcherReadRecord(XLogPrefetcher *prefetcher, char **errmsg) +{ + DecodedXLogRecord *record; + + /* + * See if it's time to reset the prefetching machinery, because a relevant + * GUC was changed. + */ + if (unlikely(XLogPrefetchReconfigureCount != prefetcher->reconfigure_count)) + { + if (prefetcher->streaming_read) + lrq_free(prefetcher->streaming_read); + + /* + * Arbitrarily look up to 4 times further ahead than the number of IOs + * we're allowed to run concurrently. + */ + prefetcher->streaming_read = + lrq_alloc(RecoveryPrefetchEnabled() ? maintenance_io_concurrency * 4 : 1, + RecoveryPrefetchEnabled() ? maintenance_io_concurrency : 1, + (uintptr_t) prefetcher, + XLogPrefetcherNextBlock); + + prefetcher->reconfigure_count = XLogPrefetchReconfigureCount; + } + + /* + * Release last returned record, if there is one. We need to do this so + * that we can check for empty decode queue accurately. + */ + XLogReleasePreviousRecord(prefetcher->reader); + + /* If there's nothing queued yet, then start prefetching. */ + if (!XLogReaderHasQueuedRecordOrError(prefetcher->reader)) + lrq_prefetch(prefetcher->streaming_read); + + /* Read the next record. */ + record = XLogNextRecord(prefetcher->reader, errmsg); + if (!record) + return NULL; + + /* + * The record we just got is the "current" one, for the benefit of the + * XLogRecXXX() macros. + */ + Assert(record == prefetcher->reader->record); + + /* + * Can we drop any prefetch filters yet, given the record we're about to + * return? This assumes that any records with earlier LSNs have been + * replayed, so if we were waiting for a relation to be created or + * extended, it is now OK to access blocks in the covered range. + */ + XLogPrefetcherCompleteFilters(prefetcher, record->lsn); + + /* + * See if it's time to compute some statistics, because enough WAL has + * been processed. + */ + if (unlikely(record->lsn >= prefetcher->next_stats_shm_lsn)) + XLogPrefetcherComputeStats(prefetcher, record->lsn); + + /* + * The caller is about to replay this record, so we can now report that + * all IO initiated because of early WAL must be finished. This may + * trigger more readahead. + */ + lrq_complete_lsn(prefetcher->streaming_read, record->lsn); + + Assert(record == prefetcher->reader->record); + + return &record->header; +} + +bool +check_recovery_prefetch(int *new_value, void **extra, GucSource source) +{ +#ifndef USE_PREFETCH + if (*new_value == RECOVERY_PREFETCH_ON) + { + GUC_check_errdetail("recovery_prefetch not supported on platforms that lack posix_fadvise()."); + return false; + } +#endif + + return true; +} + +void +assign_recovery_prefetch(int new_value, void *extra) +{ + /* Reconfigure prefetching, because a setting it depends on changed. */ + recovery_prefetch = new_value; + if (AmStartupProcess()) + XLogPrefetchReconfigure(); +} diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c index e437c42992..8800e88ad0 100644 --- a/src/backend/access/transam/xlogreader.c +++ b/src/backend/access/transam/xlogreader.c @@ -1727,6 +1727,8 @@ DecodeXLogRecord(XLogReaderState *state, blk->has_image = ((fork_flags & BKPBLOCK_HAS_IMAGE) != 0); blk->has_data = ((fork_flags & BKPBLOCK_HAS_DATA) != 0); + blk->prefetch_buffer = InvalidBuffer; + COPY_HEADER_FIELD(&blk->data_len, sizeof(uint16)); /* cross-check that the HAS_DATA flag is set iff data_length > 0 */ if (blk->has_data && blk->data_len == 0) @@ -1933,6 +1935,15 @@ err: bool XLogRecGetBlockTag(XLogReaderState *record, uint8 block_id, RelFileNode *rnode, ForkNumber *forknum, BlockNumber *blknum) +{ + return XLogRecGetBlockInfo(record, block_id, rnode, forknum, blknum, NULL); +} + +bool +XLogRecGetBlockInfo(XLogReaderState *record, uint8 block_id, + RelFileNode *rnode, ForkNumber *forknum, + BlockNumber *blknum, + Buffer *prefetch_buffer) { DecodedBkpBlock *bkpb; @@ -1947,6 +1958,8 @@ XLogRecGetBlockTag(XLogReaderState *record, uint8 block_id, *forknum = bkpb->forknum; if (blknum) *blknum = bkpb->blkno; + if (prefetch_buffer) + *prefetch_buffer = bkpb->prefetch_buffer; return true; } diff --git a/src/backend/access/transam/xlogrecovery.c b/src/backend/access/transam/xlogrecovery.c index 9feea3e6ec..e5e7821c79 100644 --- a/src/backend/access/transam/xlogrecovery.c +++ b/src/backend/access/transam/xlogrecovery.c @@ -36,6 +36,7 @@ #include "access/xact.h" #include "access/xlog_internal.h" #include "access/xlogarchive.h" +#include "access/xlogprefetcher.h" #include "access/xlogreader.h" #include "access/xlogrecovery.h" #include "access/xlogutils.h" @@ -183,6 +184,9 @@ static bool doRequestWalReceiverReply; /* XLogReader object used to parse the WAL records */ static XLogReaderState *xlogreader = NULL; +/* XLogPrefetcher object used to consume WAL records with read-ahead */ +static XLogPrefetcher *xlogprefetcher = NULL; + /* Parameters passed down from ReadRecord to the XLogPageRead callback. */ typedef struct XLogPageReadPrivate { @@ -404,18 +408,21 @@ static void recoveryPausesHere(bool endOfRecovery); static bool recoveryApplyDelay(XLogReaderState *record); static void ConfirmRecoveryPaused(void); -static XLogRecord *ReadRecord(XLogReaderState *xlogreader, - int emode, bool fetching_ckpt, TimeLineID replayTLI); +static XLogRecord *ReadRecord(XLogPrefetcher *xlogprefetcher, + int emode, bool fetching_ckpt, + TimeLineID replayTLI); static int XLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, int reqLen, XLogRecPtr targetRecPtr, char *readBuf); -static bool WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess, - bool fetching_ckpt, - XLogRecPtr tliRecPtr, - TimeLineID replayTLI, - XLogRecPtr replayLSN); +static XLogPageReadResult WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, + bool randAccess, + bool fetching_ckpt, + XLogRecPtr tliRecPtr, + TimeLineID replayTLI, + XLogRecPtr replayLSN, + bool nonblocking); static int emode_for_corrupt_record(int emode, XLogRecPtr RecPtr); -static XLogRecord *ReadCheckpointRecord(XLogReaderState *xlogreader, XLogRecPtr RecPtr, +static XLogRecord *ReadCheckpointRecord(XLogPrefetcher *xlogprefetcher, XLogRecPtr RecPtr, int whichChkpt, bool report, TimeLineID replayTLI); static bool rescanLatestTimeLine(TimeLineID replayTLI, XLogRecPtr replayLSN); static int XLogFileRead(XLogSegNo segno, int emode, TimeLineID tli, @@ -561,6 +568,15 @@ InitWalRecovery(ControlFileData *ControlFile, bool *wasShutdown_ptr, errdetail("Failed while allocating a WAL reading processor."))); xlogreader->system_identifier = ControlFile->system_identifier; + /* + * Set the WAL decode buffer size. This limits how far ahead we can read + * in the WAL. + */ + XLogReaderSetDecodeBuffer(xlogreader, NULL, wal_decode_buffer_size); + + /* Create a WAL prefetcher. */ + xlogprefetcher = XLogPrefetcherAllocate(xlogreader); + /* * Allocate two page buffers dedicated to WAL consistency checks. We do * it this way, rather than just making static arrays, for two reasons: @@ -589,7 +605,8 @@ InitWalRecovery(ControlFileData *ControlFile, bool *wasShutdown_ptr, * When a backup_label file is present, we want to roll forward from * the checkpoint it identifies, rather than using pg_control. */ - record = ReadCheckpointRecord(xlogreader, CheckPointLoc, 0, true, CheckPointTLI); + record = ReadCheckpointRecord(xlogprefetcher, CheckPointLoc, 0, true, + CheckPointTLI); if (record != NULL) { memcpy(&checkPoint, XLogRecGetData(xlogreader), sizeof(CheckPoint)); @@ -607,8 +624,8 @@ InitWalRecovery(ControlFileData *ControlFile, bool *wasShutdown_ptr, */ if (checkPoint.redo < CheckPointLoc) { - XLogBeginRead(xlogreader, checkPoint.redo); - if (!ReadRecord(xlogreader, LOG, false, + XLogPrefetcherBeginRead(xlogprefetcher, checkPoint.redo); + if (!ReadRecord(xlogprefetcher, LOG, false, checkPoint.ThisTimeLineID)) ereport(FATAL, (errmsg("could not find redo location referenced by checkpoint record"), @@ -727,7 +744,7 @@ InitWalRecovery(ControlFileData *ControlFile, bool *wasShutdown_ptr, CheckPointTLI = ControlFile->checkPointCopy.ThisTimeLineID; RedoStartLSN = ControlFile->checkPointCopy.redo; RedoStartTLI = ControlFile->checkPointCopy.ThisTimeLineID; - record = ReadCheckpointRecord(xlogreader, CheckPointLoc, 1, true, + record = ReadCheckpointRecord(xlogprefetcher, CheckPointLoc, 1, true, CheckPointTLI); if (record != NULL) { @@ -1403,8 +1420,8 @@ FinishWalRecovery(void) lastRec = XLogRecoveryCtl->lastReplayedReadRecPtr; lastRecTLI = XLogRecoveryCtl->lastReplayedTLI; } - XLogBeginRead(xlogreader, lastRec); - (void) ReadRecord(xlogreader, PANIC, false, lastRecTLI); + XLogPrefetcherBeginRead(xlogprefetcher, lastRec); + (void) ReadRecord(xlogprefetcher, PANIC, false, lastRecTLI); endOfLog = xlogreader->EndRecPtr; /* @@ -1501,6 +1518,8 @@ ShutdownWalRecovery(void) } XLogReaderFree(xlogreader); + XLogPrefetcherFree(xlogprefetcher); + if (ArchiveRecoveryRequested) { /* @@ -1584,15 +1603,15 @@ PerformWalRecovery(void) { /* back up to find the record */ replayTLI = RedoStartTLI; - XLogBeginRead(xlogreader, RedoStartLSN); - record = ReadRecord(xlogreader, PANIC, false, replayTLI); + XLogPrefetcherBeginRead(xlogprefetcher, RedoStartLSN); + record = ReadRecord(xlogprefetcher, PANIC, false, replayTLI); } else { /* just have to read next record after CheckPoint */ Assert(xlogreader->ReadRecPtr == CheckPointLoc); replayTLI = CheckPointTLI; - record = ReadRecord(xlogreader, LOG, false, replayTLI); + record = ReadRecord(xlogprefetcher, LOG, false, replayTLI); } if (record != NULL) @@ -1706,7 +1725,7 @@ PerformWalRecovery(void) } /* Else, try to fetch the next WAL record */ - record = ReadRecord(xlogreader, LOG, false, replayTLI); + record = ReadRecord(xlogprefetcher, LOG, false, replayTLI); } while (record != NULL); /* @@ -1922,6 +1941,9 @@ ApplyWalRecord(XLogReaderState *xlogreader, XLogRecord *record, TimeLineID *repl */ if (AllowCascadeReplication()) WalSndWakeup(); + + /* Reset the prefetcher. */ + XLogPrefetchReconfigure(); } } @@ -2302,7 +2324,8 @@ verifyBackupPageConsistency(XLogReaderState *record) * temporary page. */ buf = XLogReadBufferExtended(rnode, forknum, blkno, - RBM_NORMAL_NO_LOG); + RBM_NORMAL_NO_LOG, + InvalidBuffer); if (!BufferIsValid(buf)) continue; @@ -2914,17 +2937,18 @@ ConfirmRecoveryPaused(void) * Attempt to read the next XLOG record. * * Before first call, the reader needs to be positioned to the first record - * by calling XLogBeginRead(). + * by calling XLogPrefetcherBeginRead(). * * If no valid record is available, returns NULL, or fails if emode is PANIC. * (emode must be either PANIC, LOG). In standby mode, retries until a valid * record is available. */ static XLogRecord * -ReadRecord(XLogReaderState *xlogreader, int emode, +ReadRecord(XLogPrefetcher *xlogprefetcher, int emode, bool fetching_ckpt, TimeLineID replayTLI) { XLogRecord *record; + XLogReaderState *xlogreader = XLogPrefetcherReader(xlogprefetcher); XLogPageReadPrivate *private = (XLogPageReadPrivate *) xlogreader->private_data; /* Pass through parameters to XLogPageRead */ @@ -2940,7 +2964,7 @@ ReadRecord(XLogReaderState *xlogreader, int emode, { char *errormsg; - record = XLogReadRecord(xlogreader, &errormsg); + record = XLogPrefetcherReadRecord(xlogprefetcher, &errormsg); if (record == NULL) { /* @@ -3073,6 +3097,9 @@ ReadRecord(XLogReaderState *xlogreader, int emode, * and call XLogPageRead() again with the same arguments. This lets * XLogPageRead() to try fetching the record from another source, or to * sleep and retry. + * + * While prefetching, xlogreader->nonblocking may be set. In that case, + * return XLREAD_WOULDBLOCK if we'd otherwise have to wait. */ static int XLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, int reqLen, @@ -3122,20 +3149,31 @@ retry: (readSource == XLOG_FROM_STREAM && flushedUpto < targetPagePtr + reqLen)) { - if (!WaitForWALToBecomeAvailable(targetPagePtr + reqLen, - private->randAccess, - private->fetching_ckpt, - targetRecPtr, - private->replayTLI, - xlogreader->EndRecPtr)) + if (readFile >= 0 && + xlogreader->nonblocking && + readSource == XLOG_FROM_STREAM && + flushedUpto < targetPagePtr + reqLen) + return XLREAD_WOULDBLOCK; + + switch (WaitForWALToBecomeAvailable(targetPagePtr + reqLen, + private->randAccess, + private->fetching_ckpt, + targetRecPtr, + private->replayTLI, + xlogreader->EndRecPtr, + xlogreader->nonblocking)) { - if (readFile >= 0) - close(readFile); - readFile = -1; - readLen = 0; - readSource = XLOG_FROM_ANY; - - return -1; + case XLREAD_WOULDBLOCK: + return XLREAD_WOULDBLOCK; + case XLREAD_FAIL: + if (readFile >= 0) + close(readFile); + readFile = -1; + readLen = 0; + readSource = XLOG_FROM_ANY; + return XLREAD_FAIL; + case XLREAD_SUCCESS: + break; } } @@ -3260,7 +3298,7 @@ next_record_is_invalid: if (StandbyMode) goto retry; else - return -1; + return XLREAD_FAIL; } /* @@ -3292,11 +3330,15 @@ next_record_is_invalid: * containing it (if not open already), and returns true. When end of standby * mode is triggered by the user, and there is no more WAL available, returns * false. + * + * If nonblocking is true, then give up immediately if we can't satisfy the + * request, returning XLREAD_WOULDBLOCK instead of waiting. */ -static bool +static XLogPageReadResult WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess, bool fetching_ckpt, XLogRecPtr tliRecPtr, - TimeLineID replayTLI, XLogRecPtr replayLSN) + TimeLineID replayTLI, XLogRecPtr replayLSN, + bool nonblocking) { static TimestampTz last_fail_time = 0; TimestampTz now; @@ -3350,6 +3392,14 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess, */ if (lastSourceFailed) { + /* + * Don't allow any retry loops to occur during nonblocking + * readahead. Let the caller process everything that has been + * decoded already first. + */ + if (nonblocking) + return XLREAD_WOULDBLOCK; + switch (currentSource) { case XLOG_FROM_ARCHIVE: @@ -3364,7 +3414,7 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess, if (StandbyMode && CheckForStandbyTrigger()) { XLogShutdownWalRcv(); - return false; + return XLREAD_FAIL; } /* @@ -3372,7 +3422,7 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess, * and pg_wal. */ if (!StandbyMode) - return false; + return XLREAD_FAIL; /* * Move to XLOG_FROM_STREAM state, and set to start a @@ -3516,7 +3566,7 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess, currentSource == XLOG_FROM_ARCHIVE ? XLOG_FROM_ANY : currentSource); if (readFile >= 0) - return true; /* success! */ + return XLREAD_SUCCESS; /* success! */ /* * Nope, not found in archive or pg_wal. @@ -3671,11 +3721,15 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess, /* just make sure source info is correct... */ readSource = XLOG_FROM_STREAM; XLogReceiptSource = XLOG_FROM_STREAM; - return true; + return XLREAD_SUCCESS; } break; } + /* In nonblocking mode, return rather than sleeping. */ + if (nonblocking) + return XLREAD_WOULDBLOCK; + /* * Data not here yet. Check for trigger, then wait for * walreceiver to wake us up when new WAL arrives. @@ -3683,13 +3737,13 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess, if (CheckForStandbyTrigger()) { /* - * Note that we don't "return false" immediately here. - * After being triggered, we still want to replay all - * the WAL that was already streamed. It's in pg_wal - * now, so we just treat this as a failure, and the - * state machine will move on to replay the streamed - * WAL from pg_wal, and then recheck the trigger and - * exit replay. + * Note that we don't return XLREAD_FAIL immediately + * here. After being triggered, we still want to + * replay all the WAL that was already streamed. It's + * in pg_wal now, so we just treat this as a failure, + * and the state machine will move on to replay the + * streamed WAL from pg_wal, and then recheck the + * trigger and exit replay. */ lastSourceFailed = true; break; @@ -3740,7 +3794,7 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess, HandleStartupProcInterrupts(); } - return false; /* not reached */ + return XLREAD_FAIL; /* not reached */ } @@ -3785,7 +3839,7 @@ emode_for_corrupt_record(int emode, XLogRecPtr RecPtr) * 1 for "primary", 0 for "other" (backup_label) */ static XLogRecord * -ReadCheckpointRecord(XLogReaderState *xlogreader, XLogRecPtr RecPtr, +ReadCheckpointRecord(XLogPrefetcher *xlogprefetcher, XLogRecPtr RecPtr, int whichChkpt, bool report, TimeLineID replayTLI) { XLogRecord *record; @@ -3812,8 +3866,8 @@ ReadCheckpointRecord(XLogReaderState *xlogreader, XLogRecPtr RecPtr, return NULL; } - XLogBeginRead(xlogreader, RecPtr); - record = ReadRecord(xlogreader, LOG, true, replayTLI); + XLogPrefetcherBeginRead(xlogprefetcher, RecPtr); + record = ReadRecord(xlogprefetcher, LOG, true, replayTLI); if (record == NULL) { diff --git a/src/backend/access/transam/xlogutils.c b/src/backend/access/transam/xlogutils.c index 511f2f186f..ea22577b41 100644 --- a/src/backend/access/transam/xlogutils.c +++ b/src/backend/access/transam/xlogutils.c @@ -22,6 +22,7 @@ #include "access/timeline.h" #include "access/xlogrecovery.h" #include "access/xlog_internal.h" +#include "access/xlogprefetcher.h" #include "access/xlogutils.h" #include "miscadmin.h" #include "pgstat.h" @@ -355,11 +356,13 @@ XLogReadBufferForRedoExtended(XLogReaderState *record, RelFileNode rnode; ForkNumber forknum; BlockNumber blkno; + Buffer prefetch_buffer; Page page; bool zeromode; bool willinit; - if (!XLogRecGetBlockTag(record, block_id, &rnode, &forknum, &blkno)) + if (!XLogRecGetBlockInfo(record, block_id, &rnode, &forknum, &blkno, + &prefetch_buffer)) { /* Caller specified a bogus block_id */ elog(PANIC, "failed to locate backup block with ID %d", block_id); @@ -381,7 +384,8 @@ XLogReadBufferForRedoExtended(XLogReaderState *record, { Assert(XLogRecHasBlockImage(record, block_id)); *buf = XLogReadBufferExtended(rnode, forknum, blkno, - get_cleanup_lock ? RBM_ZERO_AND_CLEANUP_LOCK : RBM_ZERO_AND_LOCK); + get_cleanup_lock ? RBM_ZERO_AND_CLEANUP_LOCK : RBM_ZERO_AND_LOCK, + prefetch_buffer); page = BufferGetPage(*buf); if (!RestoreBlockImage(record, block_id, page)) elog(ERROR, "failed to restore block image"); @@ -410,7 +414,7 @@ XLogReadBufferForRedoExtended(XLogReaderState *record, } else { - *buf = XLogReadBufferExtended(rnode, forknum, blkno, mode); + *buf = XLogReadBufferExtended(rnode, forknum, blkno, mode, prefetch_buffer); if (BufferIsValid(*buf)) { if (mode != RBM_ZERO_AND_LOCK && mode != RBM_ZERO_AND_CLEANUP_LOCK) @@ -450,6 +454,10 @@ XLogReadBufferForRedoExtended(XLogReaderState *record, * exist, and we don't check for all-zeroes. Thus, no log entry is made * to imply that the page should be dropped or truncated later. * + * Optionally, recent_buffer can be used to provide a hint about the location + * of the page in the buffer pool; it does not have to be correct, but avoids + * a buffer mapping table probe if it is. + * * NB: A redo function should normally not call this directly. To get a page * to modify, use XLogReadBufferForRedoExtended instead. It is important that * all pages modified by a WAL record are registered in the WAL records, or @@ -457,7 +465,8 @@ XLogReadBufferForRedoExtended(XLogReaderState *record, */ Buffer XLogReadBufferExtended(RelFileNode rnode, ForkNumber forknum, - BlockNumber blkno, ReadBufferMode mode) + BlockNumber blkno, ReadBufferMode mode, + Buffer recent_buffer) { BlockNumber lastblock; Buffer buffer; @@ -465,6 +474,15 @@ XLogReadBufferExtended(RelFileNode rnode, ForkNumber forknum, Assert(blkno != P_NEW); + /* Do we have a clue where the buffer might be already? */ + if (BufferIsValid(recent_buffer) && + mode == RBM_NORMAL && + ReadRecentBuffer(rnode, forknum, blkno, recent_buffer)) + { + buffer = recent_buffer; + goto recent_buffer_fast_path; + } + /* Open the relation at smgr level */ smgr = smgropen(rnode, InvalidBackendId); @@ -523,6 +541,7 @@ XLogReadBufferExtended(RelFileNode rnode, ForkNumber forknum, } } +recent_buffer_fast_path: if (mode == RBM_NORMAL) { /* check that page has been initialized */ diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql index bb1ac30cd1..f7b4999caf 100644 --- a/src/backend/catalog/system_views.sql +++ b/src/backend/catalog/system_views.sql @@ -905,6 +905,19 @@ CREATE VIEW pg_stat_wal_receiver AS FROM pg_stat_get_wal_receiver() s WHERE s.pid IS NOT NULL; +CREATE VIEW pg_stat_prefetch_recovery AS + SELECT + s.stats_reset, + s.prefetch, + s.hit, + s.skip_init, + s.skip_new, + s.skip_fpw, + s.wal_distance, + s.block_distance, + s.io_depth + FROM pg_stat_get_prefetch_recovery() s; + CREATE VIEW pg_stat_subscription AS SELECT su.oid AS subid, diff --git a/src/backend/storage/freespace/freespace.c b/src/backend/storage/freespace/freespace.c index 78c073b7c9..d41ae37090 100644 --- a/src/backend/storage/freespace/freespace.c +++ b/src/backend/storage/freespace/freespace.c @@ -211,7 +211,8 @@ XLogRecordPageWithFreeSpace(RelFileNode rnode, BlockNumber heapBlk, blkno = fsm_logical_to_physical(addr); /* If the page doesn't exist already, extend */ - buf = XLogReadBufferExtended(rnode, FSM_FORKNUM, blkno, RBM_ZERO_ON_ERROR); + buf = XLogReadBufferExtended(rnode, FSM_FORKNUM, blkno, RBM_ZERO_ON_ERROR, + InvalidBuffer); LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE); page = BufferGetPage(buf); diff --git a/src/backend/storage/ipc/ipci.c b/src/backend/storage/ipc/ipci.c index cd4ebe2fc5..17f54b153b 100644 --- a/src/backend/storage/ipc/ipci.c +++ b/src/backend/storage/ipc/ipci.c @@ -22,6 +22,7 @@ #include "access/subtrans.h" #include "access/syncscan.h" #include "access/twophase.h" +#include "access/xlogprefetcher.h" #include "access/xlogrecovery.h" #include "commands/async.h" #include "miscadmin.h" @@ -119,6 +120,7 @@ CalculateShmemSize(int *num_semaphores) size = add_size(size, LockShmemSize()); size = add_size(size, PredicateLockShmemSize()); size = add_size(size, ProcGlobalShmemSize()); + size = add_size(size, XLogPrefetchShmemSize()); size = add_size(size, XLOGShmemSize()); size = add_size(size, XLogRecoveryShmemSize()); size = add_size(size, CLOGShmemSize()); @@ -243,6 +245,7 @@ CreateSharedMemoryAndSemaphores(void) * Set up xlog, clog, and buffers */ XLOGShmemInit(); + XLogPrefetchShmemInit(); XLogRecoveryShmemInit(); CLOGShmemInit(); CommitTsShmemInit(); diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c index e7f0a380e6..e34821e98e 100644 --- a/src/backend/utils/misc/guc.c +++ b/src/backend/utils/misc/guc.c @@ -41,6 +41,7 @@ #include "access/twophase.h" #include "access/xact.h" #include "access/xlog_internal.h" +#include "access/xlogprefetcher.h" #include "access/xlogrecovery.h" #include "catalog/namespace.h" #include "catalog/pg_authid.h" @@ -215,6 +216,7 @@ static bool check_effective_io_concurrency(int *newval, void **extra, GucSource static bool check_maintenance_io_concurrency(int *newval, void **extra, GucSource source); static bool check_huge_page_size(int *newval, void **extra, GucSource source); static bool check_client_connection_check_interval(int *newval, void **extra, GucSource source); +static void assign_maintenance_io_concurrency(int newval, void *extra); static void assign_pgstat_temp_directory(const char *newval, void *extra); static bool check_application_name(char **newval, void **extra, GucSource source); static void assign_application_name(const char *newval, void *extra); @@ -479,6 +481,19 @@ static const struct config_enum_entry huge_pages_options[] = { {NULL, 0, false} }; +static const struct config_enum_entry recovery_prefetch_options[] = { + {"off", RECOVERY_PREFETCH_OFF, false}, + {"on", RECOVERY_PREFETCH_ON, false}, + {"try", RECOVERY_PREFETCH_TRY, false}, + {"true", RECOVERY_PREFETCH_ON, true}, + {"false", RECOVERY_PREFETCH_OFF, true}, + {"yes", RECOVERY_PREFETCH_ON, true}, + {"no", RECOVERY_PREFETCH_OFF, true}, + {"1", RECOVERY_PREFETCH_ON, true}, + {"0", RECOVERY_PREFETCH_OFF, true}, + {NULL, 0, false} +}; + static const struct config_enum_entry force_parallel_mode_options[] = { {"off", FORCE_PARALLEL_OFF, false}, {"on", FORCE_PARALLEL_ON, false}, @@ -2792,6 +2807,17 @@ static struct config_int ConfigureNamesInt[] = NULL, NULL, NULL }, + { + {"wal_decode_buffer_size", PGC_POSTMASTER, WAL_ARCHIVE_RECOVERY, + gettext_noop("Maximum buffer size for reading ahead in the WAL during recovery."), + gettext_noop("This controls the maximum distance we can read ahead in the WAL to prefetch referenced blocks."), + GUC_UNIT_BYTE + }, + &wal_decode_buffer_size, + 512 * 1024, 64 * 1024, INT_MAX, + NULL, NULL, NULL + }, + { {"wal_keep_size", PGC_SIGHUP, REPLICATION_SENDING, gettext_noop("Sets the size of WAL files held for standby servers."), @@ -3115,7 +3141,8 @@ static struct config_int ConfigureNamesInt[] = 0, #endif 0, MAX_IO_CONCURRENCY, - check_maintenance_io_concurrency, NULL, NULL + check_maintenance_io_concurrency, assign_maintenance_io_concurrency, + NULL }, { @@ -4975,6 +5002,16 @@ static struct config_enum ConfigureNamesEnum[] = NULL, NULL, NULL }, + { + {"recovery_prefetch", PGC_SIGHUP, WAL_SETTINGS, + gettext_noop("Prefetch referenced blocks during recovery"), + gettext_noop("Read ahead of the current replay position to find uncached blocks.") + }, + &recovery_prefetch, + RECOVERY_PREFETCH_OFF, recovery_prefetch_options, + check_recovery_prefetch, assign_recovery_prefetch, NULL + }, + { {"force_parallel_mode", PGC_USERSET, DEVELOPER_OPTIONS, gettext_noop("Forces use of parallel query facilities."), @@ -12211,6 +12248,20 @@ check_client_connection_check_interval(int *newval, void **extra, GucSource sour return true; } +static void +assign_maintenance_io_concurrency(int newval, void *extra) +{ +#ifdef USE_PREFETCH + /* + * Reconfigure recovery prefetching, because a setting it depends on + * changed. + */ + maintenance_io_concurrency = newval; + if (AmStartupProcess()) + XLogPrefetchReconfigure(); +#endif +} + static void assign_pgstat_temp_directory(const char *newval, void *extra) { diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample index 4cf5b26a36..0a6c7bd83e 100644 --- a/src/backend/utils/misc/postgresql.conf.sample +++ b/src/backend/utils/misc/postgresql.conf.sample @@ -241,6 +241,11 @@ #max_wal_size = 1GB #min_wal_size = 80MB +# - Prefetching during recovery - + +#wal_decode_buffer_size = 512kB # lookahead window used for prefetching +#recovery_prefetch = off # prefetch pages referenced in the WAL? + # - Archiving - #archive_mode = off # enables archiving; off, on, or always diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h index 09f6464331..1df9dd2fbe 100644 --- a/src/include/access/xlog.h +++ b/src/include/access/xlog.h @@ -50,6 +50,7 @@ extern bool *wal_consistency_checking; extern char *wal_consistency_checking_string; extern bool log_checkpoints; extern bool track_wal_io_timing; +extern int wal_decode_buffer_size; extern int CheckPointSegments; diff --git a/src/include/access/xlogprefetcher.h b/src/include/access/xlogprefetcher.h new file mode 100644 index 0000000000..03f0cefecd --- /dev/null +++ b/src/include/access/xlogprefetcher.h @@ -0,0 +1,51 @@ +/*------------------------------------------------------------------------- + * + * xlogprefetcher.h + * Declarations for the recovery prefetching module. + * + * Portions Copyright (c) 2022, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * IDENTIFICATION + * src/include/access/xlogprefetcher.h + *------------------------------------------------------------------------- + */ +#ifndef XLOGPREFETCHER_H +#define XLOGPREFETCHER_H + +#include "access/xlogdefs.h" + +/* GUCs */ +extern int recovery_prefetch; + +/* Possible values for recovery_prefetch */ +typedef enum +{ + RECOVERY_PREFETCH_OFF, + RECOVERY_PREFETCH_ON, + RECOVERY_PREFETCH_TRY +} RecoveryPrefetchValue; + +struct XLogPrefetcher; +typedef struct XLogPrefetcher XLogPrefetcher; + + +extern void XLogPrefetchReconfigure(void); + +extern size_t XLogPrefetchShmemSize(void); +extern void XLogPrefetchShmemInit(void); + +extern void XLogPrefetchRequestResetStats(void); + +extern XLogPrefetcher *XLogPrefetcherAllocate(XLogReaderState *reader); +extern void XLogPrefetcherFree(XLogPrefetcher *prefetcher); + +extern XLogReaderState *XLogPrefetcherReader(XLogPrefetcher *prefetcher); + +extern void XLogPrefetcherBeginRead(XLogPrefetcher *prefetcher, + XLogRecPtr recPtr); + +extern XLogRecord *XLogPrefetcherReadRecord(XLogPrefetcher *prefetcher, + char **errmsg); + +#endif diff --git a/src/include/access/xlogreader.h b/src/include/access/xlogreader.h index f4388cc9be..be266296d5 100644 --- a/src/include/access/xlogreader.h +++ b/src/include/access/xlogreader.h @@ -39,6 +39,7 @@ #endif #include "access/xlogrecord.h" +#include "storage/buf.h" /* WALOpenSegment represents a WAL segment being read. */ typedef struct WALOpenSegment @@ -125,6 +126,9 @@ typedef struct ForkNumber forknum; BlockNumber blkno; + /* Prefetching workspace. */ + Buffer prefetch_buffer; + /* copy of the fork_flags field from the XLogRecordBlockHeader */ uint8 flags; @@ -430,5 +434,9 @@ extern char *XLogRecGetBlockData(XLogReaderState *record, uint8 block_id, Size * extern bool XLogRecGetBlockTag(XLogReaderState *record, uint8 block_id, RelFileNode *rnode, ForkNumber *forknum, BlockNumber *blknum); +extern bool XLogRecGetBlockInfo(XLogReaderState *record, uint8 block_id, + RelFileNode *rnode, ForkNumber *forknum, + BlockNumber *blknum, + Buffer *prefetch_buffer); #endif /* XLOGREADER_H */ diff --git a/src/include/access/xlogutils.h b/src/include/access/xlogutils.h index 64708949db..ff40f96e42 100644 --- a/src/include/access/xlogutils.h +++ b/src/include/access/xlogutils.h @@ -84,7 +84,8 @@ extern XLogRedoAction XLogReadBufferForRedoExtended(XLogReaderState *record, Buffer *buf); extern Buffer XLogReadBufferExtended(RelFileNode rnode, ForkNumber forknum, - BlockNumber blkno, ReadBufferMode mode); + BlockNumber blkno, ReadBufferMode mode, + Buffer recent_buffer); extern Relation CreateFakeRelcacheEntry(RelFileNode rnode); extern void FreeFakeRelcacheEntry(Relation fakerel); diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat index d8e8715ed1..534ad0a5fb 100644 --- a/src/include/catalog/pg_proc.dat +++ b/src/include/catalog/pg_proc.dat @@ -6360,6 +6360,14 @@ prorettype => 'text', proargtypes => '', prosrc => 'pg_get_wal_replay_pause_state' }, +{ oid => '9085', descr => 'statistics: information about WAL prefetching', + proname => 'pg_stat_get_prefetch_recovery', prorows => '1', provolatile => 'v', + proretset => 't', prorettype => 'record', proargtypes => '', + proallargtypes => '{timestamptz,int8,int8,int8,int8,int8,int4,int4,int4}', + proargmodes => '{o,o,o,o,o,o,o,o,o}', + proargnames => '{stats_reset,prefetch,hit,skip_init,skip_new,skip_fpw,wal_distance,block_distance,io_depth}', + prosrc => 'pg_stat_get_prefetch_recovery' }, + { oid => '2621', descr => 'reload configuration files', proname => 'pg_reload_conf', provolatile => 'v', prorettype => 'bool', proargtypes => '', prosrc => 'pg_reload_conf' }, diff --git a/src/include/utils/guc.h b/src/include/utils/guc.h index ea774968f0..c9b258508d 100644 --- a/src/include/utils/guc.h +++ b/src/include/utils/guc.h @@ -450,4 +450,8 @@ extern void assign_search_path(const char *newval, void *extra); extern bool check_wal_buffers(int *newval, void **extra, GucSource source); extern void assign_xlog_sync_method(int new_sync_method, void *extra); +/* in access/transam/xlogprefetcher.c */ +extern bool check_recovery_prefetch(int *new_value, void **extra, GucSource source); +extern void assign_recovery_prefetch(int new_value, void *extra); + #endif /* GUC_H */ diff --git a/src/test/recovery/t/027_stream_regress.pl b/src/test/recovery/t/027_stream_regress.pl index c40951b7ba..93ef4ef436 100644 --- a/src/test/recovery/t/027_stream_regress.pl +++ b/src/test/recovery/t/027_stream_regress.pl @@ -19,6 +19,9 @@ $node_primary->init(allows_streaming => 1); $node_primary->adjust_conf('postgresql.conf', 'max_connections', '25'); $node_primary->append_conf('postgresql.conf', 'max_prepared_transactions = 10'); +# Enable recovery prefetch, if available on this platform +$node_primary->append_conf('postgresql.conf', 'recovery_prefetch = try'); + # WAL consistency checking is resource intensive so require opt-in with the # PG_TEST_EXTRA environment variable. if ($ENV{PG_TEST_EXTRA} && diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out index ac468568a1..8ad54191cd 100644 --- a/src/test/regress/expected/rules.out +++ b/src/test/regress/expected/rules.out @@ -1857,6 +1857,16 @@ pg_stat_gssapi| SELECT s.pid, s.gss_enc AS encrypted FROM pg_stat_get_activity(NULL::integer) s(datid, pid, usesysid, application_name, state, query, wait_event_type, wait_event, xact_start, query_start, backend_start, state_change, client_addr, client_hostname, client_port, backend_xid, backend_xmin, backend_type, ssl, sslversion, sslcipher, sslbits, ssl_client_dn, ssl_client_serial, ssl_issuer_dn, gss_auth, gss_princ, gss_enc, leader_pid, query_id) WHERE (s.client_port IS NOT NULL); +pg_stat_prefetch_recovery| SELECT s.stats_reset, + s.prefetch, + s.hit, + s.skip_init, + s.skip_new, + s.skip_fpw, + s.wal_distance, + s.block_distance, + s.io_depth + FROM pg_stat_get_prefetch_recovery() s(stats_reset, prefetch, hit, skip_init, skip_new, skip_fpw, wal_distance, block_distance, io_depth); pg_stat_progress_analyze| SELECT s.pid, s.datid, d.datname, diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index 93d5190508..7790573105 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -1408,6 +1408,9 @@ LogicalRepWorker LogicalRewriteMappingData LogicalTape LogicalTapeSet +LsnReadQueue +LsnReadQueueNextFun +LsnReadQueueNextStatus LtreeGistOptions LtreeSignature MAGIC @@ -2946,6 +2949,10 @@ XLogPageHeaderData XLogPageReadCB XLogPageReadPrivate XLogPageReadResult +XLogPrefetcher +XLogPrefetcherFilter +XLogPrefetchState +XLogPrefetchStats XLogReaderRoutine XLogReaderState XLogRecData -- 2.30.2