From 818df7e5f7350bbac65f08a186b3bd90e2c343e8 Mon Sep 17 00:00:00 2001
From: Peter Geoghegan <pg@bowt.ie>
Date: Mon, 27 Mar 2023 19:09:05 -0700
Subject: [PATCH v7 1/2] Emit WAL record info via pg_get_wal_block_info

---
 .../pg_walinspect/expected/pg_walinspect.out  |   4 +-
 .../pg_walinspect/pg_walinspect--1.0--1.1.sql |  20 +-
 contrib/pg_walinspect/pg_walinspect.c         | 172 +++++++++++-------
 contrib/pg_walinspect/sql/pg_walinspect.sql   |   4 +-
 doc/src/sgml/pgwalinspect.sgml                |  99 ++++++----
 5 files changed, 185 insertions(+), 114 deletions(-)

diff --git a/contrib/pg_walinspect/expected/pg_walinspect.out b/contrib/pg_walinspect/expected/pg_walinspect.out
index b9e2bb994..a481deaac 100644
--- a/contrib/pg_walinspect/expected/pg_walinspect.out
+++ b/contrib/pg_walinspect/expected/pg_walinspect.out
@@ -121,7 +121,7 @@ UPDATE sample_tbl SET col1 = col1 + 1 WHERE col1 = 1;
 SELECT pg_current_wal_lsn() AS wal_lsn4 \gset
 -- Check if we get block data from WAL record.
 SELECT COUNT(*) >= 1 AS ok FROM pg_get_wal_block_info(:'wal_lsn3', :'wal_lsn4')
-  WHERE relfilenode = :'sample_tbl_oid' AND blockdata IS NOT NULL;
+  WHERE relfilenode = :'sample_tbl_oid' AND block_data IS NOT NULL;
  ok 
 ----
  t
@@ -134,7 +134,7 @@ UPDATE sample_tbl SET col1 = col1 + 1 WHERE col1 = 2;
 SELECT pg_current_wal_lsn() AS wal_lsn6 \gset
 -- Check if we get FPI from WAL record.
 SELECT COUNT(*) >= 1 AS ok FROM pg_get_wal_block_info(:'wal_lsn5', :'wal_lsn6')
-  WHERE relfilenode = :'sample_tbl_oid' AND fpi IS NOT NULL;
+  WHERE relfilenode = :'sample_tbl_oid' AND fpi_data IS NOT NULL;
  ok 
 ----
  t
diff --git a/contrib/pg_walinspect/pg_walinspect--1.0--1.1.sql b/contrib/pg_walinspect/pg_walinspect--1.0--1.1.sql
index 586c3b446..4ff82c373 100644
--- a/contrib/pg_walinspect/pg_walinspect--1.0--1.1.sql
+++ b/contrib/pg_walinspect/pg_walinspect--1.0--1.1.sql
@@ -12,17 +12,25 @@ DROP FUNCTION pg_get_wal_stats_till_end_of_wal(pg_lsn, boolean);
 --
 CREATE FUNCTION pg_get_wal_block_info(IN start_lsn pg_lsn,
 	IN end_lsn pg_lsn,
-	OUT lsn pg_lsn,
+	OUT start_lsn pg_lsn,
+	OUT end_lsn pg_lsn,
+	OUT prev_lsn pg_lsn,
 	OUT blockid int2,
 	OUT reltablespace oid,
 	OUT reldatabase oid,
 	OUT relfilenode oid,
+	OUT relforknumber int2,
 	OUT relblocknumber int8,
-	OUT forkname text,
-	OUT blockdata bytea,
-	OUT fpi bytea,
-	OUT fpilen int4,
-	OUT fpiinfo text[]
+	OUT xid xid,
+	OUT resource_manager text,
+	OUT record_type text,
+	OUT record_length int4,
+	OUT main_data_length int4,
+	OUT block_fpi_length int4,
+	OUT block_fpi_info text[],
+	OUT description text,
+	OUT block_data bytea,
+	OUT fpi_data bytea
 )
 RETURNS SETOF record
 AS 'MODULE_PATHNAME', 'pg_get_wal_block_info'
