On Wed, Feb 14, 2024 at 6:59 AM Jeff Davis <pg...@j-davis.com> wrote:
>
> Attached 2 patches.
>
> Per Andres's suggestion, 0001 adds an:
>   Assert(startptr + count <= LogwrtResult.Write)
>
> Though if we want to allow the caller (e.g. in an extension) to
> determine the valid range, perhaps using WaitXLogInsertionsToFinish(),
> then the check is wrong.

Right.

> Maybe we should just get rid of that code
> entirely and trust the caller to request a reasonable range?

I'd suggest we strike a balance here - error out in assert builds if
startptr+count is past the current insert position and trust the
callers for production builds. It has a couple of advantages over
doing just Assert(startptr + count <= LogwrtResult.Write):
1) It allows the caller to read unflushed WAL directly from WAL
buffers, see the attached 0005 for an example.
2) All the existing callers where WALReadFromBuffers() is expected to
be used already ensure WAL availability by reading only up to the flush
position, so there is no problem with them.

Also, 0001 adds a note before WALRead() stating that the caller must
request only WAL that has already been written out (up to
LogwrtResult.Write). I'm not so sure about this; perhaps we don't need
this comment at all.

Here I am with the v23 patch set:

0001 - Adds assertion in WALReadFromBuffers() to ensure the requested
WAL isn't beyond the current insert position.
0002 - Adds a new test module to demonstrate how one can use
WALReadFromBuffers(), calling WaitXLogInsertionsToFinish() first if need be.
0003 - Uses WALReadFromBuffers in more places like logical walsenders
and backends.
0004 - Removes zero-padding related stuff as discussed in
https://www.postgresql.org/message-id/CALj2ACWBRFac2TingD3PE3w2EBHXUHY3=aeezpjmqhpeobg...@mail.gmail.com.
This is needed in this patch set otherwise the assertion added in 0001
fails after 0003.
0005 - Adds a page_read callback for reading from WAL buffers in the
new test module added in 0002. Also, adds tests.

Thoughts?

--
Bharath Rupireddy
PostgreSQL Contributors Team
RDS Open Source Databases
Amazon Web Services: https://aws.amazon.com
From 5c0f95acda494904d02593e6ba305717b61c44b5 Mon Sep 17 00:00:00 2001
From: Bharath Rupireddy <bharath.rupireddyforpostgres@gmail.com>
Date: Fri, 16 Feb 2024 06:54:53 +0000
Subject: [PATCH v23 2/5] Add test module for verifying read from WAL buffers

---
 src/test/modules/Makefile                     |  1 +
 src/test/modules/meson.build                  |  1 +
 .../modules/read_wal_from_buffers/.gitignore  |  4 ++
 .../modules/read_wal_from_buffers/Makefile    | 23 ++++++
 .../modules/read_wal_from_buffers/meson.build | 33 +++++++++
 .../read_wal_from_buffers--1.0.sql            | 14 ++++
 .../read_wal_from_buffers.c                   | 54 ++++++++++++++
 .../read_wal_from_buffers.control             |  4 ++
 .../read_wal_from_buffers/t/001_basic.pl      | 72 +++++++++++++++++++
 9 files changed, 206 insertions(+)
 create mode 100644 src/test/modules/read_wal_from_buffers/.gitignore
 create mode 100644 src/test/modules/read_wal_from_buffers/Makefile
 create mode 100644 src/test/modules/read_wal_from_buffers/meson.build
 create mode 100644 src/test/modules/read_wal_from_buffers/read_wal_from_buffers--1.0.sql
 create mode 100644 src/test/modules/read_wal_from_buffers/read_wal_from_buffers.c
 create mode 100644 src/test/modules/read_wal_from_buffers/read_wal_from_buffers.control
 create mode 100644 src/test/modules/read_wal_from_buffers/t/001_basic.pl

diff --git a/src/test/modules/Makefile b/src/test/modules/Makefile
index 89aa41b5e3..864a3dd72b 100644
--- a/src/test/modules/Makefile
+++ b/src/test/modules/Makefile
@@ -12,6 +12,7 @@ SUBDIRS = \
 		  dummy_seclabel \
 		  libpq_pipeline \
 		  plsample \
+		  read_wal_from_buffers \
 		  spgist_name_ops \
 		  test_bloomfilter \
 		  test_copy_callbacks \
diff --git a/src/test/modules/meson.build b/src/test/modules/meson.build
index 8fbe742d38..4f3dd69e58 100644
--- a/src/test/modules/meson.build
+++ b/src/test/modules/meson.build
@@ -33,6 +33,7 @@ subdir('test_resowner')
 subdir('test_rls_hooks')
 subdir('test_shm_mq')
 subdir('test_slru')
+subdir('read_wal_from_buffers')
 subdir('unsafe_tests')
 subdir('worker_spi')
 subdir('xid_wraparound')
