On Fri, Mar 1, 2024 at 2:06 AM Jeff Davis <[email protected]> wrote:
> Added to March CF.
>
> I don't have an immediate use case in mind for this, so please drive
> that part of the discussion. I can't promise this for 17, but if the
> patch is simple enough and a quick consensus develops, then it's
> possible.
[pgss_001.v1.patch] adds a custom resource manager to the
pg_stat_statements extension. The proposed patch is not a complete solution
for pgss and may not work correctly with replication.
The 020_crash.pl test demonstrates server interruption by killing a
backend. Without rm_checkpoint hook, the server restores pgss stats only
after last CHECKPOINT. Data added to WAL before the checkpoint is not
restored.
The rm_checkpoint hook allows saving shared memory data to disk at each
checkpoint. However, for pg_stat_statements, it matters when the checkpoint
occurred. When the server shuts down, pgss deletes the temporary file of
query texts. In other cases, this is unacceptable.
To provide this capability, a flags parameter was added to the
rm_checkpoint hook. The changes are presented in [rmgr_003.v2.patch].
--
Regards,
Daniil Anisimov
Postgres Professional: http://postgrespro.com
diff --git a/src/backend/access/transam/rmgr.c b/src/backend/access/transam/rmgr.c
index 3e2f1d4a23..ad0a1d5134 100644
--- a/src/backend/access/transam/rmgr.c
+++ b/src/backend/access/transam/rmgr.c
@@ -44,8 +44,8 @@
/* must be kept in sync with RmgrData definition in xlog_internal.h */
-#define PG_RMGR(symname,name,redo,desc,identify,startup,cleanup,mask,decode) \
- { name, redo, desc, identify, startup, cleanup, mask, decode },
+#define PG_RMGR(symname,name,redo,desc,identify,startup,cleanup,mask,decode,checkpoint) \
+ { name, redo, desc, identify, startup, cleanup, mask, decode, checkpoint },
RmgrData RmgrTable[RM_MAX_ID + 1] = {
#include "access/rmgrlist.h"
@@ -83,6 +83,22 @@ RmgrCleanup(void)
}
}
+/*
+ * Checkpoint all resource managers.
+ */
+void
+RmgrCheckpoint(int flags)
+{
+ for (int rmid = 0; rmid <= RM_MAX_ID; rmid++)
+ {
+ if (!RmgrIdExists(rmid))
+ continue;
+
+ if (RmgrTable[rmid].rm_checkpoint != NULL)
+ RmgrTable[rmid].rm_checkpoint(flags);
+ }
+}
+
/*
* Emit ERROR when we encounter a record with an RmgrId we don't
* recognize.
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 20a5f86209..d21bf8ae24 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -7357,6 +7357,9 @@ CheckPointGuts(XLogRecPtr checkPointRedo, int flags)
CheckPointSUBTRANS();
CheckPointMultiXact();
CheckPointPredicate();
+
+ RmgrCheckpoint(flags);
+
CheckPointBuffers(flags);
/* Perform all queued up fsyncs */
diff --git a/src/bin/pg_rewind/parsexlog.c b/src/bin/pg_rewind/parsexlog.c
index 22f7351fdc..11ae1e7af4 100644
--- a/src/bin/pg_rewind/parsexlog.c
+++ b/src/bin/pg_rewind/parsexlog.c
@@ -28,7 +28,7 @@
* RmgrNames is an array of the built-in resource manager names, to make error
* messages a bit nicer.
*/
-#define PG_RMGR(symname,name,redo,desc,identify,startup,cleanup,mask,decode) \
+#define PG_RMGR(symname,name,redo,desc,identify,startup,cleanup,mask,decode,checkpoint) \
name,
static const char *const RmgrNames[RM_MAX_ID + 1] = {
diff --git a/src/bin/pg_waldump/rmgrdesc.c b/src/bin/pg_waldump/rmgrdesc.c
index 6b8c17bb4c..2bb5ba8c9f 100644
--- a/src/bin/pg_waldump/rmgrdesc.c
+++ b/src/bin/pg_waldump/rmgrdesc.c
@@ -32,7 +32,7 @@
#include "storage/standbydefs.h"
#include "utils/relmapper.h"
-#define PG_RMGR(symname,name,redo,desc,identify,startup,cleanup,mask,decode) \
+#define PG_RMGR(symname,name,redo,desc,identify,startup,cleanup,mask,decode,checkpoint) \
{ name, desc, identify},
static const RmgrDescData RmgrDescTable[RM_N_BUILTIN_IDS] = {
diff --git a/src/include/access/rmgr.h b/src/include/access/rmgr.h
index 3b6a497e1b..34ddc0210c 100644
--- a/src/include/access/rmgr.h
+++ b/src/include/access/rmgr.h
@@ -19,7 +19,7 @@ typedef uint8 RmgrId;
* Note: RM_MAX_ID must fit in RmgrId; widening that type will affect the XLOG
* file format.
*/
-#define PG_RMGR(symname,name,redo,desc,identify,startup,cleanup,mask,decode) \
+#define PG_RMGR(symname,name,redo,desc,identify,startup,cleanup,mask,decode,checkpoint) \
symname,
typedef enum RmgrIds
diff --git a/src/include/access/rmgrlist.h b/src/include/access/rmgrlist.h
index 78e6b908c6..0b03cc69be 100644
--- a/src/include/access/rmgrlist.h
+++ b/src/include/access/rmgrlist.h
@@ -24,26 +24,26 @@
* Changes to this list possibly need an XLOG_PAGE_MAGIC bump.
*/
-/* symbol name, textual name, redo, desc, identify, startup, cleanup, mask, decode */
-PG_RMGR(RM_XLOG_ID, "XLOG", xlog_redo, xlog_desc, xlog_identify, NULL, NULL, NULL, xlog_decode)
-PG_RMGR(RM_XACT_ID, "Transaction", xact_redo, xact_desc, xact_identify, NULL, NULL, NULL, xact_decode)
-PG_RMGR(RM_SMGR_ID, "Storage", smgr_redo, smgr_desc, smgr_identify, NULL, NULL, NULL, NULL)
-PG_RMGR(RM_CLOG_ID, "CLOG", clog_redo, clog_desc, clog_identify, NULL, NULL, NULL, NULL)
-PG_RMGR(RM_DBASE_ID, "Database", dbase_redo, dbase_desc, dbase_identify, NULL, NULL, NULL, NULL)
-PG_RMGR(RM_TBLSPC_ID, "Tablespace", tblspc_redo, tblspc_desc, tblspc_identify, NULL, NULL, NULL, NULL)
-PG_RMGR(RM_MULTIXACT_ID, "MultiXact", multixact_redo, multixact_desc, multixact_identify, NULL, NULL, NULL, NULL)
-PG_RMGR(RM_RELMAP_ID, "RelMap", relmap_redo, relmap_desc, relmap_identify, NULL, NULL, NULL, NULL)
-PG_RMGR(RM_STANDBY_ID, "Standby", standby_redo, standby_desc, standby_identify, NULL, NULL, NULL, standby_decode)
-PG_RMGR(RM_HEAP2_ID, "Heap2", heap2_redo, heap2_desc, heap2_identify, NULL, NULL, heap_mask, heap2_decode)
-PG_RMGR(RM_HEAP_ID, "Heap", heap_redo, heap_desc, heap_identify, NULL, NULL, heap_mask, heap_decode)
-PG_RMGR(RM_BTREE_ID, "Btree", btree_redo, btree_desc, btree_identify, btree_xlog_startup, btree_xlog_cleanup, btree_mask, NULL)
-PG_RMGR(RM_HASH_ID, "Hash", hash_redo, hash_desc, hash_identify, NULL, NULL, hash_mask, NULL)
-PG_RMGR(RM_GIN_ID, "Gin", gin_redo, gin_desc, gin_identify, gin_xlog_startup, gin_xlog_cleanup, gin_mask, NULL)
-PG_RMGR(RM_GIST_ID, "Gist", gist_redo, gist_desc, gist_identify, gist_xlog_startup, gist_xlog_cleanup, gist_mask, NULL)
-PG_RMGR(RM_SEQ_ID, "Sequence", seq_redo, seq_desc, seq_identify, NULL, NULL, seq_mask, NULL)
-PG_RMGR(RM_SPGIST_ID, "SPGist", spg_redo, spg_desc, spg_identify, spg_xlog_startup, spg_xlog_cleanup, spg_mask, NULL)
-PG_RMGR(RM_BRIN_ID, "BRIN", brin_redo, brin_desc, brin_identify, NULL, NULL, brin_mask, NULL)
-PG_RMGR(RM_COMMIT_TS_ID, "CommitTs", commit_ts_redo, commit_ts_desc, commit_ts_identify, NULL, NULL, NULL, NULL)
-PG_RMGR(RM_REPLORIGIN_ID, "ReplicationOrigin", replorigin_redo, replorigin_desc, replorigin_identify, NULL, NULL, NULL, NULL)
-PG_RMGR(RM_GENERIC_ID, "Generic", generic_redo, generic_desc, generic_identify, NULL, NULL, generic_mask, NULL)
-PG_RMGR(RM_LOGICALMSG_ID, "LogicalMessage", logicalmsg_redo, logicalmsg_desc, logicalmsg_identify, NULL, NULL, NULL, logicalmsg_decode)
+/* symbol name, textual name, redo, desc, identify, startup, cleanup, mask, decode, checkpoint */
+PG_RMGR(RM_XLOG_ID, "XLOG", xlog_redo, xlog_desc, xlog_identify, NULL, NULL, NULL, xlog_decode, NULL)
+PG_RMGR(RM_XACT_ID, "Transaction", xact_redo, xact_desc, xact_identify, NULL, NULL, NULL, xact_decode, NULL)
+PG_RMGR(RM_SMGR_ID, "Storage", smgr_redo, smgr_desc, smgr_identify, NULL, NULL, NULL, NULL, NULL)
+PG_RMGR(RM_CLOG_ID, "CLOG", clog_redo, clog_desc, clog_identify, NULL, NULL, NULL, NULL, NULL)
+PG_RMGR(RM_DBASE_ID, "Database", dbase_redo, dbase_desc, dbase_identify, NULL, NULL, NULL, NULL, NULL)
+PG_RMGR(RM_TBLSPC_ID, "Tablespace", tblspc_redo, tblspc_desc, tblspc_identify, NULL, NULL, NULL, NULL, NULL)
+PG_RMGR(RM_MULTIXACT_ID, "MultiXact", multixact_redo, multixact_desc, multixact_identify, NULL, NULL, NULL, NULL, NULL)
+PG_RMGR(RM_RELMAP_ID, "RelMap", relmap_redo, relmap_desc, relmap_identify, NULL, NULL, NULL, NULL, NULL)
+PG_RMGR(RM_STANDBY_ID, "Standby", standby_redo, standby_desc, standby_identify, NULL, NULL, NULL, standby_decode, NULL)
+PG_RMGR(RM_HEAP2_ID, "Heap2", heap2_redo, heap2_desc, heap2_identify, NULL, NULL, heap_mask, heap2_decode, NULL)
+PG_RMGR(RM_HEAP_ID, "Heap", heap_redo, heap_desc, heap_identify, NULL, NULL, heap_mask, heap_decode, NULL)
+PG_RMGR(RM_BTREE_ID, "Btree", btree_redo, btree_desc, btree_identify, btree_xlog_startup, btree_xlog_cleanup, btree_mask, NULL, NULL)
+PG_RMGR(RM_HASH_ID, "Hash", hash_redo, hash_desc, hash_identify, NULL, NULL, hash_mask, NULL, NULL)
+PG_RMGR(RM_GIN_ID, "Gin", gin_redo, gin_desc, gin_identify, gin_xlog_startup, gin_xlog_cleanup, gin_mask, NULL, NULL)
+PG_RMGR(RM_GIST_ID, "Gist", gist_redo, gist_desc, gist_identify, gist_xlog_startup, gist_xlog_cleanup, gist_mask, NULL, NULL)
+PG_RMGR(RM_SEQ_ID, "Sequence", seq_redo, seq_desc, seq_identify, NULL, NULL, seq_mask, NULL, NULL)
+PG_RMGR(RM_SPGIST_ID, "SPGist", spg_redo, spg_desc, spg_identify, spg_xlog_startup, spg_xlog_cleanup, spg_mask, NULL, NULL)
+PG_RMGR(RM_BRIN_ID, "BRIN", brin_redo, brin_desc, brin_identify, NULL, NULL, brin_mask, NULL, NULL)
+PG_RMGR(RM_COMMIT_TS_ID, "CommitTs", commit_ts_redo, commit_ts_desc, commit_ts_identify, NULL, NULL, NULL, NULL, NULL)
+PG_RMGR(RM_REPLORIGIN_ID, "ReplicationOrigin", replorigin_redo, replorigin_desc, replorigin_identify, NULL, NULL, NULL, NULL, NULL)
+PG_RMGR(RM_GENERIC_ID, "Generic", generic_redo, generic_desc, generic_identify, NULL, NULL, generic_mask, NULL, NULL)
+PG_RMGR(RM_LOGICALMSG_ID, "LogicalMessage", logicalmsg_redo, logicalmsg_desc, logicalmsg_identify, NULL, NULL, NULL, logicalmsg_decode, NULL)
diff --git a/src/include/access/xlog_internal.h b/src/include/access/xlog_internal.h
index b88b24f0c1..220f45b0ae 100644
--- a/src/include/access/xlog_internal.h
+++ b/src/include/access/xlog_internal.h
@@ -356,11 +356,13 @@ typedef struct RmgrData
void (*rm_mask) (char *pagedata, BlockNumber blkno);
void (*rm_decode) (struct LogicalDecodingContext *ctx,
struct XLogRecordBuffer *buf);
+ void (*rm_checkpoint) (int flags);
} RmgrData;
extern PGDLLIMPORT RmgrData RmgrTable[];
extern void RmgrStartup(void);
extern void RmgrCleanup(void);
+extern void RmgrCheckpoint(int flags);
extern void RmgrNotFound(RmgrId rmid);
extern void RegisterCustomRmgr(RmgrId rmid, const RmgrData *rmgr);
diff --git a/contrib/pg_stat_statements/expected/wal.out b/contrib/pg_stat_statements/expected/wal.out
index 34a2bf5b03..4b2220a96b 100644
--- a/contrib/pg_stat_statements/expected/wal.out
+++ b/contrib/pg_stat_statements/expected/wal.out
@@ -17,7 +17,7 @@ FROM pg_stat_statements ORDER BY query COLLATE "C";
--------------------------------------------------------------+-------+------+---------------------+-----------------------+---------------------
DELETE FROM pgss_wal_tab WHERE a > $1 | 1 | 1 | t | t | t
INSERT INTO pgss_wal_tab VALUES(generate_series($1, $2), $3) | 1 | 10 | t | t | t
- SELECT pg_stat_statements_reset() IS NOT NULL AS t | 1 | 1 | f | f | f
+ SELECT pg_stat_statements_reset() IS NOT NULL AS t | 1 | 1 | t | t | t
SET pg_stat_statements.track_utility = FALSE | 1 | 0 | f | f | t
UPDATE pgss_wal_tab SET b = $1 WHERE a > $2 | 1 | 3 | t | t | t
(5 rows)
diff --git a/contrib/pg_stat_statements/pg_stat_statements.c b/contrib/pg_stat_statements/pg_stat_statements.c
index 67cec865ba..d0220fd9eb 100644
--- a/contrib/pg_stat_statements/pg_stat_statements.c
+++ b/contrib/pg_stat_statements/pg_stat_statements.c
@@ -74,6 +74,10 @@
#include "utils/memutils.h"
#include "utils/timestamp.h"
+#include "access/xlog.h"
+#include "access/xlog_internal.h"
+#include "access/xloginsert.h"
+
PG_MODULE_MAGIC;
/* Location of permanent stats file (valid when database is shut down) */
@@ -323,7 +327,6 @@ PG_FUNCTION_INFO_V1(pg_stat_statements_info);
static void pgss_shmem_request(void);
static void pgss_shmem_startup(void);
-static void pgss_shmem_shutdown(int code, Datum arg);
static void pgss_post_parse_analyze(ParseState *pstate, Query *query,
JumbleState *jstate);
static PlannedStmt *pgss_planner(Query *parse,
@@ -370,6 +373,50 @@ static void fill_in_constant_lengths(JumbleState *jstate, const char *query,
int query_loc);
static int comp_location(const void *a, const void *b);
+/* RMGR API */
+#define CUSTOMRMGR_ID RM_EXPERIMENTAL_ID
+#define CUSTOMRMGR_NAME "pgss_rmgr"
+
+static void rmgr_redo(XLogReaderState *record);
+static void rmgr_desc(StringInfo buf, XLogReaderState *record);
+static const char *rmgr_identify(uint8 info);
+static void rmgr_checkpoint(int flags);
+
+/* WAL record definitions */
+#define PGSS_XLOG_INSERT 0x00
+#define PGSS_XLOG_RESET 0x10
+
+/* The necessary fields from pgssEntry */
+typedef struct pgssXLogInsert
+{
+ uint32 header;
+ pgssHashKey key;
+ Counters counters;
+ int encoding;
+ TimestampTz stats_since;
+ TimestampTz minmax_stats_since;
+ int query_len;
+ char qtext[FLEXIBLE_ARRAY_MEMBER];
+} pgssXLogInsert;
+
+/* The params of entry_reset() function */
+typedef struct pgssXLogReset
+{
+ uint32 header;
+ Oid userid;
+ uint64 queryid;
+ Oid dbid;
+ bool minmax_only;
+} pgssXLogReset;
+
+/* RMGR data */
+const RmgrData pgss_rmgr = {
+ .rm_name = CUSTOMRMGR_NAME,
+ .rm_redo = rmgr_redo,
+ .rm_checkpoint = rmgr_checkpoint,
+ .rm_identify = rmgr_identify,
+ .rm_desc = rmgr_desc
+};
/*
* Module load callback
@@ -457,6 +504,8 @@ _PG_init(void)
MarkGUCPrefixReserved("pg_stat_statements");
+ RegisterCustomRmgr(CUSTOMRMGR_ID, &pgss_rmgr);
+
/*
* Install hooks.
*/
@@ -556,9 +605,12 @@ pgss_shmem_startup(void)
/*
* If we're in the postmaster (or a standalone backend...), set up a shmem
* exit hook to dump the statistics to disk.
+ *
+ * Now we do it at CHECKPOINT.
+ *
+ *if (!IsUnderPostmaster)
+ * on_shmem_exit(pgss_shmem_shutdown, (Datum) 0);
*/
- if (!IsUnderPostmaster)
- on_shmem_exit(pgss_shmem_shutdown, (Datum) 0);
/*
* Done if some other process already completed our initialization.
@@ -720,108 +772,6 @@ fail:
*/
}
-/*
- * shmem_shutdown hook: Dump statistics into file.
- *
- * Note: we don't bother with acquiring lock, because there should be no
- * other processes running when this is called.
- */
-static void
-pgss_shmem_shutdown(int code, Datum arg)
-{
- FILE *file;
- char *qbuffer = NULL;
- Size qbuffer_size = 0;
- HASH_SEQ_STATUS hash_seq;
- int32 num_entries;
- pgssEntry *entry;
-
- /* Don't try to dump during a crash. */
- if (code)
- return;
-
- /* Safety check ... shouldn't get here unless shmem is set up. */
- if (!pgss || !pgss_hash)
- return;
-
- /* Don't dump if told not to. */
- if (!pgss_save)
- return;
-
- file = AllocateFile(PGSS_DUMP_FILE ".tmp", PG_BINARY_W);
- if (file == NULL)
- goto error;
-
- if (fwrite(&PGSS_FILE_HEADER, sizeof(uint32), 1, file) != 1)
- goto error;
- if (fwrite(&PGSS_PG_MAJOR_VERSION, sizeof(uint32), 1, file) != 1)
- goto error;
- num_entries = hash_get_num_entries(pgss_hash);
- if (fwrite(&num_entries, sizeof(int32), 1, file) != 1)
- goto error;
-
- qbuffer = qtext_load_file(&qbuffer_size);
- if (qbuffer == NULL)
- goto error;
-
- /*
- * When serializing to disk, we store query texts immediately after their
- * entry data. Any orphaned query texts are thereby excluded.
- */
- hash_seq_init(&hash_seq, pgss_hash);
- while ((entry = hash_seq_search(&hash_seq)) != NULL)
- {
- int len = entry->query_len;
- char *qstr = qtext_fetch(entry->query_offset, len,
- qbuffer, qbuffer_size);
-
- if (qstr == NULL)
- continue; /* Ignore any entries with bogus texts */
-
- if (fwrite(entry, sizeof(pgssEntry), 1, file) != 1 ||
- fwrite(qstr, 1, len + 1, file) != len + 1)
- {
- /* note: we assume hash_seq_term won't change errno */
- hash_seq_term(&hash_seq);
- goto error;
- }
- }
-
- /* Dump global statistics for pg_stat_statements */
- if (fwrite(&pgss->stats, sizeof(pgssGlobalStats), 1, file) != 1)
- goto error;
-
- free(qbuffer);
- qbuffer = NULL;
-
- if (FreeFile(file))
- {
- file = NULL;
- goto error;
- }
-
- /*
- * Rename file into place, so we atomically replace any old one.
- */
- (void) durable_rename(PGSS_DUMP_FILE ".tmp", PGSS_DUMP_FILE, LOG);
-
- /* Unlink query-texts file; it's not needed while shutdown */
- unlink(PGSS_TEXT_FILE);
-
- return;
-
-error:
- ereport(LOG,
- (errcode_for_file_access(),
- errmsg("could not write file \"%s\": %m",
- PGSS_DUMP_FILE ".tmp")));
- free(qbuffer);
- if (file)
- FreeFile(file);
- unlink(PGSS_DUMP_FILE ".tmp");
- unlink(PGSS_TEXT_FILE);
-}
-
/*
* Post-parse-analysis hook: mark query with a queryId
*/
@@ -1284,6 +1234,7 @@ pgss_store(const char *query, uint64 queryId,
pgssEntry *entry;
char *norm_query = NULL;
int encoding = GetDatabaseEncoding();
+ bool qtext_stored = false;
Assert(query != NULL);
@@ -1325,7 +1276,6 @@ pgss_store(const char *query, uint64 queryId,
{
Size query_offset;
int gc_count;
- bool stored;
bool do_gc;
/*
@@ -1345,7 +1295,7 @@ pgss_store(const char *query, uint64 queryId,
}
/* Append new query text to file with only shared lock held */
- stored = qtext_store(norm_query ? norm_query : query, query_len,
+ qtext_stored = qtext_store(norm_query ? norm_query : query, query_len,
&query_offset, &gc_count);
/*
@@ -1366,12 +1316,12 @@ pgss_store(const char *query, uint64 queryId,
* This should be infrequent enough that doing it while holding
* exclusive lock isn't a performance problem.
*/
- if (!stored || pgss->gc_count != gc_count)
- stored = qtext_store(norm_query ? norm_query : query, query_len,
+ if (!qtext_stored || pgss->gc_count != gc_count)
+ qtext_stored = qtext_store(norm_query ? norm_query : query, query_len,
&query_offset, NULL);
/* If we failed to write to the text file, give up */
- if (!stored)
+ if (!qtext_stored)
goto done;
/* OK to create a new hashtable entry */
@@ -1486,6 +1436,38 @@ pgss_store(const char *query, uint64 queryId,
SpinLockRelease(&e->mutex);
}
+ /* Write entry to XLOG */
+ if (pgss_save && !RecoveryInProgress())
+ {
+ pgssXLogInsert *xlog_entry;
+ XLogRecPtr ptr;
+
+ xlog_entry = palloc(sizeof(pgssXLogInsert));
+ xlog_entry->header = PGSS_FILE_HEADER;
+ xlog_entry->key = entry->key;
+ xlog_entry->counters = entry->counters;
+ xlog_entry->encoding = entry->encoding;
+ xlog_entry->stats_since = entry->stats_since;
+ xlog_entry->minmax_stats_since = entry->minmax_stats_since;
+
+ XLogBeginInsert();
+ XLogRegisterData((char *) xlog_entry, offsetof(pgssXLogInsert, qtext));
+
+ /* Write the query text if need */
+ if (qtext_stored)
+ {
+ xlog_entry->query_len = entry->query_len;
+ XLogRegisterData(norm_query ? norm_query : (char *) query, query_len);
+ }
+ else
+ xlog_entry->query_len = 0;
+
+ XLogSetRecordFlags(XLOG_MARK_UNIMPORTANT);
+ ptr = XLogInsert(CUSTOMRMGR_ID, PGSS_XLOG_INSERT);
+ XLogFlush(ptr);
+ pfree(xlog_entry);
+ }
+
done:
LWLockRelease(pgss->lock);
@@ -2676,6 +2658,27 @@ entry_reset(Oid userid, Oid dbid, uint64 queryid, bool minmax_only)
LWLockAcquire(pgss->lock, LW_EXCLUSIVE);
num_entries = hash_get_num_entries(pgss_hash);
+ /* Write entry to XLOG */
+ if (pgss_save && !RecoveryInProgress())
+ {
+ pgssXLogReset *xlrec;
+ XLogRecPtr ptr;
+
+ xlrec = palloc0(sizeof(pgssXLogReset));
+ xlrec->header = PGSS_FILE_HEADER;
+ xlrec->userid = userid;
+ xlrec->dbid = dbid;
+ xlrec->queryid = queryid;
+ xlrec->minmax_only = minmax_only;
+
+ XLogBeginInsert();
+ XLogRegisterData((char *) xlrec, sizeof(pgssXLogReset));
+ XLogSetRecordFlags(XLOG_MARK_UNIMPORTANT);
+ ptr = XLogInsert(CUSTOMRMGR_ID, PGSS_XLOG_RESET);
+ XLogFlush(ptr);
+ pfree(xlrec);
+ }
+
stats_reset = GetCurrentTimestamp();
if (userid != 0 && dbid != 0 && queryid != UINT64CONST(0))
@@ -3010,3 +3013,236 @@ comp_location(const void *a, const void *b)
return pg_cmp_s32(l, r);
}
+
+static void
+rmgr_redo(XLogReaderState *record)
+{
+ uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
+
+ /*
+ * Because we did not restore records from storage,
+ * we also do not restore records from WAL.
+ */
+ if (!pgss_save)
+ return;
+
+ if (info == PGSS_XLOG_INSERT)
+ {
+ pgssXLogInsert *xlrec = (pgssXLogInsert *) XLogRecGetData(record);
+ pgssEntry *entry;
+
+ if (xlrec->header != PGSS_FILE_HEADER)
+ {
+ elog(WARNING, "Skip the inconsistent WAL record");
+ return;
+ }
+
+ /* Safety check... */
+ if (!pgss || !pgss_hash)
+ return;
+
+ LWLockAcquire(pgss->lock, LW_EXCLUSIVE);
+
+ entry = (pgssEntry *) hash_search(pgss_hash, &xlrec->key, HASH_FIND, NULL);
+
+ /* Create new entry, if not present */
+ if (!entry)
+ {
+ Size query_offset;
+ bool stored;
+ char *query;
+
+ Assert(xlrec->query_len > 0);
+
+ query = (char *) xlrec->qtext;
+
+ /* Append new query text to file */
+ stored = qtext_store(query, xlrec->query_len, &query_offset, NULL);
+
+ /* If we failed to write to the text file, give up */
+ if (!stored)
+ {
+ LWLockRelease(pgss->lock);
+ return;
+ }
+
+ /* OK to create a new hashtable entry */
+ entry = entry_alloc(&xlrec->key, query_offset, xlrec->query_len,
+ xlrec->encoding, false);
+
+ /* If needed, perform garbage collection */
+ gc_qtexts();
+ }
+
+ /* Copy the necessary data from XLog record */
+ entry->counters = xlrec->counters;
+
+ entry->encoding = xlrec->encoding;
+ entry->stats_since = xlrec->stats_since;
+ entry->minmax_stats_since = xlrec->minmax_stats_since;
+
+ LWLockRelease(pgss->lock);
+ }
+ else if (info == PGSS_XLOG_RESET)
+ {
+ pgssXLogReset *xlrec = (pgssXLogReset *) XLogRecGetData(record);
+
+ if (xlrec->header != PGSS_FILE_HEADER)
+ {
+ elog(WARNING, "Skip the inconsistent WAL record");
+ return;
+ }
+
+ entry_reset(xlrec->userid, xlrec->dbid, xlrec->queryid, xlrec->minmax_only);
+ }
+}
+
+static void
+rmgr_desc(StringInfo buf, XLogReaderState *record)
+{
+ char *rec = XLogRecGetData(record);
+ uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
+
+ if (info == PGSS_XLOG_INSERT)
+ {
+ pgssXLogInsert *xlrec = (pgssXLogInsert *) rec;
+
+ if (xlrec->header != PGSS_FILE_HEADER)
+ {
+ elog(WARNING, "Skip the inconsistent WAL record");
+ return;
+ }
+
+ appendStringInfo(buf, "userid: %u, dbid: %u, queryid: " UINT64_FORMAT
+ ", toplevel: %d",
+ xlrec->key.userid, xlrec->key.dbid, xlrec->key.queryid,
+ (int) xlrec->key.toplevel);
+ }
+ else if (info == PGSS_XLOG_RESET)
+ {
+ pgssXLogReset *xlrec = (pgssXLogReset *) rec;
+
+ if (xlrec->header != PGSS_FILE_HEADER)
+ {
+ elog(WARNING, "Skip the inconsistent WAL record");
+ return;
+ }
+
+ appendStringInfo(buf, "userid: %u, dbid: %u, queryid: " UINT64_FORMAT
+ ", minmax_only: %d",
+ xlrec->userid, xlrec->dbid, xlrec->queryid,
+ (int) xlrec->minmax_only);
+ }
+}
+
+static const char *
+rmgr_identify(uint8 info)
+{
+ if ((info & ~XLR_INFO_MASK) == PGSS_XLOG_INSERT)
+ return "INSERT";
+ if ((info & ~XLR_INFO_MASK) == PGSS_XLOG_RESET)
+ return "RESET";
+
+ return NULL;
+}
+
+static void
+rmgr_checkpoint(int flags)
+{
+ FILE *file;
+ char *qbuffer = NULL;
+ Size qbuffer_size = 0;
+ HASH_SEQ_STATUS hash_seq;
+ int32 num_entries;
+ pgssEntry *entry;
+
+ /* Safety check ... shouldn't get here unless shmem is set up. */
+ if (!pgss || !pgss_hash)
+ return;
+
+ /* Don't dump if told not to. */
+ if (!pgss_save)
+ return;
+
+ /* XXX: Can there be concurrent CHECKPOINTs? */
+ LWLockAcquire(pgss->lock, LW_EXCLUSIVE);
+
+ file = AllocateFile(PGSS_DUMP_FILE ".tmp", PG_BINARY_W);
+ if (file == NULL)
+ goto error;
+
+ if (fwrite(&PGSS_FILE_HEADER, sizeof(uint32), 1, file) != 1)
+ goto error;
+ if (fwrite(&PGSS_PG_MAJOR_VERSION, sizeof(uint32), 1, file) != 1)
+ goto error;
+ num_entries = hash_get_num_entries(pgss_hash);
+ if (fwrite(&num_entries, sizeof(int32), 1, file) != 1)
+ goto error;
+
+ qbuffer = qtext_load_file(&qbuffer_size);
+ if (qbuffer == NULL)
+ goto error;
+
+ /*
+ * When serializing to disk, we store query texts immediately after their
+ * entry data. Any orphaned query texts are thereby excluded.
+ */
+ hash_seq_init(&hash_seq, pgss_hash);
+ while ((entry = hash_seq_search(&hash_seq)) != NULL)
+ {
+ int len = entry->query_len;
+ char *qstr = qtext_fetch(entry->query_offset, len,
+ qbuffer, qbuffer_size);
+
+ if (qstr == NULL)
+ continue; /* Ignore any entries with bogus texts */
+
+ if (fwrite(entry, sizeof(pgssEntry), 1, file) != 1 ||
+ fwrite(qstr, 1, len + 1, file) != len + 1)
+ {
+ /* note: we assume hash_seq_term won't change errno */
+ hash_seq_term(&hash_seq);
+ goto error;
+ }
+ }
+
+ /* Dump global statistics for pg_stat_statements */
+ if (fwrite(&pgss->stats, sizeof(pgssGlobalStats), 1, file) != 1)
+ goto error;
+
+ free(qbuffer);
+ qbuffer = NULL;
+
+ if (FreeFile(file))
+ {
+ file = NULL;
+ goto error;
+ }
+
+ /*
+ * Rename file into place, so we atomically replace any old one.
+ */
+ (void) durable_rename(PGSS_DUMP_FILE ".tmp", PGSS_DUMP_FILE, LOG);
+
+ /* Unlink query-texts file; it's not needed while shutdown */
+ if (flags & CHECKPOINT_IS_SHUTDOWN)
+ unlink(PGSS_TEXT_FILE);
+
+ LWLockRelease(pgss->lock);
+ return;
+
+error:
+ ereport(LOG,
+ (errcode_for_file_access(),
+ errmsg("could not write file \"%s\": %m",
+ PGSS_DUMP_FILE ".tmp")));
+ free(qbuffer);
+ if (file)
+ FreeFile(file);
+ unlink(PGSS_DUMP_FILE ".tmp");
+
+ if (flags & CHECKPOINT_IS_SHUTDOWN)
+ unlink(PGSS_TEXT_FILE);
+
+ LWLockRelease(pgss->lock);
+}
diff --git a/contrib/pg_stat_statements/t/020_crash.pl b/contrib/pg_stat_statements/t/020_crash.pl
new file mode 100644
index 0000000000..f33bb43d7d
--- /dev/null
+++ b/contrib/pg_stat_statements/t/020_crash.pl
@@ -0,0 +1,80 @@
+# Copyright (c) 2023-2024, PostgreSQL Global Development Group
+
+# Tests for checking that pg_stat_statements contents are preserved
+# across restarts.
+
+use strict;
+use warnings FATAL => 'all';
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+my $node = PostgreSQL::Test::Cluster->new('main');
+$node->init;
+$node->append_conf('postgresql.conf',
+ q[
+ shared_preload_libraries = 'pg_stat_statements'
+ restart_after_crash = 1
+ ]);
+$node->start;
+
+$node->safe_psql('postgres', 'CREATE EXTENSION pg_stat_statements');
+
+# Without the CHECKPOINT hook, we won't see this query in pg_stat_statements
+# after a server crash.
+$node->safe_psql('postgres', 'CREATE TABLE t1 (a int)');
+
+$node->safe_psql('postgres', 'CHECKPOINT');
+$node->safe_psql('postgres', 'SELECT a FROM t1');
+
+is( $node->safe_psql(
+ 'postgres',
+ "SELECT query FROM pg_stat_statements WHERE query NOT LIKE '%pg_stat_statements%' ORDER BY query"
+ ),
+ "CHECKPOINT\nCREATE TABLE t1 (a int)\nSELECT a FROM t1",
+ 'pg_stat_statements populated');
+
+
+# Perform a server shutdown by killing the backend.
+my $psql_timeout = IPC::Run::timer($PostgreSQL::Test::Utils::timeout_default);
+
+my ($killme_stdin, $killme_stdout, $killme_stderr) = ('', '', '');
+my $killme = IPC::Run::start(
+ [
+ 'psql', '-X', '-qAt', '-v', 'ON_ERROR_STOP=1', '-f', '-', '-d',
+ $node->connstr('postgres')
+ ],
+ '<',
+ \$killme_stdin,
+ '>',
+ \$killme_stdout,
+ '2>',
+ \$killme_stderr,
+ $psql_timeout);
+
+$killme_stdin .= "SELECT pg_backend_pid();\n";
+ok( pump_until(
+ $killme, $psql_timeout, \$killme_stdout, qr/[[:digit:]]+[\r\n]$/m),
+ 'acquired pid for SIGQUIT');
+my $pid = $killme_stdout;
+chomp($pid);
+
+my $ret = PostgreSQL::Test::Utils::system_log('pg_ctl', 'kill', 'QUIT', $pid);
+is($ret, 0, "killed process with SIGQUIT");
+
+$killme->finish;
+
+# Wait till server restarts
+is($node->poll_query_until('postgres', undef, ''),
+ "1", "reconnected after SIGQUIT");
+
+is( $node->safe_psql(
+ 'postgres',
+ "SELECT query FROM pg_stat_statements WHERE query NOT LIKE '%pg_stat_statements%' ORDER BY query"
+ ),
+ "CHECKPOINT\nCREATE TABLE t1 (a int)\nSELECT a FROM t1\nSELECT pg_backend_pid()",
+ 'pg_stat_statements data kept across the server crash');
+
+$node->stop;
+
+done_testing();