On 2020-Apr-22, Andres Freund wrote:

> I'm in favor of doing so. Not necessarily primarily to avoid repeated
> API changes, but because I don't think the v13 changes went in the quite
> right direction.
> 
> ISTM that we should:
> - have the three callbacks you mention above
> - change WALSegmentOpen to also get the XLogReaderState
> - add private state to WALOpenSegment, so it can be used even when not
>   accessing data in files / when one needs more information to close the
>   file.
> - disambiguate between WALOpenSegment (struct describing an open
>   segment) and WALSegmentOpen (callback to open a segment) (note that
>   the read page callback uses a *CB naming, why not follow?)

Here's a first attempt at that.  The segment_open/close callbacks are
now given at XLogReaderAllocate time, and are passed the XLogReaderState
pointer.  I wrote a comment to explain that the page_read callback can
use WALRead() if it wishes to do so; but if it does, then segment_open
has to be provided.  segment_close is mandatory (since we call it at
XLogReaderFree).

Of the half a dozen cases that exist, three are slightly weird:

* Physical walsender does not use an xlogreader at all.  I think we could
  beat that code up so that it does.  But for the moment I just cons up
  a fake xlogreader, which only has the segment_open pointer set up, so
  that it can call WALRead.

* main xlog.c uses an xlogreader with XLogPageRead(), which does not use
  WALRead.  Therefore it does not pass segment_open.  It does not use
  xlogreader->seg.ws_file either.  Eventually we may want to beat this
  one up also.

* pg_rewind has its own page read callback, SimpleXLogPageRead, which
  does all the required opening and closing.  I don't think it'd be an
  improvement to force this to use segment_open.  Oddly enough, it calls
  itself "simple" but is unique in having the ability to read files from
  the wal archive.

All tests are passing for me.

-- 
Álvaro Herrera                https://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services
>From 3b779f1ea5e6cd4861037cf37a8d9375eb08fcf1 Mon Sep 17 00:00:00 2001
From: Alvaro Herrera <alvhe...@alvh.no-ip.org>
Date: Thu, 23 Apr 2020 18:02:20 -0400
Subject: [PATCH] Rework XLogReader callback system

segment_open and segment_close are passed in at XLogReaderAllocate time,
together with page_read; add comment to explain the role of WALRead.
---
 src/backend/access/transam/twophase.c         |   5 +-
 src/backend/access/transam/xlog.c             |  10 +-
 src/backend/access/transam/xlogreader.c       |  49 ++++----
 src/backend/access/transam/xlogutils.c        |  24 ++--
 src/backend/replication/logical/logical.c     |  20 +--
 .../replication/logical/logicalfuncs.c        |   4 +-
 src/backend/replication/slotfuncs.c           |  10 +-
 src/backend/replication/walsender.c           |  37 ++++--
 src/bin/pg_rewind/parsexlog.c                 |   9 +-
 src/bin/pg_waldump/pg_waldump.c               |  30 +++--
 src/include/access/xlogreader.h               | 114 +++++++++++-------
 src/include/access/xlogutils.h                |   5 +
 src/include/replication/logical.h             |   4 +-
 13 files changed, 208 insertions(+), 113 deletions(-)