diff --git a/src/test/modules/read_wal_from_buffers/.gitignore b/src/test/modules/read_wal_from_buffers/.gitignore
new file mode 100644
index 0000000000..5dcb3ff972
--- /dev/null
+++ b/src/test/modules/read_wal_from_buffers/.gitignore
@@ -0,0 +1,4 @@
+# Generated subdirectories
+/log/
+/results/
+/tmp_check/
diff --git a/src/test/modules/read_wal_from_buffers/Makefile b/src/test/modules/read_wal_from_buffers/Makefile
new file mode 100644
index 0000000000..9e57a837f9
--- /dev/null
+++ b/src/test/modules/read_wal_from_buffers/Makefile
@@ -0,0 +1,23 @@
+# src/test/modules/read_wal_from_buffers/Makefile
+
+MODULE_big = read_wal_from_buffers
+OBJS = \
+	$(WIN32RES) \
+	read_wal_from_buffers.o
+PGFILEDESC = "read_wal_from_buffers - test module to read WAL from WAL buffers"
+
+EXTENSION = read_wal_from_buffers
+DATA = read_wal_from_buffers--1.0.sql
+
+TAP_TESTS = 1
+
+ifdef USE_PGXS
+PG_CONFIG = pg_config
+PGXS := $(shell $(PG_CONFIG) --pgxs)
+include $(PGXS)
+else
+subdir = src/test/modules/read_wal_from_buffers
+top_builddir = ../../../..
+include $(top_builddir)/src/Makefile.global
+include $(top_srcdir)/contrib/contrib-global.mk
+endif
diff --git a/src/test/modules/read_wal_from_buffers/meson.build b/src/test/modules/read_wal_from_buffers/meson.build
new file mode 100644
index 0000000000..3fac00d616
--- /dev/null
+++ b/src/test/modules/read_wal_from_buffers/meson.build
@@ -0,0 +1,33 @@
+# Copyright (c) 2024, PostgreSQL Global Development Group
+
+read_wal_from_buffers_sources = files(
+  'read_wal_from_buffers.c',
+)
+
+if host_system == 'windows'
+  read_wal_from_buffers_sources += rc_lib_gen.process(win32ver_rc, extra_args: [
+    '--NAME', 'read_wal_from_buffers',
+    '--FILEDESC', 'read_wal_from_buffers - test module to read WAL from WAL buffers',])
+endif
+
+read_wal_from_buffers = shared_module('read_wal_from_buffers',
+  read_wal_from_buffers_sources,
+  kwargs: pg_test_mod_args,
+)
+test_install_libs += read_wal_from_buffers
+
+test_install_data += files(
+  'read_wal_from_buffers.control',
+  'read_wal_from_buffers--1.0.sql',
+)
+
+tests += {
+  'name': 'read_wal_from_buffers',
+  'sd': meson.current_source_dir(),
+  'bd': meson.current_build_dir(),
+  'tap': {
+    'tests': [
+      't/001_basic.pl',
+    ],
+  },
+}
diff --git a/src/test/modules/read_wal_from_buffers/read_wal_from_buffers--1.0.sql b/src/test/modules/read_wal_from_buffers/read_wal_from_buffers--1.0.sql
new file mode 100644
index 0000000000..82fa097d10
--- /dev/null
+++ b/src/test/modules/read_wal_from_buffers/read_wal_from_buffers--1.0.sql
@@ -0,0 +1,14 @@
+/* src/test/modules/read_wal_from_buffers/read_wal_from_buffers--1.0.sql */
+
+-- complain if script is sourced in psql, rather than via CREATE EXTENSION
+\echo Use "CREATE EXTENSION read_wal_from_buffers" to load this file. \quit
+
+--
+-- read_wal_from_buffers()
+--
+-- SQL function to read WAL from WAL buffers. Returns number of bytes read.
+--
+CREATE FUNCTION read_wal_from_buffers(IN lsn pg_lsn, IN bytes_to_read int,
+    bytes_read OUT int)
+AS 'MODULE_PATHNAME', 'read_wal_from_buffers'
+LANGUAGE C STRICT;
diff --git a/src/test/modules/read_wal_from_buffers/read_wal_from_buffers.c b/src/test/modules/read_wal_from_buffers/read_wal_from_buffers.c
new file mode 100644
index 0000000000..9df5c07b4b
--- /dev/null
+++ b/src/test/modules/read_wal_from_buffers/read_wal_from_buffers.c
@@ -0,0 +1,54 @@
+/*--------------------------------------------------------------------------
+ *
+ * read_wal_from_buffers.c
+ *		Test module to read WAL from WAL buffers.
+ *
+ * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * IDENTIFICATION
+ *	src/test/modules/read_wal_from_buffers/read_wal_from_buffers.c
+ * -------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "access/xlog.h"
+#include "fmgr.h"
+#include "utils/pg_lsn.h"
+
+PG_MODULE_MAGIC;
+
+/*
+ * SQL function to read WAL from WAL buffers. Returns number of bytes read.
+ */
+PG_FUNCTION_INFO_V1(read_wal_from_buffers);
+Datum
+read_wal_from_buffers(PG_FUNCTION_ARGS)
+{
+	XLogRecPtr	startptr = PG_GETARG_LSN(0);
+	int32		count = PG_GETARG_INT32(1);
+	Size		read;
+	char	   *data = palloc0(count);
+	XLogRecPtr	upto = startptr + count;
+	XLogRecPtr	insert_pos = GetXLogInsertRecPtr();
+	TimeLineID	tli = GetWALInsertionTimeLine();
+
+	/*
+	 * Wait for in-progress insertions, then clamp the request to what is
+	 * finished; Max() keeps count from going negative if startptr is past it.
+	 */
+	if (upto > insert_pos)
+	{
+		XLogRecPtr	writtenUpto = WaitXLogInsertionsToFinish(upto);
+
+		upto = Max(startptr, Min(upto, writtenUpto));
+		count = upto - startptr;
+	}
+
+	read = WALReadFromBuffers(data, startptr, count, tli);
+
+	pfree(data);
+
+	PG_RETURN_INT32(read);
+}
diff --git a/src/test/modules/read_wal_from_buffers/read_wal_from_buffers.control b/src/test/modules/read_wal_from_buffers/read_wal_from_buffers.control
new file mode 100644
index 0000000000..b14d24751c
--- /dev/null
+++ b/src/test/modules/read_wal_from_buffers/read_wal_from_buffers.control
@@ -0,0 +1,4 @@
+comment = 'Test module to read WAL from WAL buffers'
+default_version = '1.0'
+module_pathname = '$libdir/read_wal_from_buffers'
+relocatable = true
diff --git a/src/test/modules/read_wal_from_buffers/t/001_basic.pl b/src/test/modules/read_wal_from_buffers/t/001_basic.pl
new file mode 100644
index 0000000000..f985e49a27
--- /dev/null
+++ b/src/test/modules/read_wal_from_buffers/t/001_basic.pl
@@ -0,0 +1,72 @@
+# Copyright (c) 2024, PostgreSQL Global Development Group
+
+use strict;
+use warnings;
+
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+use Time::HiRes qw(usleep);
+
+# Setup a new node.  The configuration chosen here minimizes the number
+# of arbitrary records that could get generated in a cluster.  Enlarging
+# checkpoint_timeout avoids noise with checkpoint activity.  wal_level
+# set to "minimal" avoids random standby snapshot records.  Autovacuum
+# could also trigger randomly, generating random WAL activity of its own.
+# Enlarging wal_writer_delay and wal_writer_flush_after avoids background
+# WAL flushes by walwriter.
+my $node = PostgreSQL::Test::Cluster->new("node");
+$node->init;
+$node->append_conf(
+	'postgresql.conf',
+	q[wal_level = minimal
+	  autovacuum = off
+	  checkpoint_timeout = '30min'
+	  wal_writer_delay = 10000ms
+	  wal_writer_flush_after = 1GB
+]);
+$node->start;
+
+# Setup.
+$node->safe_psql('postgres', 'CREATE EXTENSION read_wal_from_buffers;');
+
+$node->safe_psql('postgres', 'CREATE TABLE t (c int);');
+
+my $result = 0;
+my $lsn;
+my $to_read;
+
+# Wait until we read from WAL buffers
+for (my $i = 0; $i < 10 * $PostgreSQL::Test::Utils::timeout_default; $i++)
+{
+	# Get current insert LSN. After this, we generate some WAL which is guaranteed
+	# to be in WAL buffers as there is no other WAL generating activity
+	# happening on the server. We then verify if we can read the WAL from WAL
+	# buffers using this LSN.
+	$lsn = $node->safe_psql('postgres', 'SELECT pg_current_wal_insert_lsn();');
+
+	my $logstart = -s $node->logfile;
+
+	# Generate minimal WAL so that WAL buffers don't get overwritten.
+	$node->safe_psql('postgres', "INSERT INTO t VALUES ($i);");
+
+	$to_read = 8192;
+
+	my $res = $node->safe_psql('postgres',
+				qq{SELECT read_wal_from_buffers(lsn := '$lsn', bytes_to_read := $to_read) > 0;});
+
+	my $log = $node->log_contains(
+				"request to flush past end of generated WAL; request .*, current position .*",
+				$logstart);
+
+	if ($res eq 't' && $log > 0)
+	{
+		$result = 1;
+		last;
+	}
+
+	usleep(100_000);
+}
+ok($result, 'waited until WAL is successfully read from WAL buffers');
+
+done_testing();
-- 
2.34.1