diff --git a/contrib/pg_walinspect/pg_walinspect.c b/contrib/pg_walinspect/pg_walinspect.c
index 293373412..834304adc 100644
--- a/contrib/pg_walinspect/pg_walinspect.c
+++ b/contrib/pg_walinspect/pg_walinspect.c
@@ -177,6 +177,10 @@ ReadNextXLogRecord(XLogReaderState *xlogreader)
 
 /*
  * Get a single WAL record info.
+ *
+ * Note that the per-record information fetching code is same in both
+ * GetWALBlockInfo() and GetWALRecordInfo(), it's only the output columns order
+ * that differs. Try to keep this code in sync.
  */
 static void
 GetWALRecordInfo(XLogReaderState *record, Datum *values,
@@ -230,24 +234,47 @@ GetWALRecordInfo(XLogReaderState *record, Datum *values,
 
 
 /*
- * Store a set of block information from a single record (FPI and block
- * information).
+ * Get a single WAL record info along with its block information (block data
+ * and FPI).
+ *
+ * Note that the per-record information fetching code is same in both
+ * GetWALBlockInfo() and GetWALRecordInfo(), it's only the output columns order
+ * that differs. Try to keep this code in sync.
  */
 static void
 GetWALBlockInfo(FunctionCallInfo fcinfo, XLogReaderState *record)
 {
-#define PG_GET_WAL_BLOCK_INFO_COLS 11
+#define PG_GET_WAL_BLOCK_INFO_COLS 19
 	int			block_id;
 	ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
+	RmgrData	desc;
+	const char	*record_type;
+	StringInfoData	rec_desc;
+
+	Assert(XLogRecHasAnyBlockRefs(record));
+
+	desc = GetRmgr(XLogRecGetRmid(record));
+	record_type = desc.rm_identify(XLogRecGetInfo(record));
+
+	if (record_type == NULL)
+		record_type = psprintf("UNKNOWN (%x)",
+							   XLogRecGetInfo(record) & ~XLR_INFO_MASK);
+
+	initStringInfo(&rec_desc);
+	desc.rm_desc(&rec_desc, record);
 
 	for (block_id = 0; block_id <= XLogRecMaxBlockId(record); block_id++)
 	{
 		DecodedBkpBlock *blk;
 		BlockNumber blkno;
 		RelFileLocator rnode;
-		ForkNumber	fork;
+		ForkNumber	forknum;
 		Datum		values[PG_GET_WAL_BLOCK_INFO_COLS] = {0};
 		bool		nulls[PG_GET_WAL_BLOCK_INFO_COLS] = {0};
+		uint32		block_fpi_len = 0;
+		PGAlignedBlock buf;
+		Page		page;
+		ArrayType  *block_fpi_info = NULL;
 		int			i = 0;
 
 		if (!XLogRecHasBlockRef(record, block_id))
@@ -256,70 +283,22 @@ GetWALBlockInfo(FunctionCallInfo fcinfo, XLogReaderState *record)
 		blk = XLogRecGetBlock(record, block_id);
 
 		(void) XLogRecGetBlockTagExtended(record, block_id,
-										  &rnode, &fork, &blkno, NULL);
-
-		values[i++] = LSNGetDatum(record->ReadRecPtr);
-		values[i++] = Int16GetDatum(block_id);
-		values[i++] = ObjectIdGetDatum(blk->rlocator.spcOid);
-		values[i++] = ObjectIdGetDatum(blk->rlocator.dbOid);
-		values[i++] = ObjectIdGetDatum(blk->rlocator.relNumber);
-		values[i++] = Int64GetDatum((int64) blkno);
-
-		if (fork >= 0 && fork <= MAX_FORKNUM)
-			values[i++] = CStringGetTextDatum(forkNames[fork]);
-		else
-			ereport(ERROR,
-					(errcode(ERRCODE_INTERNAL_ERROR),
-					 errmsg_internal("invalid fork number: %u", fork)));
-
-		/* Block data */
-		if (blk->has_data)
-		{
-			bytea	   *raw_data;
-
-			/* Initialize bytea buffer to copy the data to */
-			raw_data = (bytea *) palloc(blk->data_len + VARHDRSZ);
-			SET_VARSIZE(raw_data, blk->data_len + VARHDRSZ);
-
-			/* Copy the data */
-			memcpy(VARDATA(raw_data), blk->data, blk->data_len);
-			values[i++] = PointerGetDatum(raw_data);
-		}
-		else
-		{
-			/* No data, so set this field to NULL */
-			nulls[i++] = true;
-		}
+										  &rnode, &forknum, &blkno, NULL);
 
 		if (blk->has_image)
 		{
-			PGAlignedBlock buf;
-			Page		page;
-			bytea	   *raw_page;
 			int			bitcnt;
 			int			cnt = 0;
 			Datum	   *flags;
-			ArrayType  *a;
-
-			page = (Page) buf.data;
 
 			/* Full page image exists, so let's save it */
+			page = (Page) buf.data;
 			if (!RestoreBlockImage(record, block_id, page))
 				ereport(ERROR,
 						(errcode(ERRCODE_INTERNAL_ERROR),
 						 errmsg_internal("%s", record->errormsg_buf)));
-
-			/* Initialize bytea buffer to copy the FPI to */
-			raw_page = (bytea *) palloc(BLCKSZ + VARHDRSZ);
-			SET_VARSIZE(raw_page, BLCKSZ + VARHDRSZ);
-
-			/* Take a verbatim copy of the FPI */
-			memcpy(VARDATA(raw_page), page, BLCKSZ);
-
-			values[i++] = PointerGetDatum(raw_page);
-			values[i++] = UInt32GetDatum(blk->bimg_len);
-
-			/* FPI flags */
+			/* Save block_fpi_len and block_fpi_info */
+			block_fpi_len = blk->bimg_len;
 			bitcnt = pg_popcount((const char *) &blk->bimg_info,
 								 sizeof(uint8));
 			/* Build set of raw flags */
@@ -337,15 +316,71 @@ GetWALBlockInfo(FunctionCallInfo fcinfo, XLogReaderState *record)
 				flags[cnt++] = CStringGetTextDatum("COMPRESS_ZSTD");
 
 			Assert(cnt <= bitcnt);
-			a = construct_array_builtin(flags, cnt, TEXTOID);
-			values[i++] = PointerGetDatum(a);
+			block_fpi_info = construct_array_builtin(flags, cnt, TEXTOID);
+		}
+
+		values[i++] = LSNGetDatum(record->ReadRecPtr);
+		values[i++] = LSNGetDatum(record->EndRecPtr);
+		values[i++] = LSNGetDatum(XLogRecGetPrev(record));
+		values[i++] = Int16GetDatum(block_id);
+		values[i++] = ObjectIdGetDatum(blk->rlocator.spcOid);
+		values[i++] = ObjectIdGetDatum(blk->rlocator.dbOid);
+		values[i++] = ObjectIdGetDatum(blk->rlocator.relNumber);
+		values[i++] = Int16GetDatum(forknum);
+		values[i++] = Int64GetDatum((int64) blkno);
+		values[i++] = TransactionIdGetDatum(XLogRecGetXid(record));
+		values[i++] = CStringGetTextDatum(desc.rm_name);
+		values[i++] = CStringGetTextDatum(record_type);
+
+		/* record_length, main_data_length, block_fpi_length */
+		values[i++] = UInt32GetDatum(XLogRecGetTotalLen(record));
+		values[i++] = UInt32GetDatum(XLogRecGetDataLen(record));
+		values[i++] = UInt32GetDatum(block_fpi_len);
+
+		/* block_fpi_info status flags */
+		if (block_fpi_info)
+			values[i++] = PointerGetDatum(block_fpi_info);
+		else
+			nulls[i++] = true;
+
+		/* description */
+		if (rec_desc.len > 0)
+			values[i++] = CStringGetTextDatum(rec_desc.data);
+		else
+			nulls[i++] = true;
+
+		/* block_data output parameter */
+		if (blk->has_data)
+		{
+			bytea	   *raw_data;
+
+			/* Initialize bytea buffer to copy the data to */
+			raw_data = (bytea *) palloc(blk->data_len + VARHDRSZ);
+			SET_VARSIZE(raw_data, blk->data_len + VARHDRSZ);
+
+			/* Copy the data */
+			memcpy(VARDATA(raw_data), blk->data, blk->data_len);
+			values[i++] = PointerGetDatum(raw_data);
 		}
 		else
+			nulls[i++] = true;
+
+		/* fpi_data output parameter */
+		if (blk->has_image)
 		{
-			/* No full page image, so store NULLs for all its fields */
-			memset(&nulls[i], true, 3 * sizeof(bool));
-			i += 3;
+			bytea	   *raw_page;
+
+			/* Initialize bytea buffer to copy the FPI to */
+			raw_page = (bytea *) palloc(BLCKSZ + VARHDRSZ);
+			SET_VARSIZE(raw_page, BLCKSZ + VARHDRSZ);
+
+			/* Take a verbatim copy of the FPI */
+			memcpy(VARDATA(raw_page), page, BLCKSZ);
+
+			values[i++] = PointerGetDatum(raw_page);
 		}
+		else
+			nulls[i++] = true;
 
 		Assert(i == PG_GET_WAL_BLOCK_INFO_COLS);
 
@@ -357,11 +392,12 @@ GetWALBlockInfo(FunctionCallInfo fcinfo, XLogReaderState *record)
 }
 
 /*
- * Get information about all the blocks saved in WAL records between start
- * and end LSNs. This produces information about the full page images with
- * their relation information, and the data saved in each block associated
- * to a record. Decompression is applied to the full page images, if
- * necessary.
+ * Get info of all WAL records having block references between start LSN and
+ * end LSN. It outputs one row for each WAL record's block reference (note that
+ * a single WAL record can have one or more blocks associated with it). It
+ * outputs WAL record information as well as block information (block data
+ * and/or FPI). Decompression is applied to the full page images, if necessary.
+ * It omits WAL records that contain no block references.
  */
 Datum
 pg_get_wal_block_info(PG_FUNCTION_ARGS)
@@ -485,7 +521,7 @@ ValidateInputLSNs(XLogRecPtr start_lsn, XLogRecPtr *end_lsn)
 }
 
 /*
- * Get info and data of all WAL records between start LSN and end LSN.
+ * Get info of all WAL records between start LSN and end LSN.
  */
 static void
 GetWALRecordsInfo(FunctionCallInfo fcinfo, XLogRecPtr start_lsn,
@@ -537,7 +573,7 @@ GetWALRecordsInfo(FunctionCallInfo fcinfo, XLogRecPtr start_lsn,
 }
 
 /*
- * Get info and data of all WAL records between start LSN and end LSN.
+ * Get info of all WAL records between start LSN and end LSN.
  */
 Datum
 pg_get_wal_records_info(PG_FUNCTION_ARGS)
diff --git a/contrib/pg_walinspect/sql/pg_walinspect.sql b/contrib/pg_walinspect/sql/pg_walinspect.sql
index 2f0a909c1..34e83f529 100644
--- a/contrib/pg_walinspect/sql/pg_walinspect.sql
+++ b/contrib/pg_walinspect/sql/pg_walinspect.sql
@@ -78,7 +78,7 @@ UPDATE sample_tbl SET col1 = col1 + 1 WHERE col1 = 1;
 SELECT pg_current_wal_lsn() AS wal_lsn4 \gset
 -- Check if we get block data from WAL record.
 SELECT COUNT(*) >= 1 AS ok FROM pg_get_wal_block_info(:'wal_lsn3', :'wal_lsn4')
-  WHERE relfilenode = :'sample_tbl_oid' AND blockdata IS NOT NULL;
+  WHERE relfilenode = :'sample_tbl_oid' AND block_data IS NOT NULL;
 
 -- Force full-page image on the next update.
 SELECT pg_current_wal_lsn() AS wal_lsn5 \gset
@@ -87,7 +87,7 @@ UPDATE sample_tbl SET col1 = col1 + 1 WHERE col1 = 2;
 SELECT pg_current_wal_lsn() AS wal_lsn6 \gset
 -- Check if we get FPI from WAL record.
 SELECT COUNT(*) >= 1 AS ok FROM pg_get_wal_block_info(:'wal_lsn5', :'wal_lsn6')
-  WHERE relfilenode = :'sample_tbl_oid' AND fpi IS NOT NULL;
+  WHERE relfilenode = :'sample_tbl_oid' AND fpi_data IS NOT NULL;
 
 -- ===================================================================
 -- Tests for permissions
diff --git a/doc/src/sgml/pgwalinspect.sgml b/doc/src/sgml/pgwalinspect.sgml
index 9a0241a8d..ffcb39bb2 100644
--- a/doc/src/sgml/pgwalinspect.sgml
+++ b/doc/src/sgml/pgwalinspect.sgml
@@ -94,9 +94,10 @@ block_ref        | blkref #0: rel 1663/5/60221 fork main blk 2
      <para>
       Gets information of all the valid WAL records between
       <replaceable>start_lsn</replaceable> and <replaceable>end_lsn</replaceable>.
-      Returns one row per WAL record. If a future
-      <replaceable>end_lsn</replaceable> (i.e. ahead of the current LSN of
-      the server) is specified, it returns information until the end of WAL.
+      Returns one row per WAL record. If <replaceable>end_lsn</replaceable>
+      specified is in future (i.e. ahead of the current LSN of the server) or
+      <literal>FFFFFFFF/FFFFFFFF</literal> (i.e. highest possible value for
+      <type>pg_lsn</type> type), it returns information until the end of WAL.
       The function raises an error if <replaceable>start_lsn</replaceable>
       is not available. For example, usage of the function is as follows:
 <screen>
@@ -133,11 +134,13 @@ block_ref        |
       <replaceable>end_lsn</replaceable>. By default, it returns one row per
       <replaceable>resource_manager</replaceable> type. When
       <replaceable>per_record</replaceable> is set to <literal>true</literal>,
-      it returns one row per <replaceable>record_type</replaceable>. If a
-      future <replaceable>end_lsn</replaceable> (i.e. ahead of the current
-      LSN of the server) is specified, it returns statistics until the end
-      of WAL. An error is raised if <replaceable>start_lsn</replaceable> is
-      not available. For example, usage of the function is as follows:
+      it returns one row per <replaceable>record_type</replaceable>.
+      If <replaceable>end_lsn</replaceable> specified is in future (i.e. ahead
+      of the current LSN of the server) or <literal>FFFFFFFF/FFFFFFFF</literal>
+      (i.e. highest possible value for <type>pg_lsn</type> type), it returns
+      statistics until the end of WAL. An error is raised if
+      <replaceable>start_lsn</replaceable> is not available. For example, usage
+      of the function is as follows:
 <screen>
 postgres=# SELECT * FROM pg_get_wal_stats('0/1E847D00', '0/1E84F500')
              WHERE count > 0 LIMIT 1 AND
@@ -164,38 +167,62 @@ combined_size_percentage     | 2.8634072910530795
 
     <listitem>
      <para>
-      Gets a copy of the block information stored in WAL records. This includes
-      copies of the block data (<literal>NULL</literal> if none) and full page
-      images as <type>bytea</type> values (after
-      applying decompression when necessary, or <literal>NULL</literal> if none)
-      and their information associated with all the valid WAL records between
-      <replaceable>start_lsn</replaceable> and
-      <replaceable>end_lsn</replaceable>. Returns one row per block registered
-      in a WAL record. If a future <replaceable>end_lsn</replaceable> (i.e.
-      ahead of the current LSN of the server) is specified, it returns
-      statistics until the end of WAL. An error is raised if
-      <replaceable>start_lsn</replaceable> is not available. For example,
-      usage of the function is as follows:
+      Gets both WAL record and block information of all the valid WAL records
+      between <replaceable>start_lsn</replaceable> and
+      <replaceable>end_lsn</replaceable>. It omits WAL records that contain no
+      block references. The block information includes copies of the block data
+      (<literal>NULL</literal> if none) and full page images as
+      <type>bytea</type> values (after applying decompression when necessary,
+      or <literal>NULL</literal> if none). Returns one row per block registered
+      in a WAL record. If <replaceable>end_lsn</replaceable> specified is in
+      future (i.e. ahead of the current LSN of the server) or
+      <literal>FFFFFFFF/FFFFFFFF</literal> (i.e. highest possible value for
+      <type>pg_lsn</type> type), it returns information until the end of WAL.
+      An error is raised if <replaceable>start_lsn</replaceable> is not
+      available. For example, usage of the function is as follows:
 <screen>
-postgres=# SELECT lsn, blockid, reltablespace, reldatabase, relfilenode,
-                  relblocknumber, forkname,
-                  substring(blockdata for 24) as block_trimmed,
+postgres=# SELECT start_lsn, end_lsn, prev_lsn, reltablespace,
+                  reldatabase, relfilenode, relblocknumber,
+                  blockid, xid, resource_manager, record_type,
+                  record_length, main_data_length, description,
+                  forkname, substring(blockdata for 24) as block_trimmed,
                   substring(fpi for 24) as fpi_trimmed, fpilen, fpiinfo
-             FROM pg_get_wal_block_info('0/1871080', '0/1871440');
--[ RECORD 1 ]--+---------------------------------------------------
-lsn            | 0/18712F8
-blockid        | 0
-reltablespace  | 1663
-reldatabase    | 16384
-relfilenode    | 16392
-relblocknumber | 0
-forkname       | main
-block_trimmed  | \x02800128180164000000
-fpi_trimmed    | \x0000000050108701000000002c00601f00200420e0020000
-fpilen         | 204
-fpiinfo        | {HAS_HOLE,APPLY}
+             FROM pg_get_wal_block_info('0/14BF938', '0/14BFBA0');
+-[ RECORD 1 ]----+---------------------------------------------------
+start_lsn        | 0/14BFA20
+end_lsn          | 0/14BFB38
+prev_lsn         | 0/14BF9A8
+reltablespace    | 1663
+reldatabase      | 5
+relfilenode      | 16391
+relblocknumber   | 9
+blockid          | 0
+xid              | 735
+resource_manager | Heap
+record_type      | HOT_UPDATE
+record_length    | 279
+main_data_length | 14
+description      | off 1 xmax 735 flags 0x10 ; new off 5 xmax 0
+forkname         | main
+block_trimmed    | \x02800128180102000000
+fpi_trimmed      | \x00000000d0f84b01000000002c00601f00200420df020000
+fpilen           | 204
+fpiinfo          | {HAS_HOLE,APPLY}
 </screen>
      </para>
+     <note>
+      <para>
+       Note that although <function>pg_get_wal_records_info</function>
+       and <function>pg_get_wal_block_info</function> functions return
+       information of WAL records in common, the
+       <function>pg_get_wal_records_info</function> outputs block information
+       (without block data and FPI data) as a text column
+       (<literal>NULL</literal> if none), and
+       <function>pg_get_wal_block_info</function> outputs one row for each block
+       (including block data and FPI data) associated with each WAL record, omitting
+       WAL records that contain no block references.
+      </para>
+     </note>
     </listitem>
    </varlistentry>
 
-- 
2.39.2