diff --git a/src/backend/access/transam/twophase.c b/src/backend/access/transam/twophase.c
index 2f7d4ed59a..e1904877fa 100644
--- a/src/backend/access/transam/twophase.c
+++ b/src/backend/access/transam/twophase.c
@@ -1331,7 +1331,10 @@ XlogReadTwoPhaseData(XLogRecPtr lsn, char **buf, int *len)
 	char	   *errormsg;
 
 	xlogreader = XLogReaderAllocate(wal_segment_size, NULL,
-									&read_local_xlog_page, NULL);
+									XL_ROUTINE(.page_read = &read_local_xlog_page,
+											   .segment_open = &wal_segment_open,
+											   .segment_close = &wal_segment_close),
+									NULL);
 	if (!xlogreader)
 		ereport(ERROR,
 				(errcode(ERRCODE_OUT_OF_MEMORY),
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 11e32733c4..797b9c490e 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -1222,7 +1222,7 @@ XLogInsertRecord(XLogRecData *rdata,
 
 		if (!debug_reader)
 			debug_reader = XLogReaderAllocate(wal_segment_size, NULL,
-											  NULL, NULL);
+											  XL_ROUTINE(), NULL);
 
 		if (!debug_reader)
 		{
@@ -6467,8 +6467,12 @@ StartupXLOG(void)
 
 	/* Set up XLOG reader facility */
 	MemSet(&private, 0, sizeof(XLogPageReadPrivate));
-	xlogreader = XLogReaderAllocate(wal_segment_size, NULL,
-									&XLogPageRead, &private);
+	xlogreader =
+		XLogReaderAllocate(wal_segment_size, NULL,
+						   XL_ROUTINE(.page_read = &XLogPageRead,
+									  .segment_open = NULL,
+									  .segment_close = wal_segment_close),
+						   &private);
 	if (!xlogreader)
 		ereport(ERROR,
 				(errcode(ERRCODE_OUT_OF_MEMORY),
diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c
index 79ff976474..a307faea8b 100644
--- a/src/backend/access/transam/xlogreader.c
+++ b/src/backend/access/transam/xlogreader.c
@@ -71,7 +71,7 @@ report_invalid_record(XLogReaderState *state, const char *fmt,...)
  */
 XLogReaderState *
 XLogReaderAllocate(int wal_segment_size, const char *waldir,
-				   XLogPageReadCB pagereadfunc, void *private_data)
+				   XLogReaderRoutine *routine, void *private_data)
 {
 	XLogReaderState *state;
 
@@ -81,6 +81,9 @@ XLogReaderAllocate(int wal_segment_size, const char *waldir,
 	if (!state)
 		return NULL;
 
+	/* initialize caller-provided support functions */
+	state->routine = *routine;
+
 	state->max_block_id = -1;
 
 	/*
@@ -102,7 +105,6 @@ XLogReaderAllocate(int wal_segment_size, const char *waldir,
 	WALOpenSegmentInit(&state->seg, &state->segcxt, wal_segment_size,
 					   waldir);
 
-	state->read_page = pagereadfunc;
 	/* system_identifier initialized to zeroes above */
 	state->private_data = private_data;
 	/* ReadRecPtr, EndRecPtr and readLen initialized to zeroes above */
@@ -137,7 +139,7 @@ XLogReaderFree(XLogReaderState *state)
 	int			block_id;
 
 	if (state->seg.ws_file != -1)
-		close(state->seg.ws_file);
+		state->routine.segment_close(state);
 
 	for (block_id = 0; block_id <= XLR_MAX_BLOCK_ID; block_id++)
 	{
@@ -250,7 +252,7 @@ XLogBeginRead(XLogReaderState *state, XLogRecPtr RecPtr)
  * XLogBeginRead() or XLogFindNextRecord() must be called before the first call
  * to XLogReadRecord().
  *
- * If the read_page callback fails to read the requested data, NULL is
+ * If the page_read callback fails to read the requested data, NULL is
  * returned.  The callback is expected to have reported the error; errormsg
  * is set to NULL.
  *
@@ -559,10 +561,10 @@ err:
 
 /*
  * Read a single xlog page including at least [pageptr, reqLen] of valid data
- * via the read_page() callback.
+ * via the page_read() callback.
  *
  * Returns -1 if the required page cannot be read for some reason; errormsg_buf
- * is set in that case (unless the error occurs in the read_page callback).
+ * is set in that case (unless the error occurs in the page_read callback).
  *
  * We fetch the page from a reader-local cache if we know we have the required
  * data and if there hasn't been any error since caching the data.
@@ -589,7 +591,7 @@ ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr, int reqLen)
 	 * Data is not in our buffer.
 	 *
 	 * Every time we actually read the segment, even if we looked at parts of
-	 * it before, we need to do verification as the read_page callback might
+	 * it before, we need to do verification as the page_read callback might
 	 * now be rereading data from a different source.
 	 *
 	 * Whenever switching to a new WAL segment, we read the first page of the
@@ -601,9 +603,9 @@ ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr, int reqLen)
 	{
 		XLogRecPtr	targetSegmentPtr = pageptr - targetPageOff;
 
-		readLen = state->read_page(state, targetSegmentPtr, XLOG_BLCKSZ,
-								   state->currRecPtr,
-								   state->readBuf);
+		readLen = state->routine.page_read(state, targetSegmentPtr, XLOG_BLCKSZ,
+										   state->currRecPtr,
+										   state->readBuf);
 		if (readLen < 0)
 			goto err;
 
@@ -619,9 +621,9 @@ ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr, int reqLen)
 	 * First, read the requested data length, but at least a short page header
 	 * so that we can validate it.
 	 */
-	readLen = state->read_page(state, pageptr, Max(reqLen, SizeOfXLogShortPHD),
-							   state->currRecPtr,
-							   state->readBuf);
+	readLen = state->routine.page_read(state, pageptr, Max(reqLen, SizeOfXLogShortPHD),
+									   state->currRecPtr,
+									   state->readBuf);
 	if (readLen < 0)
 		goto err;
 
@@ -638,9 +640,9 @@ ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr, int reqLen)
 	/* still not enough */
 	if (readLen < XLogPageHeaderSize(hdr))
 	{
-		readLen = state->read_page(state, pageptr, XLogPageHeaderSize(hdr),
-								   state->currRecPtr,
-								   state->readBuf);
+		readLen = state->routine.page_read(state, pageptr, XLogPageHeaderSize(hdr),
+										   state->currRecPtr,
+										   state->readBuf);
 		if (readLen < 0)
 			goto err;
 	}
@@ -1041,11 +1043,12 @@ err:
 #endif							/* FRONTEND */
 
 /*
+ * Helper function to ease writing of XLogReaderRoutine->page_read callbacks.
+ *
  * Read 'count' bytes into 'buf', starting at location 'startptr', from WAL
  * fetched from timeline 'tli'.
  *
- * 'seg/segcxt' identify the last segment used.  'openSegment' is a callback
- * to open the next segment, if necessary.
+ * 'seg/segcxt' identify the last segment used.
  *
  * Returns true if succeeded, false if an error occurs, in which case
  * 'errinfo' receives error details.
@@ -1054,9 +1057,10 @@ err:
  * WAL buffers when possible.
  */
 bool
-WALRead(char *buf, XLogRecPtr startptr, Size count, TimeLineID tli,
+WALRead(XLogReaderState *state,
+		char *buf, XLogRecPtr startptr, Size count, TimeLineID tli,
 		WALOpenSegment *seg, WALSegmentContext *segcxt,
-		WALSegmentOpen openSegment, WALReadError *errinfo)
+		WALReadError *errinfo)
 {
 	char	   *p;
 	XLogRecPtr	recptr;
@@ -1086,10 +1090,11 @@ WALRead(char *buf, XLogRecPtr startptr, Size count, TimeLineID tli,
 			XLogSegNo	nextSegNo;
 
 			if (seg->ws_file >= 0)
-				close(seg->ws_file);
+				state->routine.segment_close(state);
 
 			XLByteToSeg(recptr, nextSegNo, segcxt->ws_segsize);
-			seg->ws_file = openSegment(nextSegNo, segcxt, &tli);
+			seg->ws_file = state->routine.segment_open(state, nextSegNo,
+													   segcxt, &tli);
 
 			/* Update the current segment info. */
 			seg->ws_tli = tli;
diff --git a/src/backend/access/transam/xlogutils.c b/src/backend/access/transam/xlogutils.c
index 6cb143e161..bbd801513a 100644
--- a/src/backend/access/transam/xlogutils.c
+++ b/src/backend/access/transam/xlogutils.c
@@ -783,10 +783,10 @@ XLogReadDetermineTimeline(XLogReaderState *state, XLogRecPtr wantPage, uint32 wa
 	}
 }
 
-/* openSegment callback for WALRead */
-static int
-wal_segment_open(XLogSegNo nextSegNo, WALSegmentContext * segcxt,
-				 TimeLineID *tli_p)
+/* XLogReaderRoutine->segment_open callback for local pg_wal files */
+int
+wal_segment_open(XLogReaderState *state, XLogSegNo nextSegNo,
+				 WALSegmentContext *segcxt, TimeLineID *tli_p)
 {
 	TimeLineID	tli = *tli_p;
 	char		path[MAXPGPATH];
@@ -811,8 +811,17 @@ wal_segment_open(XLogSegNo nextSegNo, WALSegmentContext * segcxt,
 	return -1;					/* keep compiler quiet */
 }
 
+/* stock XLogReaderRoutine->segment_close callback */
+void
+wal_segment_close(XLogReaderState *state)
+{
+	close(state->seg.ws_file);
+	/* need to check errno? */
+	state->seg.ws_file = -1;
+}
+
 /*
- * read_page callback for reading local xlog files
+ * XLogReaderRoutine->page_read callback for reading local xlog files
  *
  * Public because it would likely be very helpful for someone writing another
  * output method outside walsender, e.g. in a bgworker.
@@ -937,8 +946,9 @@ read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr,
 	 * as 'count', read the whole page anyway. It's guaranteed to be
 	 * zero-padded up to the page boundary if it's incomplete.
 	 */
-	if (!WALRead(cur_page, targetPagePtr, XLOG_BLCKSZ, tli, &state->seg,
-				 &state->segcxt, wal_segment_open, &errinfo))
+	if (!WALRead(state, cur_page, targetPagePtr, XLOG_BLCKSZ, tli,
+				 &state->seg, &state->segcxt,
+				 &errinfo))
 		WALReadRaiseError(&errinfo);
 
 	/* number of valid bytes in the buffer */
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 5adf253583..dc69e5ce5f 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -120,7 +120,7 @@ StartupDecodingContext(List *output_plugin_options,
 					   TransactionId xmin_horizon,
 					   bool need_full_snapshot,
 					   bool fast_forward,
-					   XLogPageReadCB read_page,
+					   XLogReaderRoutine *xl_routine,
 					   LogicalOutputPluginWriterPrepareWrite prepare_write,
 					   LogicalOutputPluginWriterWrite do_write,
 					   LogicalOutputPluginWriterUpdateProgress update_progress)
@@ -169,7 +169,7 @@ StartupDecodingContext(List *output_plugin_options,
 
 	ctx->slot = slot;
 
-	ctx->reader = XLogReaderAllocate(wal_segment_size, NULL, read_page, ctx);
+	ctx->reader = XLogReaderAllocate(wal_segment_size, NULL, xl_routine, ctx);
 	if (!ctx->reader)
 		ereport(ERROR,
 				(errcode(ERRCODE_OUT_OF_MEMORY),
@@ -215,7 +215,8 @@ StartupDecodingContext(List *output_plugin_options,
  *		Otherwise, we set for decoding to start from the given LSN without
  *		marking WAL reserved beforehand.  In that scenario, it's up to the
  *		caller to guarantee that WAL remains available.
- * read_page, prepare_write, do_write, update_progress --
+ * xl_routine -- XLogReaderRoutine for underlying XLogReader
+ * prepare_write, do_write, update_progress --
  *		callbacks that perform the use-case dependent, actual, work.
  *
  * Needs to be called while in a memory context that's at least as long lived
@@ -230,7 +231,7 @@ CreateInitDecodingContext(char *plugin,
 						  List *output_plugin_options,
 						  bool need_full_snapshot,
 						  XLogRecPtr restart_lsn,
-						  XLogPageReadCB read_page,
+						  XLogReaderRoutine *xl_routine,
 						  LogicalOutputPluginWriterPrepareWrite prepare_write,
 						  LogicalOutputPluginWriterWrite do_write,
 						  LogicalOutputPluginWriterUpdateProgress update_progress)
@@ -327,7 +328,7 @@ CreateInitDecodingContext(char *plugin,
 
 	ctx = StartupDecodingContext(NIL, restart_lsn, xmin_horizon,
 								 need_full_snapshot, false,
-								 read_page, prepare_write, do_write,
+								 xl_routine, prepare_write, do_write,
 								 update_progress);
 
 	/* call output plugin initialization callback */
@@ -357,7 +358,10 @@ CreateInitDecodingContext(char *plugin,
  * fast_forward
  *		bypass the generation of logical changes.
  *
- * read_page, prepare_write, do_write, update_progress
+ * xl_routine
+ *		XLogReaderRoutine used by underlying xlogreader
+ *
+ * prepare_write, do_write, update_progress
  *		callbacks that have to be filled to perform the use-case dependent,
  *		actual work.
  *
@@ -372,7 +376,7 @@ LogicalDecodingContext *
 CreateDecodingContext(XLogRecPtr start_lsn,
 					  List *output_plugin_options,
 					  bool fast_forward,
-					  XLogPageReadCB read_page,
+					  XLogReaderRoutine *xl_routine,
 					  LogicalOutputPluginWriterPrepareWrite prepare_write,
 					  LogicalOutputPluginWriterWrite do_write,
 					  LogicalOutputPluginWriterUpdateProgress update_progress)
@@ -425,7 +429,7 @@ CreateDecodingContext(XLogRecPtr start_lsn,
 
 	ctx = StartupDecodingContext(output_plugin_options,
 								 start_lsn, InvalidTransactionId, false,
-								 fast_forward, read_page, prepare_write,
+								 fast_forward, xl_routine, prepare_write,
 								 do_write, update_progress);
 
 	/* call output plugin initialization callback */
diff --git a/src/backend/replication/logical/logicalfuncs.c b/src/backend/replication/logical/logicalfuncs.c
index f5384f1df8..cfef7f3b6c 100644
--- a/src/backend/replication/logical/logicalfuncs.c
+++ b/src/backend/replication/logical/logicalfuncs.c
@@ -233,7 +233,9 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin
 		ctx = CreateDecodingContext(InvalidXLogRecPtr,
 									options,
 									false,
-									read_local_xlog_page,
+									XL_ROUTINE(.page_read = read_local_xlog_page,
+											   .segment_open = wal_segment_open,
+											   .segment_close = wal_segment_close),
 									LogicalOutputPrepareWrite,
 									LogicalOutputWrite, NULL);
 
diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c
index f776de3df7..6a2c0cb6bc 100644
--- a/src/backend/replication/slotfuncs.c
+++ b/src/backend/replication/slotfuncs.c
@@ -152,8 +152,10 @@ create_logical_replication_slot(char *name, char *plugin,
 	ctx = CreateInitDecodingContext(plugin, NIL,
 									false,	/* just catalogs is OK */
 									restart_lsn,
-									read_local_xlog_page, NULL, NULL,
-									NULL);
+									XL_ROUTINE(.page_read = read_local_xlog_page,
+											   .segment_open = wal_segment_open,
+											   .segment_close = wal_segment_close),
+									NULL, NULL, NULL);
 
 	/*
 	 * If caller needs us to determine the decoding start point, do so now.
@@ -464,7 +466,9 @@ pg_logical_replication_slot_advance(XLogRecPtr moveto)
 		ctx = CreateDecodingContext(InvalidXLogRecPtr,
 									NIL,
 									true,	/* fast_forward */
-									read_local_xlog_page,
+									XL_ROUTINE(.page_read = read_local_xlog_page,
+											   .segment_open = wal_segment_open,
+											   .segment_close = wal_segment_close),
 									NULL, NULL, NULL);
 
 		/*
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 0e933228fc..55afed683f 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -54,8 +54,8 @@
 #include "access/transam.h"
 #include "access/xact.h"
 #include "access/xlog_internal.h"
+#include "access/xlogreader.h"
 #include "access/xlogutils.h"
-
 #include "catalog/pg_authid.h"
 #include "catalog/pg_type.h"
 #include "commands/dbcommands.h"
@@ -248,8 +248,8 @@ static void LagTrackerWrite(XLogRecPtr lsn, TimestampTz local_flush_time);
 static TimeOffset LagTrackerRead(int head, XLogRecPtr lsn, TimestampTz now);
 static bool TransactionIdInRecentPast(TransactionId xid, uint32 epoch);
 
-static int	WalSndSegmentOpen(XLogSegNo nextSegNo, WALSegmentContext *segcxt,
-							  TimeLineID *tli_p);
+static int	WalSndSegmentOpen(XLogReaderState *state, XLogSegNo nextSegNo,
+							  WALSegmentContext *segcxt, TimeLineID *tli_p);
 static void UpdateSpillStats(LogicalDecodingContext *ctx);
 
 
@@ -792,7 +792,8 @@ StartReplication(StartReplicationCmd *cmd)
 }
 
 /*
- * read_page callback for logical decoding contexts, as a walsender process.
+ * XLogReaderRoutine->page_read callback for logical decoding contexts, as a
+ * walsender process.
  *
  * Inside the walsender we can do better than read_local_xlog_page,
  * which has to do a plain sleep/busy loop, because the walsender's latch gets
@@ -826,7 +827,8 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req
 		count = flushptr - targetPagePtr;	/* part of the page available */
 
 	/* now actually read the data, we know it's there */
-	if (!WALRead(cur_page,
+	if (!WALRead(state,
+				 cur_page,
 				 targetPagePtr,
 				 XLOG_BLCKSZ,
 				 sendSeg->ws_tli,	/* Pass the current TLI because only
@@ -834,7 +836,6 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req
 									 * TLI is needed. */
 				 sendSeg,
 				 sendCxt,
-				 WalSndSegmentOpen,
 				 &errinfo))
 		WALReadRaiseError(&errinfo);
 
@@ -999,7 +1000,9 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
 
 		ctx = CreateInitDecodingContext(cmd->plugin, NIL, need_full_snapshot,
 										InvalidXLogRecPtr,
-										logical_read_xlog_page,
+										XL_ROUTINE(.page_read = logical_read_xlog_page,
+												   .segment_open = WalSndSegmentOpen,
+												   .segment_close = wal_segment_close),
 										WalSndPrepareWrite, WalSndWriteData,
 										WalSndUpdateProgress);
 
@@ -1155,7 +1158,9 @@ StartLogicalReplication(StartReplicationCmd *cmd)
 	 */
 	logical_decoding_ctx =
 		CreateDecodingContext(cmd->startpoint, cmd->options, false,
-							  logical_read_xlog_page,
+							  XL_ROUTINE(.page_read = logical_read_xlog_page,
+										 .segment_open = WalSndSegmentOpen,
+										 .segment_close = wal_segment_close),
 							  WalSndPrepareWrite, WalSndWriteData,
 							  WalSndUpdateProgress);
 
@@ -2422,9 +2427,10 @@ WalSndKill(int code, Datum arg)
 	SpinLockRelease(&walsnd->mutex);
 }
 
-/* walsender's openSegment callback for WALRead */
+/* XLogReaderRoutine->segment_open callback */
 static int
-WalSndSegmentOpen(XLogSegNo nextSegNo, WALSegmentContext *segcxt,
+WalSndSegmentOpen(XLogReaderState *state,
+				  XLogSegNo nextSegNo, WALSegmentContext *segcxt,
 				  TimeLineID *tli_p)
 {
 	char		path[MAXPGPATH];
@@ -2512,6 +2518,12 @@ XLogSendPhysical(void)
 	Size		nbytes;
 	XLogSegNo	segno;
 	WALReadError errinfo;
+	static XLogReaderState fake_xlogreader =
+	{
+		/* XXX no page_read routine used by physical walsender */
+		.routine.segment_open = WalSndSegmentOpen,
+		.routine.segment_close = wal_segment_close
+	};
 
 	/* If requested switch the WAL sender to the stopping state. */
 	if (got_STOPPING)
@@ -2729,7 +2741,9 @@ XLogSendPhysical(void)
 	enlargeStringInfo(&output_message, nbytes);
 
 retry:
-	if (!WALRead(&output_message.data[output_message.len],
+	/* XXX for xlogreader use, we'd call XLogBeginRead+XLogReadRecord here */
+	if (!WALRead(&fake_xlogreader,
+				 &output_message.data[output_message.len],
 				 startptr,
 				 nbytes,
 				 sendSeg->ws_tli,	/* Pass the current TLI because only
@@ -2737,7 +2751,6 @@ retry:
 									 * TLI is needed. */
 				 sendSeg,
 				 sendCxt,
-				 WalSndSegmentOpen,
 				 &errinfo))
 		WALReadRaiseError(&errinfo);
 
diff --git a/src/bin/pg_rewind/parsexlog.c b/src/bin/pg_rewind/parsexlog.c
index 14a5db5433..5a10faa614 100644
--- a/src/bin/pg_rewind/parsexlog.c
+++ b/src/bin/pg_rewind/parsexlog.c
@@ -66,7 +66,8 @@ extractPageMap(const char *datadir, XLogRecPtr startpoint, int tliIndex,
 
 	private.tliIndex = tliIndex;
 	private.restoreCommand = restoreCommand;
-	xlogreader = XLogReaderAllocate(WalSegSz, datadir, &SimpleXLogPageRead,
+	xlogreader = XLogReaderAllocate(WalSegSz, datadir,
+									XL_ROUTINE(.page_read = &SimpleXLogPageRead),
 									&private);
 	if (xlogreader == NULL)
 		pg_fatal("out of memory");
@@ -115,7 +116,8 @@ readOneRecord(const char *datadir, XLogRecPtr ptr, int tliIndex)
 	XLogRecPtr	endptr;
 
 	private.tliIndex = tliIndex;
-	xlogreader = XLogReaderAllocate(WalSegSz, datadir, &SimpleXLogPageRead,
+	xlogreader = XLogReaderAllocate(WalSegSz, datadir,
+									XL_ROUTINE(.page_read = &SimpleXLogPageRead),
 									&private);
 	if (xlogreader == NULL)
 		pg_fatal("out of memory");
@@ -174,7 +176,8 @@ findLastCheckpoint(const char *datadir, XLogRecPtr forkptr, int tliIndex,
 
 	private.tliIndex = tliIndex;
 	private.restoreCommand = restoreCommand;
-	xlogreader = XLogReaderAllocate(WalSegSz, datadir, &SimpleXLogPageRead,
+	xlogreader = XLogReaderAllocate(WalSegSz, datadir,
+									XL_ROUTINE(.page_read = &SimpleXLogPageRead),
 									&private);
 	if (xlogreader == NULL)
 		pg_fatal("out of memory");
diff --git a/src/bin/pg_waldump/pg_waldump.c b/src/bin/pg_waldump/pg_waldump.c
index d7bd9ccac2..e29f65500f 100644
--- a/src/bin/pg_waldump/pg_waldump.c
+++ b/src/bin/pg_waldump/pg_waldump.c
@@ -279,9 +279,10 @@ identify_target_directory(char *directory, char *fname)
 	return NULL;				/* not reached */
 }
 
-/* pg_waldump's openSegment callback for WALRead */
+/* pg_waldump's XLogReaderRoutine->segment_open callback */
 static int
-WALDumpOpenSegment(XLogSegNo nextSegNo, WALSegmentContext *segcxt,
+WALDumpOpenSegment(XLogReaderState *state,
+				   XLogSegNo nextSegNo, WALSegmentContext *segcxt,
 				   TimeLineID *tli_p)
 {
 	TimeLineID	tli = *tli_p;
@@ -321,8 +322,18 @@ WALDumpOpenSegment(XLogSegNo nextSegNo, WALSegmentContext *segcxt,
 }
 
 /*
- * XLogReader read_page callback
+ * pg_waldump's XLogReaderRoutine->segment_close callback.  Same as
+ * wal_segment_close
  */
+static void
+WALDumpCloseSegment(XLogReaderState *state)
+{
+	close(state->seg.ws_file);
+	/* need to check errno? */
+	state->seg.ws_file = -1;
+}
+
+/* pg_waldump's XLogReaderRoutine->page_read callback */
 static int
 WALDumpReadPage(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen,
 				XLogRecPtr targetPtr, char *readBuff)
@@ -344,8 +355,9 @@ WALDumpReadPage(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen,
 		}
 	}
 
-	if (!WALRead(readBuff, targetPagePtr, count, private->timeline,
-				 &state->seg, &state->segcxt, WALDumpOpenSegment, &errinfo))
+	if (!WALRead(state, readBuff, targetPagePtr, count, private->timeline,
+				 &state->seg, &state->segcxt,
+				 &errinfo))
 	{
 		WALOpenSegment *seg = &errinfo.wre_seg;
 		char		fname[MAXPGPATH];
@@ -1031,8 +1043,12 @@ main(int argc, char **argv)
 	/* done with argument parsing, do the actual work */
 
 	/* we have everything we need, start reading */
-	xlogreader_state = XLogReaderAllocate(WalSegSz, waldir, WALDumpReadPage,
-										  &private);
+	xlogreader_state =
+		XLogReaderAllocate(WalSegSz, waldir,
+						   XL_ROUTINE(.page_read = WALDumpReadPage,
+									  .segment_open = WALDumpOpenSegment,
+									  .segment_close = WALDumpCloseSegment),
+						   &private);
 	if (!xlogreader_state)
 		fatal_error("out of memory");
 
diff --git a/src/include/access/xlogreader.h b/src/include/access/xlogreader.h
index 4582196e18..6b9f7db646 100644
--- a/src/include/access/xlogreader.h
+++ b/src/include/access/xlogreader.h
@@ -17,6 +17,13 @@
  *		XLogBeginRead() or XLogFindNextRecord(), and call XLogReadRecord()
  *		until it returns NULL.
  *
+ *		Callers supply a page_read callback if they want to call
+ *		XLogReadRecord or XLogFindNextRecord; it can be passed in as NULL
+ *		otherwise.  The WALRead function can be used as a helper to write
+ *		page_read callbacks, but it is not mandatory; callers that use it
+ *		must supply a segment_open callback.  The segment_close callback
+ *		must always be supplied.
+ *
  *		After reading a record with XLogReadRecord(), it's decomposed into
  *		the per-block and main data parts, and the parts can be accessed
  *		with the XLogRec* macros and functions. You can also decode a
@@ -50,12 +57,64 @@ typedef struct WALSegmentContext
 
 typedef struct XLogReaderState XLogReaderState;
 
-/* Function type definition for the read_page callback */
+/* Function type definitions for various xlogreader interactions */
 typedef int (*XLogPageReadCB) (XLogReaderState *xlogreader,
 							   XLogRecPtr targetPagePtr,
 							   int reqLen,
 							   XLogRecPtr targetRecPtr,
 							   char *readBuf);
+typedef int (*WALSegmentOpenCB) (XLogReaderState *xlogreader,
+								 XLogSegNo nextSegNo,
+								 WALSegmentContext *segcxt,
+								 TimeLineID *tli_p);
+typedef void (*WALSegmentCloseCB) (XLogReaderState *xlogreader);
+
+typedef struct XLogReaderRoutine
+{
+	/*
+	 * Data input callback
+	 *
+	 * This callback shall read at least reqLen valid bytes of the xlog page
+	 * starting at targetPagePtr, and store them in readBuf.  The callback
+	 * shall return the number of bytes read (never more than XLOG_BLCKSZ), or
+	 * -1 on failure.  The callback shall sleep, if necessary, to wait for the
+	 * requested bytes to become available.  The callback will not be invoked
+	 * again for the same page unless more than the returned number of bytes
+	 * are needed.
+	 *
+	 * targetRecPtr is the position of the WAL record we're reading.  Usually
+	 * it is equal to targetPagePtr + reqLen, but sometimes xlogreader needs
+	 * to read and verify the page or segment header, before it reads the
+	 * actual WAL record it's interested in.  In that case, targetRecPtr can
+	 * be used to determine which timeline to read the page from.
+	 *
+	 * The callback shall set ->seg.ws_tli to the TLI of the file the page was
+	 * read from.
+	 */
+	XLogPageReadCB page_read;
+
+	/*
+	 * Callback to open the specified WAL segment for reading.  Returns a
+	 * valid file descriptor when the file was opened successfully.
+	 *
+	 * "nextSegNo" is the number of the segment to be opened.
+	 *
+	 * "segcxt" is additional information about the segment.
+	 *
+	 * "tli_p" is an input/output argument. WALRead() uses it to pass the
+	 * timeline in which the new segment should be found, but the callback can
+	 * use it to return the TLI that it actually opened.
+	 *
+	 * BasicOpenFile() is the preferred way to open the segment file in
+	 * backend code, whereas open(2) should be used in frontend.
+	 */
+	WALSegmentOpenCB segment_open;
+
+	/* WAL segment close callback */
+	WALSegmentCloseCB segment_close;
+} XLogReaderRoutine;
+
+#define XL_ROUTINE(...) &(XLogReaderRoutine){__VA_ARGS__}
 
 typedef struct
 {
@@ -88,33 +147,16 @@ typedef struct
 
 struct XLogReaderState
 {
+	/*
+	 * Operational callbacks
+	 */
+	XLogReaderRoutine routine;
+
 	/* ----------------------------------------
 	 * Public parameters
 	 * ----------------------------------------
 	 */
 
-	/*
-	 * Data input callback (mandatory).
-	 *
-	 * This callback shall read at least reqLen valid bytes of the xlog page
-	 * starting at targetPagePtr, and store them in readBuf.  The callback
-	 * shall return the number of bytes read (never more than XLOG_BLCKSZ), or
-	 * -1 on failure.  The callback shall sleep, if necessary, to wait for the
-	 * requested bytes to become available.  The callback will not be invoked
-	 * again for the same page unless more than the returned number of bytes
-	 * are needed.
-	 *
-	 * targetRecPtr is the position of the WAL record we're reading.  Usually
-	 * it is equal to targetPagePtr + reqLen, but sometimes xlogreader needs
-	 * to read and verify the page or segment header, before it reads the
-	 * actual WAL record it's interested in.  In that case, targetRecPtr can
-	 * be used to determine which timeline to read the page from.
-	 *
-	 * The callback shall set ->seg.ws_tli to the TLI of the file the page was
-	 * read from.
-	 */
-	XLogPageReadCB read_page;
-
 	/*
 	 * System identifier of the xlog files we're about to read.  Set to zero
 	 * (the default value) if unknown or unimportant.
@@ -214,30 +256,13 @@ struct XLogReaderState
 /* Get a new XLogReader */
 extern XLogReaderState *XLogReaderAllocate(int wal_segment_size,
 										   const char *waldir,
-										   XLogPageReadCB pagereadfunc,
+										   XLogReaderRoutine *routine,
 										   void *private_data);
+extern XLogReaderRoutine *LocalXLogReaderRoutine(void);
 
 /* Free an XLogReader */
 extern void XLogReaderFree(XLogReaderState *state);
 
-/*
- * Callback to open the specified WAL segment for reading.  Returns a valid
- * file descriptor when the file was opened successfully.
- *
- * "nextSegNo" is the number of the segment to be opened.
- *
- * "segcxt" is additional information about the segment.
- *
- * "tli_p" is an input/output argument. XLogRead() uses it to pass the
- * timeline in which the new segment should be found, but the callback can use
- * it to return the TLI that it actually opened.
- *
- * BasicOpenFile() is the preferred way to open the segment file in backend
- * code, whereas open(2) should be used in frontend.
- */
-typedef int (*WALSegmentOpen) (XLogSegNo nextSegNo, WALSegmentContext *segcxt,
-							   TimeLineID *tli_p);
-
 /* Initialize supporting structures */
 extern void WALOpenSegmentInit(WALOpenSegment *seg, WALSegmentContext *segcxt,
 							   int segsize, const char *waldir);
@@ -269,9 +294,10 @@ typedef struct WALReadError
 	WALOpenSegment wre_seg;		/* Segment we tried to read from. */
 } WALReadError;
 
-extern bool WALRead(char *buf, XLogRecPtr startptr, Size count,
+extern bool WALRead(XLogReaderState *state,
+					char *buf, XLogRecPtr startptr, Size count,
 					TimeLineID tli, WALOpenSegment *seg,
-					WALSegmentContext *segcxt, WALSegmentOpen openSegment,
+					WALSegmentContext *segcxt,
 					WALReadError *errinfo);
 
 /* Functions for decoding an XLogRecord */
diff --git a/src/include/access/xlogutils.h b/src/include/access/xlogutils.h
index 5181a077d9..68ce815476 100644
--- a/src/include/access/xlogutils.h
+++ b/src/include/access/xlogutils.h
@@ -50,6 +50,11 @@ extern void FreeFakeRelcacheEntry(Relation fakerel);
 extern int	read_local_xlog_page(XLogReaderState *state,
 								 XLogRecPtr targetPagePtr, int reqLen,
 								 XLogRecPtr targetRecPtr, char *cur_page);
+extern int	wal_segment_open(XLogReaderState *state,
+							 XLogSegNo nextSegNo,
+							 WALSegmentContext *segcxt,
+							 TimeLineID *tli_p);
+extern void wal_segment_close(XLogReaderState *state);
 
 extern void XLogReadDetermineTimeline(XLogReaderState *state,
 									  XLogRecPtr wantPage, uint32 wantLength);
diff --git a/src/include/replication/logical.h b/src/include/replication/logical.h
index 3b7ca7f1da..c2f2475e5d 100644
--- a/src/include/replication/logical.h
+++ b/src/include/replication/logical.h
@@ -95,14 +95,14 @@ extern LogicalDecodingContext *CreateInitDecodingContext(char *plugin,
 														 List *output_plugin_options,
 														 bool need_full_snapshot,
 														 XLogRecPtr restart_lsn,
-														 XLogPageReadCB read_page,
+														 XLogReaderRoutine *xl_routine,
 														 LogicalOutputPluginWriterPrepareWrite prepare_write,
 														 LogicalOutputPluginWriterWrite do_write,
 														 LogicalOutputPluginWriterUpdateProgress update_progress);
 extern LogicalDecodingContext *CreateDecodingContext(XLogRecPtr start_lsn,
 													 List *output_plugin_options,
 													 bool fast_forward,
-													 XLogPageReadCB read_page,
+													 XLogReaderRoutine *xl_routine,
 													 LogicalOutputPluginWriterPrepareWrite prepare_write,
 													 LogicalOutputPluginWriterWrite do_write,
 													 LogicalOutputPluginWriterUpdateProgress update_progress);
-- 
2.20.1

Reply via email to