From c43e78ec2738c92bd7c73e9ec96ba72acc594bc1 Mon Sep 17 00:00:00 2001
From: Bharath Rupireddy <bharath.rupireddyforpostgres@gmail.com>
Date: Fri, 16 Feb 2024 06:56:23 +0000
Subject: [PATCH v23 4/5] Do away with zero-padding assumption before WALRead

---
 src/backend/access/transam/xlogutils.c | 10 ++--------
 src/backend/postmaster/walsummarizer.c |  7 +------
 src/backend/replication/walsender.c    |  2 +-
 3 files changed, 4 insertions(+), 15 deletions(-)

diff --git a/src/backend/access/transam/xlogutils.c b/src/backend/access/transam/xlogutils.c
index d4872ec170..8fb2e68e85 100644
--- a/src/backend/access/transam/xlogutils.c
+++ b/src/backend/access/transam/xlogutils.c
@@ -1010,19 +1010,13 @@ read_local_xlog_page_guts(XLogReaderState *state, XLogRecPtr targetPagePtr,
 	}
 
 	/* attempt to read WAL from WAL buffers first */
-	nbytes = XLOG_BLCKSZ;
+	nbytes = count;
 	rbytes = WALReadFromBuffers(cur_page, targetPagePtr, nbytes, currTLI);
 	cur_page += rbytes;
 	targetPagePtr += rbytes;
 	nbytes -= rbytes;
 
-	/*
-	 * Now read the remaining WAL from WAL file.
-	 *
-	 * Even though we just determined how much of the page can be validly read
-	 * as 'count', read the whole page anyway. It's guaranteed to be
-	 * zero-padded up to the page boundary if it's incomplete.
-	 */
+	/* now read the remaining WAL from WAL file */
 	if (nbytes > 0 &&
 		!WALRead(state, cur_page, targetPagePtr, nbytes, tli,
 				 &errinfo))
diff --git a/src/backend/postmaster/walsummarizer.c b/src/backend/postmaster/walsummarizer.c
index 3e1b146538..e85d497034 100644
--- a/src/backend/postmaster/walsummarizer.c
+++ b/src/backend/postmaster/walsummarizer.c
@@ -1318,12 +1318,7 @@ summarizer_read_local_xlog_page(XLogReaderState *state,
 		}
 	}
 
-	/*
-	 * Even though we just determined how much of the page can be validly read
-	 * as 'count', read the whole page anyway. It's guaranteed to be
-	 * zero-padded up to the page boundary if it's incomplete.
-	 */
-	if (!WALRead(state, cur_page, targetPagePtr, XLOG_BLCKSZ,
+	if (!WALRead(state, cur_page, targetPagePtr, count,
 				 private_data->tli, &errinfo))
 		WALReadRaiseError(&errinfo);
 
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 24687dab28..7ecc7174a0 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -1098,7 +1098,7 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req
 		count = flushptr - targetPagePtr;	/* part of the page available */
 
 	/* attempt to read WAL from WAL buffers first */
-	nbytes = XLOG_BLCKSZ;
+	nbytes = count;
 	rbytes = WALReadFromBuffers(cur_page, targetPagePtr, nbytes, currTLI);
 	cur_page += rbytes;
 	targetPagePtr += rbytes;
-- 
2.34.1

From ffa3b4e2bc95cdf69ac0feae0bf2ad268a9115b5 Mon Sep 17 00:00:00 2001
From: Bharath Rupireddy <bharath.rupireddyforpostgres@gmail.com>
Date: Fri, 16 Feb 2024 06:55:53 +0000
Subject: [PATCH v23 3/5] Use WALReadFromBuffers in more places

---
 src/backend/access/transam/xlogutils.c | 14 +++++++++++++-
 src/backend/replication/walsender.c    | 16 +++++++++++++---
 2 files changed, 26 insertions(+), 4 deletions(-)

diff --git a/src/backend/access/transam/xlogutils.c b/src/backend/access/transam/xlogutils.c
index 945f1f790d..d4872ec170 100644
--- a/src/backend/access/transam/xlogutils.c
+++ b/src/backend/access/transam/xlogutils.c
@@ -895,6 +895,8 @@ read_local_xlog_page_guts(XLogReaderState *state, XLogRecPtr targetPagePtr,
 	int			count;
 	WALReadError errinfo;
 	TimeLineID	currTLI;
+	Size		nbytes;
+	Size		rbytes;
 
 	loc = targetPagePtr + reqLen;
 
@@ -1007,12 +1009,22 @@ read_local_xlog_page_guts(XLogReaderState *state, XLogRecPtr targetPagePtr,
 		count = read_upto - targetPagePtr;
 	}
 
+	/* attempt to read WAL from WAL buffers first */
+	nbytes = XLOG_BLCKSZ;
+	rbytes = WALReadFromBuffers(cur_page, targetPagePtr, nbytes, currTLI);
+	cur_page += rbytes;
+	targetPagePtr += rbytes;
+	nbytes -= rbytes;
+
 	/*
+	 * Now read the remaining WAL from WAL file.
+	 *
 	 * Even though we just determined how much of the page can be validly read
 	 * as 'count', read the whole page anyway. It's guaranteed to be
 	 * zero-padded up to the page boundary if it's incomplete.
 	 */
-	if (!WALRead(state, cur_page, targetPagePtr, XLOG_BLCKSZ, tli,
+	if (nbytes > 0 &&
+		!WALRead(state, cur_page, targetPagePtr, nbytes, tli,
 				 &errinfo))
 		WALReadRaiseError(&errinfo);
 
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index e5477c1de1..24687dab28 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -1059,6 +1059,8 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req
 	WALReadError errinfo;
 	XLogSegNo	segno;
 	TimeLineID	currTLI;
+	Size		nbytes;
+	Size		rbytes;
 
 	/*
 	 * Make sure we have enough WAL available before retrieving the current
@@ -1095,11 +1097,19 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req
 	else
 		count = flushptr - targetPagePtr;	/* part of the page available */
 
-	/* now actually read the data, we know it's there */
-	if (!WALRead(state,
+	/* attempt to read WAL from WAL buffers first */
+	nbytes = XLOG_BLCKSZ;
+	rbytes = WALReadFromBuffers(cur_page, targetPagePtr, nbytes, currTLI);
+	cur_page += rbytes;
+	targetPagePtr += rbytes;
+	nbytes -= rbytes;
+
+	/* now read the remaining WAL from WAL file */
+	if (nbytes > 0 &&
+		!WALRead(state,
 				 cur_page,
 				 targetPagePtr,
-				 XLOG_BLCKSZ,
+				 nbytes,
 				 currTLI,		/* Pass the current TLI because only
 								 * WalSndSegmentOpen controls whether new TLI
 								 * is needed. */
-- 
2.34.1

From 09e76fd9336352d6a34cc320c0b369e158b50b54 Mon Sep 17 00:00:00 2001
From: Bharath Rupireddy <bharath.rupireddyforpostgres@gmail.com>
Date: Fri, 16 Feb 2024 06:54:10 +0000
Subject: [PATCH v23 1/5] Add check in WALReadFromBuffers against requested WAL

---
 src/backend/access/transam/xlog.c       | 36 ++++++++++++-------------
 src/backend/access/transam/xlogreader.c |  3 +++
 src/include/access/xlog.h               |  1 +
 3 files changed, 22 insertions(+), 18 deletions(-)

diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 4e14c242b1..884be9c805 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -698,7 +698,6 @@ static void ReserveXLogInsertLocation(int size, XLogRecPtr *StartPos,
 									  XLogRecPtr *EndPos, XLogRecPtr *PrevPtr);
 static bool ReserveXLogSwitch(XLogRecPtr *StartPos, XLogRecPtr *EndPos,
 							  XLogRecPtr *PrevPtr);
-static XLogRecPtr WaitXLogInsertionsToFinish(XLogRecPtr upto);
 static char *GetXLogBuffer(XLogRecPtr ptr, TimeLineID tli);
 static XLogRecPtr XLogBytePosToRecPtr(uint64 bytepos);
 static XLogRecPtr XLogBytePosToEndRecPtr(uint64 bytepos);
@@ -1493,7 +1492,7 @@ WALInsertLockUpdateInsertingAt(XLogRecPtr insertingAt)
  * uninitialized page), and the inserter might need to evict an old WAL buffer
  * to make room for a new one, which in turn requires WALWriteLock.
  */
-static XLogRecPtr
+XLogRecPtr
 WaitXLogInsertionsToFinish(XLogRecPtr upto)
 {
 	uint64		bytepos;
@@ -1710,13 +1709,15 @@ GetXLogBuffer(XLogRecPtr ptr, TimeLineID tli)
  * of bytes read successfully.
  *
  * Fewer than 'count' bytes may be read if some of the requested WAL data has
- * already been evicted from the WAL buffers, or if the caller requests data
- * that is not yet available.
+ * already been evicted from the WAL buffers.
  *
  * No locks are taken.
  *
  * The 'tli' argument is only used as a convenient safety check so that
  * callers do not read from WAL buffers on a historical timeline.
+ *
+ * Note: It is the caller's responsibility to ensure requested WAL up to
+ * 'startptr'+'count' is available by using WaitXLogInsertionsToFinish().
  */
 Size
 WALReadFromBuffers(char *dstbuf, XLogRecPtr startptr, Size count,
@@ -1724,26 +1725,25 @@ WALReadFromBuffers(char *dstbuf, XLogRecPtr startptr, Size count,
 {
 	char	   *pdst = dstbuf;
 	XLogRecPtr	recptr = startptr;
-	XLogRecPtr	upto;
-	Size		nbytes;
+	Size		nbytes = count;
 
 	if (RecoveryInProgress() || tli != GetWALInsertionTimeLine())
 		return 0;
 
 	Assert(!XLogRecPtrIsInvalid(startptr));
 
-	/*
-	 * Don't read past the available WAL data.
-	 *
-	 * Check using local copy of LogwrtResult. Ordinarily it's been updated by
-	 * the caller when determining how far to read; but if not, it just means
-	 * we'll read less data.
-	 *
-	 * XXX: the available WAL could be extended to the WAL insert pointer by
-	 * calling WaitXLogInsertionsToFinish().
-	 */
-	upto = Min(startptr + count, LogwrtResult.Write);
-	nbytes = upto - startptr;
+#ifdef USE_ASSERT_CHECKING
+	{
+		XLogRecPtr	upto = startptr + count;
+		XLogRecPtr	insert_pos = GetXLogInsertRecPtr();
+
+		if (upto > insert_pos)
+			ereport(ERROR,
+					(errmsg("cannot read past end of current insert position; request %X/%X, insert position %X/%X",
+							LSN_FORMAT_ARGS(upto),
+							LSN_FORMAT_ARGS(insert_pos))));
+	}
+#endif
 
 	/*
 	 * Loop through the buffers without a lock. For each buffer, atomically
diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c
index 74a6b11866..ae9904e7e4 100644
--- a/src/backend/access/transam/xlogreader.c
+++ b/src/backend/access/transam/xlogreader.c
@@ -1500,6 +1500,9 @@ err:
  *
  * Returns true if succeeded, false if an error occurs, in which case
  * 'errinfo' receives error details.
+ *
+ * Note: It is the caller's responsibility to ensure requested WAL is written
+ * to disk, that is 'startptr'+'count' <= LogwrtResult.Write.
  */
 bool
 WALRead(XLogReaderState *state,
diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h
index 76787a8267..74606a6846 100644
--- a/src/include/access/xlog.h
+++ b/src/include/access/xlog.h
@@ -252,6 +252,7 @@ extern XLogRecPtr GetLastImportantRecPtr(void);
 
 extern void SetWalWriterSleeping(bool sleeping);
 
+extern XLogRecPtr WaitXLogInsertionsToFinish(XLogRecPtr upto);
 extern Size WALReadFromBuffers(char *dstbuf, XLogRecPtr startptr, Size count,
 							   TimeLineID tli);
 
-- 
2.34.1

From fe60c94d7f1cd6b58bd0b2f6434e3d8365ebd0fc Mon Sep 17 00:00:00 2001
From: Bharath Rupireddy <bharath.rupireddyforpostgres@gmail.com>
Date: Fri, 16 Feb 2024 06:57:16 +0000
Subject: [PATCH v23 5/5] Demonstrate page_read callback for reading from WAL
 buffers

---
 src/backend/access/transam/xlogreader.c       |   3 +-
 .../read_wal_from_buffers--1.0.sql            |  23 ++
 .../read_wal_from_buffers.c                   | 266 +++++++++++++++++-
 .../read_wal_from_buffers/t/001_basic.pl      |  35 +++
 4 files changed, 325 insertions(+), 2 deletions(-)

diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c
index ae9904e7e4..4658a86997 100644
--- a/src/backend/access/transam/xlogreader.c
+++ b/src/backend/access/transam/xlogreader.c
@@ -1035,7 +1035,8 @@ ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr, int reqLen)
 	 * record is.  This is so that we can check the additional identification
 	 * info that is present in the first page's "long" header.
 	 */
-	if (targetSegNo != state->seg.ws_segno && targetPageOff != 0)
+	if (state->seg.ws_segno != 0 &&
+		targetSegNo != state->seg.ws_segno && targetPageOff != 0)
 	{
 		XLogRecPtr	targetSegmentPtr = pageptr - targetPageOff;
 
diff --git a/src/test/modules/read_wal_from_buffers/read_wal_from_buffers--1.0.sql b/src/test/modules/read_wal_from_buffers/read_wal_from_buffers--1.0.sql
index 82fa097d10..72d05522fc 100644
--- a/src/test/modules/read_wal_from_buffers/read_wal_from_buffers--1.0.sql
+++ b/src/test/modules/read_wal_from_buffers/read_wal_from_buffers--1.0.sql
@@ -12,3 +12,26 @@ CREATE FUNCTION read_wal_from_buffers(IN lsn pg_lsn, IN bytes_to_read int,
     bytes_read OUT int)
 AS 'MODULE_PATHNAME', 'read_wal_from_buffers'
 LANGUAGE C STRICT;
+
+--
+-- get_wal_records_info_from_buffers()
+--
+-- SQL function to get info of WAL records available in WAL buffers.
+--
+CREATE FUNCTION get_wal_records_info_from_buffers(IN start_lsn pg_lsn,
+    IN end_lsn pg_lsn,
+    OUT start_lsn pg_lsn,
+    OUT end_lsn pg_lsn,
+    OUT prev_lsn pg_lsn,
+    OUT xid xid,
+    OUT resource_manager text,
+    OUT record_type text,
+    OUT record_length int4,
+    OUT main_data_length int4,
+    OUT fpi_length int4,
+    OUT description text,
+    OUT block_ref text
+)
+RETURNS SETOF record
+AS 'MODULE_PATHNAME', 'get_wal_records_info_from_buffers'
+LANGUAGE C STRICT PARALLEL SAFE;
diff --git a/src/test/modules/read_wal_from_buffers/read_wal_from_buffers.c b/src/test/modules/read_wal_from_buffers/read_wal_from_buffers.c
index 9df5c07b4b..ed33a14127 100644
--- a/src/test/modules/read_wal_from_buffers/read_wal_from_buffers.c
+++ b/src/test/modules/read_wal_from_buffers/read_wal_from_buffers.c
@@ -14,11 +14,27 @@
 #include "postgres.h"
 
 #include "access/xlog.h"
-#include "fmgr.h"
+#include "access/xlog_internal.h"
+#include "access/xlogreader.h"
+#include "access/xlogrecovery.h"
+#include "funcapi.h"
+#include "miscadmin.h"
+#include "utils/builtins.h"
 #include "utils/pg_lsn.h"
 
 PG_MODULE_MAGIC;
 
+static int	read_from_wal_buffers(XLogReaderState *state, XLogRecPtr targetPagePtr,
+								  int reqLen, XLogRecPtr targetRecPtr,
+								  char *cur_page);
+
+static XLogRecord *ReadNextXLogRecord(XLogReaderState *xlogreader);
+static void GetWALRecordInfo(XLogReaderState *record, Datum *values,
+							 bool *nulls, uint32 ncols);
+static void GetWALRecordsInfo(FunctionCallInfo fcinfo,
+							  XLogRecPtr start_lsn,
+							  XLogRecPtr end_lsn);
+
 /*
  * SQL function to read WAL from WAL buffers. Returns number of bytes read.
  */
@@ -52,3 +68,251 @@ read_wal_from_buffers(PG_FUNCTION_ARGS)
 
 	PG_RETURN_INT32(read);
 }
+
+/*
+ * XLogReaderRoutine->page_read callback for reading WAL from WAL buffers.
+ */
+static int
+read_from_wal_buffers(XLogReaderState *state, XLogRecPtr targetPagePtr,
+					  int reqLen, XLogRecPtr targetRecPtr,
+					  char *cur_page)
+{
+	XLogRecPtr	read_upto,
+				loc;
+	TimeLineID	tli = GetWALInsertionTimeLine();
+	Size		count;
+	Size		read = 0;
+
+	loc = targetPagePtr + reqLen;
+
+	/* Loop until the insert position has reached the requested location */
+	while (1)
+	{
+		read_upto = GetXLogInsertRecPtr();
+
+		if (loc <= read_upto)
+			break;
+
+		WaitXLogInsertionsToFinish(loc);
+
+		CHECK_FOR_INTERRUPTS();
+		pg_usleep(1000L);
+	}
+
+	if (targetPagePtr + XLOG_BLCKSZ <= read_upto)
+	{
+		/*
+		 * more than one block available; read only that block, have caller
+		 * come back if they need more.
+		 */
+		count = XLOG_BLCKSZ;
+	}
+	else if (targetPagePtr + reqLen > read_upto)
+	{
+		/* not enough data there */
+		return -1;
+	}
+	else
+	{
+		/* enough bytes available to satisfy the request */
+		count = read_upto - targetPagePtr;
+	}
+
+	/* read WAL from WAL buffers; a short read means some of it was evicted */
+	read = WALReadFromBuffers(cur_page, targetPagePtr, count, tli);
+
+	if (read != count)
+		ereport(ERROR,
+				errmsg("could not read fully from WAL buffers; expected %zu, read %zu",
+					   count, read));
+
+	return count;
+}
+
+/*
+ * Get info of all WAL records between start LSN and end LSN.
+ *
+ * This function and its helpers below are similar to pg_walinspect's
+ * pg_get_wal_records_info() except that it will get info of WAL records
+ * available in WAL buffers.
+ */
+PG_FUNCTION_INFO_V1(get_wal_records_info_from_buffers);
+Datum
+get_wal_records_info_from_buffers(PG_FUNCTION_ARGS)
+{
+	XLogRecPtr	start_lsn = PG_GETARG_LSN(0);
+	XLogRecPtr	end_lsn = PG_GETARG_LSN(1);
+
+	/*
+	 * Validate start and end LSNs coming from the function inputs.
+	 *
+	 * Reading WAL below the first page of the first segment isn't allowed.
+	 * This is a bootstrap WAL page and the page_read callback fails to read
+	 * it.
+	 */
+	if (start_lsn < XLOG_BLCKSZ)
+		ereport(ERROR,
+				(errmsg("could not read WAL at LSN %X/%X",
+						LSN_FORMAT_ARGS(start_lsn))));
+
+	if (start_lsn > end_lsn)
+		ereport(ERROR,
+				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+				 errmsg("WAL start LSN must be less than end LSN")));
+
+	GetWALRecordsInfo(fcinfo, start_lsn, end_lsn);
+
+	PG_RETURN_VOID();
+}
+
+/*
+ * Read next WAL record.
+ */
+static XLogRecord *
+ReadNextXLogRecord(XLogReaderState *xlogreader)
+{
+	XLogRecord *record;
+	char	   *errormsg;
+
+	record = XLogReadRecord(xlogreader, &errormsg);
+
+	if (record == NULL)
+	{
+		if (errormsg)
+			ereport(ERROR,
+					errmsg("could not read WAL at %X/%X: %s",
+						   LSN_FORMAT_ARGS(xlogreader->EndRecPtr), errormsg));
+		else
+			ereport(ERROR,
+					errmsg("could not read WAL at %X/%X",
+						   LSN_FORMAT_ARGS(xlogreader->EndRecPtr)));
+	}
+
+	return record;
+}
+
+/*
+ * Output values that make up a row describing caller's WAL record.
+ */
+static void
+GetWALRecordInfo(XLogReaderState *record, Datum *values,
+				 bool *nulls, uint32 ncols)
+{
+	const char *record_type;
+	RmgrData	desc;
+	uint32		fpi_len = 0;
+	StringInfoData rec_desc;
+	StringInfoData rec_blk_ref;
+	int			i = 0;
+
+	desc = GetRmgr(XLogRecGetRmid(record));
+	record_type = desc.rm_identify(XLogRecGetInfo(record));
+
+	if (record_type == NULL)
+		record_type = psprintf("UNKNOWN (%x)", XLogRecGetInfo(record) & ~XLR_INFO_MASK);
+
+	initStringInfo(&rec_desc);
+	desc.rm_desc(&rec_desc, record);
+
+	if (XLogRecHasAnyBlockRefs(record))
+	{
+		initStringInfo(&rec_blk_ref);
+		XLogRecGetBlockRefInfo(record, false, true, &rec_blk_ref, &fpi_len);
+	}
+
+	values[i++] = LSNGetDatum(record->ReadRecPtr);
+	values[i++] = LSNGetDatum(record->EndRecPtr);
+	values[i++] = LSNGetDatum(XLogRecGetPrev(record));
+	values[i++] = TransactionIdGetDatum(XLogRecGetXid(record));
+	values[i++] = CStringGetTextDatum(desc.rm_name);
+	values[i++] = CStringGetTextDatum(record_type);
+	values[i++] = UInt32GetDatum(XLogRecGetTotalLen(record));
+	values[i++] = UInt32GetDatum(XLogRecGetDataLen(record));
+	values[i++] = UInt32GetDatum(fpi_len);
+
+	if (rec_desc.len > 0)
+		values[i++] = CStringGetTextDatum(rec_desc.data);
+	else
+		nulls[i++] = true;
+
+	if (XLogRecHasAnyBlockRefs(record))
+		values[i++] = CStringGetTextDatum(rec_blk_ref.data);
+	else
+		nulls[i++] = true;
+
+	Assert(i == ncols);
+}
+
+/*
+ * Get info of all WAL records between start LSN and end LSN.
+ */
+static void
+GetWALRecordsInfo(FunctionCallInfo fcinfo, XLogRecPtr start_lsn,
+				  XLogRecPtr end_lsn)
+{
+#define GET_WAL_RECORDS_INFO_FROM_BUFFERS_COLS 11
+	XLogReaderState *xlogreader;
+	XLogRecPtr	first_valid_record;
+	ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
+	MemoryContext old_cxt;
+	MemoryContext tmp_cxt;
+
+	Assert(start_lsn <= end_lsn);
+
+	InitMaterializedSRF(fcinfo, 0);
+
+	xlogreader = XLogReaderAllocate(wal_segment_size, NULL,
+									XL_ROUTINE(.page_read = &read_from_wal_buffers,
+											   .segment_open = NULL,
+											   .segment_close = NULL),
+									NULL);
+
+	if (xlogreader == NULL)
+		ereport(ERROR,
+				(errcode(ERRCODE_OUT_OF_MEMORY),
+				 errmsg("out of memory"),
+				 errdetail("Failed while allocating a WAL reading processor.")));
+
+	/* first find a valid recptr to start from */
+	first_valid_record = XLogFindNextRecord(xlogreader, start_lsn);
+
+	if (XLogRecPtrIsInvalid(first_valid_record))
+	{
+		ereport(LOG,
+				(errmsg("could not find a valid record after %X/%X",
+						LSN_FORMAT_ARGS(start_lsn))));
+		XLogReaderFree(xlogreader);
+		return;
+	}
+
+	tmp_cxt = AllocSetContextCreate(CurrentMemoryContext,
+									"GetWALRecordsInfo temporary cxt",
+									ALLOCSET_DEFAULT_SIZES);
+
+	while (ReadNextXLogRecord(xlogreader) &&
+		   xlogreader->EndRecPtr <= end_lsn)
+	{
+		Datum		values[GET_WAL_RECORDS_INFO_FROM_BUFFERS_COLS] = {0};
+		bool		nulls[GET_WAL_RECORDS_INFO_FROM_BUFFERS_COLS] = {0};
+
+		/* Use the tmp context so we can clean up after each tuple is done */
+		old_cxt = MemoryContextSwitchTo(tmp_cxt);
+
+		GetWALRecordInfo(xlogreader, values, nulls,
+						 GET_WAL_RECORDS_INFO_FROM_BUFFERS_COLS);
+
+		tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc,
+							 values, nulls);
+
+		/* clean up and switch back */
+		MemoryContextSwitchTo(old_cxt);
+		MemoryContextReset(tmp_cxt);
+
+		CHECK_FOR_INTERRUPTS();
+	}
+
+	MemoryContextDelete(tmp_cxt);
+	XLogReaderFree(xlogreader);
+
+#undef GET_WAL_RECORDS_INFO_FROM_BUFFERS_COLS
+}
diff --git a/src/test/modules/read_wal_from_buffers/t/001_basic.pl b/src/test/modules/read_wal_from_buffers/t/001_basic.pl
index f985e49a27..fcdcdb001e 100644
--- a/src/test/modules/read_wal_from_buffers/t/001_basic.pl
+++ b/src/test/modules/read_wal_from_buffers/t/001_basic.pl
@@ -69,4 +69,39 @@ for (my $i = 0; $i < 10 * $PostgreSQL::Test::Utils::timeout_default; $i++)
 }
 ok($result, 'waited until WAL is successfully read from WAL buffers');
 
+$result = 0;
+
+# Wait until we get info of WAL records available in WAL buffers.
+for (my $i = 0; $i < 10 * $PostgreSQL::Test::Utils::timeout_default; $i++)
+{
+	$node->safe_psql('postgres', "DROP TABLE IF EXISTS foo, bar;");
+	$node->safe_psql('postgres',
+		"CREATE TABLE foo AS SELECT * FROM generate_series(1, 2);");
+	my $start_lsn = $node->safe_psql('postgres',
+		"SELECT pg_current_wal_insert_lsn();");
+	my $tbl_oid = $node->safe_psql('postgres',
+		"SELECT oid FROM pg_class WHERE relname = 'foo';");
+	$node->safe_psql('postgres',
+		"INSERT INTO foo SELECT * FROM generate_series(1, 10);");
+	my $end_lsn = $node->safe_psql('postgres',
+		"SELECT pg_current_wal_insert_lsn();");
+	$node->safe_psql('postgres',
+		"CREATE TABLE bar AS SELECT * FROM generate_series(1, 2);");
+
+	my $res = $node->safe_psql('postgres',
+				"SELECT count(*) FROM get_wal_records_info_from_buffers('$start_lsn', '$end_lsn')
+					WHERE block_ref LIKE concat('%', '$tbl_oid', '%') AND
+						resource_manager = 'Heap' AND
+						record_type = 'INSERT';");
+
+	if ($res eq 10)
+	{
+		$result = 1;
+		last;
+	}
+
+	usleep(100_000);
+}
+ok($result, 'waited until we get info of WAL records available in WAL buffers.');
+
 done_testing();
-- 
2.34.1

Reply via email to