While working on the instance encryption I found it annoying to apply
decryption of XLOG pages in three different functions. Attached is a patch that
tries to merge them all into one function, XLogRead(). The existing
implementations differ in the way a new segment is opened, so I added a pointer
to a callback function as a new argument. This callback handles the specific
ways to determine the segment file name and to open the file.
I can split the patch into multiple diffs to make detailed review easier, but
first I'd like to hear if anything is seriously wrong with this
design. Thanks.
--
Antonin Houska
Web: https://www.cybertec-postgresql.com
diff --git a/src/backend/access/transam/twophase.c b/src/backend/access/transam/twophase.c
index f9a4960f8a..444b5bf910 100644
--- a/src/backend/access/transam/twophase.c
+++ b/src/backend/access/transam/twophase.c
@@ -1369,7 +1369,7 @@ ParsePrepareRecord(uint8 info, char *xlrec, xl_xact_parsed_prepare *parsed)
bufptr += MAXALIGN(hdr->ninvalmsgs * sizeof(SharedInvalidationMessage));
}
-
+/*
+ * WAL read position used by XlogReadTwoPhaseData().
+ *
+ * XLogRead() keeps the last segment it read open in this structure, so it has
+ * to survive between calls.  It therefore has static storage, initialized the
+ * same way XLogReadInitPos() initializes a new position, instead of being
+ * palloc'd in whatever memory context is current on the first call: that
+ * context may be reset later, which would leave a dangling pointer behind.
+ */
+static XLogReadPos readPos = {.segFile = -1};
+
/*
* Reads 2PC data from xlog. During checkpoint this data will be moved to
@@ -1386,8 +1386,17 @@ XlogReadTwoPhaseData(XLogRecPtr lsn, char **buf, int *len)
XLogReaderState *xlogreader;
char *errormsg;
- xlogreader = XLogReaderAllocate(wal_segment_size, &read_local_xlog_page,
- NULL);
+	/*
+	 * read_local_xlog_page() passes the reader's private data to XLogRead()
+	 * as the read position, so pass our static one.  Reusing it lets the next
+	 * call continue with the segment this call left open.
+	 */
+	xlogreader = XLogReaderAllocate(wal_segment_size, read_local_xlog_page,
+									&readPos);
if (!xlogreader)
ereport(ERROR,
(errcode(ERRCODE_OUT_OF_MEMORY),
diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c
index 9196aa3aae..7d0fdfba87 100644
--- a/src/backend/access/transam/xlogreader.c
+++ b/src/backend/access/transam/xlogreader.c
@@ -17,6 +17,8 @@
*/
#include "postgres.h"
+#include <unistd.h>
+
#include "access/transam.h"
#include "access/xlogrecord.h"
#include "access/xlog_internal.h"
@@ -26,6 +28,7 @@
#include "replication/origin.h"
#ifndef FRONTEND
+#include "pgstat.h"
#include "utils/memutils.h"
#endif
@@ -1005,6 +1008,191 @@ out:
#endif /* FRONTEND */
+/*
+ * Allocate and return a new read position for callers of XLogRead().
+ *
+ * The structure is allocated in the current memory context, so it lives only
+ * as long as that context does.  It starts out with no segment open (segFile
+ * is -1); every other field is zero / NULL.
+ */
+XLogReadPos *
+XLogReadInitPos(void)
+{
+	XLogReadPos *result;
+
+	result = (XLogReadPos *) palloc0(sizeof(XLogReadPos));
+	result->segFile = -1;		/* no segment file open yet */
+	return result;
+}
+
+#ifdef FRONTEND
+/*
+ * In frontend builds, XLogRead() uses the program name (for error messages)
+ * and the WAL segment size set by the program that links this file;
+ * currently only pg_waldump.c sets them.
+ *
+ * They are declared "extern" rather than defined here: pg_waldump.c already
+ * defines both as non-static globals.  Defining the same global in two
+ * translation units is undefined behavior in C, and it fails to link with
+ * -fno-common (the default in recent GCC and Clang).
+ *
+ * XXX If other frontend programs compile this file, they have to define
+ * these symbols too.  Passing the segment size in XLogReadPos and reporting
+ * errors through a callback would remove that requirement.
+ */
+extern const char *progname;
+extern int	WalSegSz;
+
+/*
+ * Frontend counterpart of XLogFileNameP(): return the palloc'd file name of
+ * segment 'segno' on timeline 'tli'.
+ */
+static char *
+XLogFileNameFE(TimeLineID tli, XLogSegNo segno)
+{
+	char	   *result = palloc(MAXFNAMELEN);
+
+	XLogFileName(result, tli, segno, WalSegSz);
+	return result;
+}
+
+/*
+ * Print a FATAL message, prefixed with the program name, to stderr and exit.
+ *
+ * XXX pg_waldump.c uses a function of the same name and purpose.  Is there a
+ * smart way to put it into src/common?
+ */
+static void fatal_error(const char *fmt,...) pg_attribute_printf(1, 2);
+
+static void
+fatal_error(const char *fmt,...)
+{
+	va_list		args;
+
+	fflush(stdout);
+
+	fprintf(stderr, _("%s: FATAL: "), progname);
+	va_start(args, fmt);
+	vfprintf(stderr, _(fmt), args);
+	va_end(args);
+	fputc('\n', stderr);
+
+	exit(EXIT_FAILURE);
+}
+#endif							/* FRONTEND */
+
+/*
+ * Read 'count' bytes from WAL into 'buf', starting at location 'startptr'.
+ *
+ * If 'tli' is not NULL, the data is read from timeline *tli, and a segment
+ * that is currently open on another timeline is reopened.  If 'tli' is NULL,
+ * the openSegment callback alone decides which timeline to read from.
+ *
+ * 'pos' is the current position in the XLOG (open file, segment number,
+ * offset and timeline).  openSegment is a callback that opens the segment
+ * containing the next byte to read and updates 'pos' accordingly; it must
+ * not return without a valid pos->segFile.
+ *
+ * XXX probably this should be improved to suck data directly from the
+ * WAL buffers when possible.
+ *
+ * Will open, and keep open, one WAL segment in pos->segFile.  As long as the
+ * same 'pos' is reused, there is at most one descriptor per position left
+ * open until the caller closes it or the process ends.
+ *
+ * Errors are reported with ereport(ERROR) in the backend; in frontend code
+ * they terminate the program.
+ */
+void
+XLogRead(char *buf, XLogRecPtr startptr, Size count,
+		 TimeLineID *tli, XLogReadPos *pos, XLogOpenSegment openSegment)
+{
+	char	   *p = buf;
+	XLogRecPtr	recptr = startptr;
+	Size		nbytes = count;
+	int			segsize;
+
+#ifndef FRONTEND
+	segsize = wal_segment_size;
+#else
+	segsize = WalSegSz;
+#endif
+
+	while (nbytes > 0)
+	{
+		uint32		startoff;
+		int			segbytes;
+		int			readbytes;
+
+		startoff = XLogSegmentOffset(recptr, segsize);
+
+		/* Do we need to switch to a different segment or timeline? */
+		if (pos->segFile < 0 ||
+			!XLByteInSeg(recptr, pos->segNo, segsize) ||
+			(tli != NULL && *tli != pos->tli))
+		{
+			XLogSegNo	nextSegNo;
+
+			/* Switch to another logfile segment */
+			if (pos->segFile >= 0)
+			{
+				close(pos->segFile);
+
+				/*
+				 * Forget the closed descriptor right away, so that nobody
+				 * (e.g. error cleanup) closes it again if openSegment fails
+				 * before storing a new one.
+				 */
+				pos->segFile = -1;
+			}
+
+			XLByteToSeg(recptr, nextSegNo, segsize);
+
+			/* Open the next segment in the caller's way. */
+			openSegment(nextSegNo, tli, pos);
+		}
+
+		/* Need to seek in the file? */
+		if (pos->segOff != startoff)
+		{
+			if (lseek(pos->segFile, (off_t) startoff, SEEK_SET) < 0)
+			{
+#ifndef FRONTEND
+				ereport(ERROR,
+						(errcode_for_file_access(),
+						 errmsg("could not seek in log segment %s to offset %u: %m",
+								XLogFileNameP(pos->tli, pos->segNo),
+								startoff)));
+#else
+				/* XLogFileNameFE() allocates, so save errno first. */
+				int			save_errno = errno;
+
+				fatal_error("could not seek in log segment %s to offset %u: %s",
+							XLogFileNameFE(pos->tli, pos->segNo), startoff,
+							strerror(save_errno));
+#endif
+			}
+			pos->segOff = startoff;
+		}
+
+		/* How many bytes are within this segment? */
+		if (nbytes > (segsize - startoff))
+			segbytes = segsize - startoff;
+		else
+			segbytes = nbytes;
+
+#ifndef FRONTEND
+		pgstat_report_wait_start(WAIT_EVENT_WAL_READ);
+#endif
+		readbytes = read(pos->segFile, p, segbytes);
+#ifndef FRONTEND
+		pgstat_report_wait_end();
+#endif
+
+		if (readbytes < 0)
+		{
+#ifndef FRONTEND
+			ereport(ERROR,
+					(errcode_for_file_access(),
+					 errmsg("could not read from log segment %s, offset %u, length %zu: %m",
+							XLogFileNameP(pos->tli, pos->segNo), pos->segOff,
+							(Size) segbytes)));
+#else
+			/* XLogFileNameFE() allocates, so save errno first. */
+			int			save_errno = errno;
+
+			fatal_error("could not read from log segment %s, offset %u, length %zu: %s",
+						XLogFileNameFE(pos->tli, pos->segNo), pos->segOff,
+						(Size) segbytes, strerror(save_errno));
+#endif
+		}
+		else if (readbytes == 0)
+		{
+#ifndef FRONTEND
+			ereport(ERROR,
+					(errcode(ERRCODE_DATA_CORRUPTED),
+					 errmsg("could not read from log segment %s, offset %u: read %d of %zu",
+							XLogFileNameP(pos->tli, pos->segNo), pos->segOff,
+							readbytes, (Size) segbytes)));
+#else
+			fatal_error("could not read from log segment %s, offset %u: read %d of %zu",
+						XLogFileNameFE(pos->tli, pos->segNo), pos->segOff,
+						readbytes, (Size) segbytes);
+#endif
+		}
+
+		/* Update state for read */
+		recptr += readbytes;
+
+		pos->segOff += readbytes;
+		nbytes -= readbytes;
+		p += readbytes;
+	}
+}
/* ----------------------------------------
* Functions for decoding the data and block references in a record.
diff --git a/src/backend/access/transam/xlogutils.c b/src/backend/access/transam/xlogutils.c
index 10a663bae6..4f29c89c06 100644
--- a/src/backend/access/transam/xlogutils.c
+++ b/src/backend/access/transam/xlogutils.c
@@ -639,128 +639,6 @@ XLogTruncateRelation(RelFileNode rnode, ForkNumber forkNum,
forget_invalid_pages(rnode, forkNum, nblocks);
}
-/*
- * Read 'count' bytes from WAL into 'buf', starting at location 'startptr'
- * in timeline 'tli'.
- *
- * Will open, and keep open, one WAL segment stored in the static file
- * descriptor 'sendFile'. This means if XLogRead is used once, there will
- * always be one descriptor left open until the process ends, but never
- * more than one.
- *
- * XXX This is very similar to pg_waldump's XLogDumpXLogRead and to XLogRead
- * in walsender.c but for small differences (such as lack of elog() in
- * frontend). Probably these should be merged at some point.
- */
-static void
-XLogRead(char *buf, int segsize, TimeLineID tli, XLogRecPtr startptr,
- Size count)
-{
- char *p;
- XLogRecPtr recptr;
- Size nbytes;
-
- /* state maintained across calls */
- static int sendFile = -1;
- static XLogSegNo sendSegNo = 0;
- static TimeLineID sendTLI = 0;
- static uint32 sendOff = 0;
-
- Assert(segsize == wal_segment_size);
-
- p = buf;
- recptr = startptr;
- nbytes = count;
-
- while (nbytes > 0)
- {
- uint32 startoff;
- int segbytes;
- int readbytes;
-
- startoff = XLogSegmentOffset(recptr, segsize);
-
- /* Do we need to switch to a different xlog segment? */
- if (sendFile < 0 || !XLByteInSeg(recptr, sendSegNo, segsize) ||
- sendTLI != tli)
- {
- char path[MAXPGPATH];
-
- if (sendFile >= 0)
- close(sendFile);
-
- XLByteToSeg(recptr, sendSegNo, segsize);
-
- XLogFilePath(path, tli, sendSegNo, segsize);
-
- sendFile = BasicOpenFile(path, O_RDONLY | PG_BINARY);
-
- if (sendFile < 0)
- {
- if (errno == ENOENT)
- ereport(ERROR,
- (errcode_for_file_access(),
- errmsg("requested WAL segment %s has already been removed",
- path)));
- else
- ereport(ERROR,
- (errcode_for_file_access(),
- errmsg("could not open file \"%s\": %m",
- path)));
- }
- sendOff = 0;
- sendTLI = tli;
- }
-
- /* Need to seek in the file? */
- if (sendOff != startoff)
- {
- if (lseek(sendFile, (off_t) startoff, SEEK_SET) < 0)
- {
- char path[MAXPGPATH];
- int save_errno = errno;
-
- XLogFilePath(path, tli, sendSegNo, segsize);
- errno = save_errno;
- ereport(ERROR,
- (errcode_for_file_access(),
- errmsg("could not seek in log segment %s to offset %u: %m",
- path, startoff)));
- }
- sendOff = startoff;
- }
-
- /* How many bytes are within this segment? */
- if (nbytes > (segsize - startoff))
- segbytes = segsize - startoff;
- else
- segbytes = nbytes;
-
- pgstat_report_wait_start(WAIT_EVENT_WAL_READ);
- readbytes = read(sendFile, p, segbytes);
- pgstat_report_wait_end();
- if (readbytes <= 0)
- {
- char path[MAXPGPATH];
- int save_errno = errno;
-
- XLogFilePath(path, tli, sendSegNo, segsize);
- errno = save_errno;
- ereport(ERROR,
- (errcode_for_file_access(),
- errmsg("could not read from log segment %s, offset %u, length %lu: %m",
- path, sendOff, (unsigned long) segbytes)));
- }
-
- /* Update state for read */
- recptr += readbytes;
-
- sendOff += readbytes;
- nbytes -= readbytes;
- p += readbytes;
- }
-}
-
/*
* Determine which timeline to read an xlog page from and set the
* XLogReaderState's currTLI to that timeline ID.
@@ -896,6 +774,37 @@ XLogReadDetermineTimeline(XLogReaderState *state, XLogRecPtr wantPage, uint32 wa
}
}
+/*
+ * XLogOpenSegment callback used by read_local_xlog_page(): open segment
+ * 'segNo' of timeline *tli (at the path built by XLogFilePath()) and make
+ * 'pos' describe the newly opened file.
+ *
+ * A timeline is required.  XLogRead() itself accepts tli == NULL, but this
+ * callback dereferences it.  Failure to open the file is reported with
+ * ereport(ERROR).
+ */
+static void
+read_local_xlog_page_open_segment(XLogSegNo segNo, TimeLineID *tli,
+								  XLogReadPos *pos)
+{
+	char		path[MAXPGPATH];
+
+	Assert(tli != NULL);
+
+	XLogFilePath(path, *tli, segNo, wal_segment_size);
+	pos->segFile = BasicOpenFile(path, O_RDONLY | PG_BINARY);
+
+	if (pos->segFile < 0)
+	{
+		if (errno == ENOENT)
+			ereport(ERROR,
+					(errcode_for_file_access(),
+					 errmsg("requested WAL segment %s has already been removed",
+							path)));
+		else
+			ereport(ERROR,
+					(errcode_for_file_access(),
+					 errmsg("could not open file \"%s\": %m",
+							path)));
+	}
+
+	pos->segNo = segNo;
+	pos->segOff = 0;
+	pos->tli = *tli;
+}
+
/*
* read_page callback for reading local xlog files
*
@@ -1017,14 +926,16 @@ read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr,
count = read_upto - targetPagePtr;
}
+	/*
+	 * XLogRead() and read_local_xlog_page_open_segment() use the global
+	 * wal_segment_size rather than the reader's, so the two must agree.
+	 */
+ Assert(state->wal_segment_size == wal_segment_size);
+
/*
* Even though we just determined how much of the page can be validly read
* as 'count', read the whole page anyway. It's guaranteed to be
* zero-padded up to the page boundary if it's incomplete.
+	 *
+	 * The reader's private_data is used as the XLogRead() position, so every
+	 * reader set up with this callback must pass an XLogReadPos (e.g. one
+	 * obtained from XLogReadInitPos()) as its private data.
*/
- XLogRead(cur_page, state->wal_segment_size, *pageTLI, targetPagePtr,
- XLOG_BLCKSZ);
-
+ XLogRead(cur_page, targetPagePtr, XLOG_BLCKSZ, pageTLI,
+ (XLogReadPos *) state->private_data,
+ read_local_xlog_page_open_segment);
/* number of valid bytes in the buffer */
return count;
}
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 424fe86a1b..20c1ad4a35 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -124,6 +124,7 @@ StartupDecodingContext(List *output_plugin_options,
bool need_full_snapshot,
bool fast_forward,
XLogPageReadCB read_page,
+ void *read_page_arg,
LogicalOutputPluginWriterPrepareWrite prepare_write,
LogicalOutputPluginWriterWrite do_write,
LogicalOutputPluginWriterUpdateProgress update_progress)
@@ -172,14 +173,13 @@ StartupDecodingContext(List *output_plugin_options,
ctx->slot = slot;
- ctx->reader = XLogReaderAllocate(wal_segment_size, read_page, ctx);
+	/*
+	 * read_page_arg becomes the reader's private_data, for use by read_page.
+	 * Note that private_data therefore no longer points to ctx.
+	 * NOTE(review): confirm that no read_page callback or output plugin still
+	 * expects to find ctx there.
+	 */
+ ctx->reader = XLogReaderAllocate(wal_segment_size, read_page,
+ read_page_arg);
if (!ctx->reader)
ereport(ERROR,
(errcode(ERRCODE_OUT_OF_MEMORY),
errmsg("out of memory")));
- ctx->reader->private_data = ctx;
-
ctx->reorder = ReorderBufferAllocate();
ctx->snapshot_builder =
AllocateSnapshotBuilder(ctx->reorder, xmin_horizon, start_lsn,
@@ -234,6 +234,7 @@ CreateInitDecodingContext(char *plugin,
bool need_full_snapshot,
XLogRecPtr restart_lsn,
XLogPageReadCB read_page,
+ void *read_page_arg,
LogicalOutputPluginWriterPrepareWrite prepare_write,
LogicalOutputPluginWriterWrite do_write,
LogicalOutputPluginWriterUpdateProgress update_progress)
@@ -330,8 +331,8 @@ CreateInitDecodingContext(char *plugin,
ctx = StartupDecodingContext(NIL, restart_lsn, xmin_horizon,
need_full_snapshot, false,
- read_page, prepare_write, do_write,
- update_progress);
+ read_page, read_page_arg, prepare_write,
+ do_write, update_progress);
/* call output plugin initialization callback */
old_context = MemoryContextSwitchTo(ctx->context);
@@ -376,6 +377,7 @@ CreateDecodingContext(XLogRecPtr start_lsn,
List *output_plugin_options,
bool fast_forward,
XLogPageReadCB read_page,
+ void *read_page_arg,
LogicalOutputPluginWriterPrepareWrite prepare_write,
LogicalOutputPluginWriterWrite do_write,
LogicalOutputPluginWriterUpdateProgress update_progress)
@@ -428,8 +430,8 @@ CreateDecodingContext(XLogRecPtr start_lsn,
ctx = StartupDecodingContext(output_plugin_options,
start_lsn, InvalidTransactionId, false,
- fast_forward, read_page, prepare_write,
- do_write, update_progress);
+ fast_forward, read_page, read_page_arg,
+ prepare_write, do_write, update_progress);
/* call output plugin initialization callback */
old_context = MemoryContextSwitchTo(ctx->context);
diff --git a/src/backend/replication/logical/logicalfuncs.c b/src/backend/replication/logical/logicalfuncs.c
index d974400d6e..b2f30d53f5 100644
--- a/src/backend/replication/logical/logicalfuncs.c
+++ b/src/backend/replication/logical/logicalfuncs.c
@@ -248,13 +248,20 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin
PG_TRY();
{
- /* restart at slot's confirmed_flush */
+		/*
+		 * WAL read position for logical_read_local_xlog_page().  XLogRead()
+		 * keeps the last segment it read open in it, and this patch adds no
+		 * code that closes that descriptor when the decoding context goes
+		 * away.  A position allocated per call would therefore leak one file
+		 * descriptor per call.  Use a static one instead, so that the open
+		 * segment is reused by the next call, as with the static state that
+		 * the old xlogutils.c XLogRead() kept.
+		 */
+		static XLogReadPos readPos = {.segFile = -1};
+
+		/*
+		 * Restart at slot's confirmed_flush.
+		 *
+		 * logical_read_local_xlog_page() eventually calls XLogRead(), so pass
+		 * the read position.
+		 */
ctx = CreateDecodingContext(InvalidXLogRecPtr,
options,
false,
logical_read_local_xlog_page,
+									&readPos,
LogicalOutputPrepareWrite,
- LogicalOutputWrite, NULL);
+									LogicalOutputWrite,
+									NULL);
MemoryContextSwitchTo(oldcontext);
diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c
index 182fe5bc82..dbcaa9c1d8 100644
--- a/src/backend/replication/slotfuncs.c
+++ b/src/backend/replication/slotfuncs.c
@@ -14,6 +14,7 @@
#include "access/htup_details.h"
#include "access/xlog_internal.h"
+#include "access/xlogreader.h"
#include "funcapi.h"
#include "miscadmin.h"
#include "replication/decode.h"
@@ -144,8 +145,9 @@ create_logical_replication_slot(char *name, char *plugin,
- ctx = CreateInitDecodingContext(plugin, NIL,
- false, /* do not build snapshot */
- restart_lsn,
- logical_read_local_xlog_page, NULL, NULL,
- NULL);
+	{
+		/*
+		 * WAL read position for logical_read_local_xlog_page().  XLogRead()
+		 * keeps the last segment it read open in it, so it must outlive this
+		 * call; a position allocated per call would leak that descriptor.
+		 */
+		static XLogReadPos readPos = {.segFile = -1};
+
+		ctx = CreateInitDecodingContext(plugin, NIL,
+										false,	/* do not build snapshot */
+										restart_lsn,
+										logical_read_local_xlog_page,
+										&readPos,
+										NULL, NULL, NULL);
+	}
/* build initial snapshot, might take a while */
DecodingContextFindStartpoint(ctx);
@@ -401,11 +403,15 @@ pg_logical_replication_slot_advance(XLogRecPtr moveto)
* Create our decoding context in fast_forward mode, passing start_lsn
* as InvalidXLogRecPtr, so that we start processing from my slot's
* confirmed_flush.
+		 *
+		 * logical_read_local_xlog_page() eventually calls XLogRead(), which
+		 * keeps the last segment it read open in the read position.  The
+		 * position is static so that this descriptor is reused by the next
+		 * call instead of being leaked.
*/
+		static XLogReadPos readPos = {.segFile = -1};
+
ctx = CreateDecodingContext(InvalidXLogRecPtr,
NIL,
true, /* fast_forward */
logical_read_local_xlog_page,
+									&readPos,
NULL, NULL, NULL);
/*
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index aae6adc15c..56f9ae88b1 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -128,16 +128,7 @@ bool log_replication_commands = false;
*/
bool wake_wal_senders = false;
-/*
- * These variables are used similarly to openLogFile/SegNo/Off,
- * but for walsender to read the XLOG.
- */
-static int sendFile = -1;
-static XLogSegNo sendSegNo = 0;
-static uint32 sendOff = 0;
-
-/* Timeline ID of the currently open file */
-static TimeLineID curFileTimeLine = 0;
+/*
+ * Position of the walsender in the XLOG: the segment file currently open for
+ * reading, and its segment number, offset and timeline.  Allocated by
+ * InitWalSender().
+ */
+static XLogReadPos *sendPos = NULL;
/*
* These variables keep track of the state of the timeline we're currently
@@ -256,7 +247,8 @@ static void LagTrackerWrite(XLogRecPtr lsn, TimestampTz local_flush_time);
static TimeOffset LagTrackerRead(int head, XLogRecPtr lsn, TimestampTz now);
static bool TransactionIdInRecentPast(TransactionId xid, uint32 epoch);
-static void XLogRead(char *buf, XLogRecPtr startptr, Size count);
+static void WalSndOpenSegment(XLogSegNo segNo, TimeLineID *tli,
+							  XLogReadPos *pos);
/* Initialize walsender process before entering the main command loop */
@@ -285,6 +277,9 @@ InitWalSender(void)
/* Initialize empty timestamp buffer for lag tracking. */
lag_tracker = MemoryContextAllocZero(TopMemoryContext, sizeof(LagTracker));
+
+	/*
+	 * Make sure we can remember the current read position in XLOG.  Like
+	 * lag_tracker, it must live as long as the process, so allocate it in
+	 * TopMemoryContext whatever the current memory context is.
+	 */
+	{
+		MemoryContext oldcxt = MemoryContextSwitchTo(TopMemoryContext);
+
+		sendPos = XLogReadInitPos();
+		MemoryContextSwitchTo(oldcxt);
+	}
}
/*
@@ -301,10 +296,10 @@ WalSndErrorCleanup(void)
ConditionVariableCancelSleep();
pgstat_report_wait_end();
- if (sendFile >= 0)
+	if (sendPos && sendPos->segFile >= 0)
{
- close(sendFile);
- sendFile = -1;
+		close(sendPos->segFile);
+		sendPos->segFile = -1;
}
if (MyReplicationSlot != NULL)
@@ -787,7 +782,8 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req
count = flushptr - targetPagePtr; /* part of the page available */
/* now actually read the data, we know it's there */
- XLogRead(cur_page, targetPagePtr, XLOG_BLCKSZ);
+	XLogRead(cur_page, targetPagePtr, XLOG_BLCKSZ, NULL, sendPos,
+			 WalSndOpenSegment);
+
+	/*
+	 * The walsender-specific XLogRead() this call replaces also verified,
+	 * after reading, that the segment had not been removed or recycled in the
+	 * meantime.  In that case read() succeeds, but the data may already have
+	 * been overwritten.  The shared XLogRead() does not do this, so check here
+	 * as XLogSendPhysical() does.  (Its cascading-walsender reload check is
+	 * not repeated; NOTE(review): presumably logical decoding cannot run in
+	 * recovery, so it does not apply here -- confirm.)
+	 */
+	{
+		XLogSegNo	segno;
+
+		XLByteToSeg(targetPagePtr, segno, wal_segment_size);
+		CheckXLogRemoved(segno, ThisTimeLineID);
+	}
return count;
}
@@ -933,9 +929,13 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
need_full_snapshot = true;
}
+	/*
+	 * logical_read_xlog_page() reads WAL through XLogRead() using the global
+	 * sendPos.  sendPos is also passed as read_page_arg, so that the reader's
+	 * private data refers to the same position.
+	 */
ctx = CreateInitDecodingContext(cmd->plugin, NIL, need_full_snapshot,
InvalidXLogRecPtr,
- logical_read_xlog_page,
+ logical_read_xlog_page, sendPos,
WalSndPrepareWrite, WalSndWriteData,
WalSndUpdateProgress);
@@ -1083,10 +1083,13 @@ StartLogicalReplication(StartReplicationCmd *cmd)
* position.
*
* Do this before sending CopyBoth, so that any errors are reported early.
+	 *
+	 * logical_read_xlog_page() reads WAL through XLogRead() using the global
+	 * sendPos; the same position is passed as read_page_arg.
*/
logical_decoding_ctx =
CreateDecodingContext(cmd->startpoint, cmd->options, false,
- logical_read_xlog_page,
+ logical_read_xlog_page, sendPos,
WalSndPrepareWrite, WalSndWriteData,
WalSndUpdateProgress);
@@ -2344,187 +2347,76 @@ WalSndKill(int code, Datum arg)
}
/*
- * Read 'count' bytes from WAL into 'buf', starting at location 'startptr'
- *
- * XXX probably this should be improved to suck data directly from the
- * WAL buffers when possible.
- *
- * Will open, and keep open, one WAL segment stored in the global file
- * descriptor sendFile. This means if XLogRead is used once, there will
- * always be one descriptor left open until the process ends, but never
- * more than one.
+ * XLogOpenSegment callback used by the walsender: open segment 'segNo' for
+ * reading and make 'pos' describe the opened file.
+ *
+ * The timeline to read from is chosen here (see below), so XLogRead() must be
+ * called with tli == NULL.  Failure to open the file is reported with
+ * ereport(ERROR).
*/
-static void
-XLogRead(char *buf, XLogRecPtr startptr, Size count)
+static void
+WalSndOpenSegment(XLogSegNo segNo, TimeLineID *tli, XLogReadPos *pos)
{
- char *p;
- XLogRecPtr recptr;
- Size nbytes;
- XLogSegNo segno;
+	char		path[MAXPGPATH];
-retry:
- p = buf;
- recptr = startptr;
- nbytes = count;
+	/*
+	 * The timeline is determined below, caller should not do anything about
+	 * it.
+	 */
+	Assert(tli == NULL);
- while (nbytes > 0)
+	/*-------
+	 * When reading from a historic timeline, and there is a timeline switch
+	 * within this segment, read from the WAL segment belonging to the new
+	 * timeline.
+	 *
+	 * For example, imagine that this server is currently on timeline 5, and
+	 * we're streaming timeline 4. The switch from timeline 4 to 5 happened at
+	 * 0/13002088. In pg_wal, we have these files:
+	 *
+	 * ...
+	 * 000000040000000000000012
+	 * 000000040000000000000013
+	 * 000000050000000000000013
+	 * 000000050000000000000014
+	 * ...
+	 *
+	 * In this situation, when requested to send the WAL from segment 0x13, on
+	 * timeline 4, we read the WAL from file 000000050000000000000013. Archive
+	 * recovery prefers files from newer timelines, so if the segment was
+	 * restored from the archive on this server, the file belonging to the old
+	 * timeline, 000000040000000000000013, might not exist. Their contents are
+	 * equal up to the switchpoint, because at a timeline switch, the used
+	 * portion of the old segment is copied to the new file.
+	 *-------
+	 */
+	pos->tli = sendTimeLine;
+	if (sendTimeLineIsHistoric)
{
- uint32 startoff;
- int segbytes;
- int readbytes;
-
- startoff = XLogSegmentOffset(recptr, wal_segment_size);
-
- if (sendFile < 0 || !XLByteInSeg(recptr, sendSegNo, wal_segment_size))
- {
- char path[MAXPGPATH];
-
- /* Switch to another logfile segment */
- if (sendFile >= 0)
- close(sendFile);
+		XLogSegNo	endSegNo;
- XLByteToSeg(recptr, sendSegNo, wal_segment_size);
-
- /*-------
- * When reading from a historic timeline, and there is a timeline
- * switch within this segment, read from the WAL segment belonging
- * to the new timeline.
- *
- * For example, imagine that this server is currently on timeline
- * 5, and we're streaming timeline 4. The switch from timeline 4
- * to 5 happened at 0/13002088. In pg_wal, we have these files:
- *
- * ...
- * 000000040000000000000012
- * 000000040000000000000013
- * 000000050000000000000013
- * 000000050000000000000014
- * ...
- *
- * In this situation, when requested to send the WAL from
- * segment 0x13, on timeline 4, we read the WAL from file
- * 000000050000000000000013. Archive recovery prefers files from
- * newer timelines, so if the segment was restored from the
- * archive on this server, the file belonging to the old timeline,
- * 000000040000000000000013, might not exist. Their contents are
- * equal up to the switchpoint, because at a timeline switch, the
- * used portion of the old segment is copied to the new file.
- *-------
- */
- curFileTimeLine = sendTimeLine;
- if (sendTimeLineIsHistoric)
- {
- XLogSegNo endSegNo;
-
- XLByteToSeg(sendTimeLineValidUpto, endSegNo, wal_segment_size);
- if (sendSegNo == endSegNo)
- curFileTimeLine = sendTimeLineNextTLI;
- }
-
- XLogFilePath(path, curFileTimeLine, sendSegNo, wal_segment_size);
-
- sendFile = BasicOpenFile(path, O_RDONLY | PG_BINARY);
- if (sendFile < 0)
- {
- /*
- * If the file is not found, assume it's because the standby
- * asked for a too old WAL segment that has already been
- * removed or recycled.
- */
- if (errno == ENOENT)
- ereport(ERROR,
- (errcode_for_file_access(),
- errmsg("requested WAL segment %s has already been removed",
- XLogFileNameP(curFileTimeLine, sendSegNo))));
- else
- ereport(ERROR,
- (errcode_for_file_access(),
- errmsg("could not open file \"%s\": %m",
- path)));
- }
- sendOff = 0;
- }
-
- /* Need to seek in the file? */
- if (sendOff != startoff)
- {
- if (lseek(sendFile, (off_t) startoff, SEEK_SET) < 0)
- ereport(ERROR,
- (errcode_for_file_access(),
- errmsg("could not seek in log segment %s to offset %u: %m",
- XLogFileNameP(curFileTimeLine, sendSegNo),
- startoff)));
- sendOff = startoff;
- }
+		XLByteToSeg(sendTimeLineValidUpto, endSegNo, wal_segment_size);
+
+		/*
+		 * Compare with the segment being opened: pos->segNo still refers to
+		 * the previously open segment at this point.
+		 */
+		if (segNo == endSegNo)
+			pos->tli = sendTimeLineNextTLI;
+	}
- /* How many bytes are within this segment? */
- if (nbytes > (wal_segment_size - startoff))
- segbytes = wal_segment_size - startoff;
- else
- segbytes = nbytes;
+	XLogFilePath(path, pos->tli, segNo, wal_segment_size);
+	pos->segFile = BasicOpenFile(path, O_RDONLY | PG_BINARY);
- pgstat_report_wait_start(WAIT_EVENT_WAL_READ);
- readbytes = read(sendFile, p, segbytes);
- pgstat_report_wait_end();
- if (readbytes < 0)
- {
+	if (pos->segFile < 0)
+	{
+		/*
+		 * If the file is not found, assume it's because the standby asked for
+		 * a too old WAL segment that has already been removed or recycled.
+		 */
+		if (errno == ENOENT)
ereport(ERROR,
(errcode_for_file_access(),
- errmsg("could not read from log segment %s, offset %u, length %zu: %m",
- XLogFileNameP(curFileTimeLine, sendSegNo),
- sendOff, (Size) segbytes)));
- }
- else if (readbytes == 0)
- {
+					 errmsg("requested WAL segment %s has already been removed",
+							XLogFileNameP(pos->tli, segNo))));
+		else
ereport(ERROR,
- (errcode(ERRCODE_DATA_CORRUPTED),
- errmsg("could not read from log segment %s, offset %u: read %d of %zu",
- XLogFileNameP(curFileTimeLine, sendSegNo),
- sendOff, readbytes, (Size) segbytes)));
- }
-
- /* Update state for read */
- recptr += readbytes;
-
- sendOff += readbytes;
- nbytes -= readbytes;
- p += readbytes;
+					(errcode_for_file_access(),
+					 errmsg("could not open file \"%s\": %m",
+							path)));
}
- /*
- * After reading into the buffer, check that what we read was valid. We do
- * this after reading, because even though the segment was present when we
- * opened it, it might get recycled or removed while we read it. The
- * read() succeeds in that case, but the data we tried to read might
- * already have been overwritten with new WAL records.
- */
- XLByteToSeg(startptr, segno, wal_segment_size);
- CheckXLogRemoved(segno, ThisTimeLineID);
-
- /*
- * During recovery, the currently-open WAL file might be replaced with the
- * file of the same name retrieved from archive. So we always need to
- * check what we read was valid after reading into the buffer. If it's
- * invalid, we try to open and read the file again.
- */
- if (am_cascading_walsender)
- {
- WalSnd *walsnd = MyWalSnd;
- bool reload;
-
- SpinLockAcquire(&walsnd->mutex);
- reload = walsnd->needreload;
- walsnd->needreload = false;
- SpinLockRelease(&walsnd->mutex);
-
- if (reload && sendFile >= 0)
- {
- close(sendFile);
- sendFile = -1;
-
- goto retry;
- }
- }
+	pos->segNo = segNo;
+	pos->segOff = 0;
}
/*
@@ -2544,6 +2436,7 @@ XLogSendPhysical(void)
XLogRecPtr startptr;
XLogRecPtr endptr;
Size nbytes;
+ XLogSegNo segno;
/* If requested switch the WAL sender to the stopping state. */
if (got_STOPPING)
@@ -2686,9 +2579,9 @@ XLogSendPhysical(void)
if (sendTimeLineIsHistoric && sendTimeLineValidUpto <= sentPtr)
{
/* close the current file. */
- if (sendFile >= 0)
- close(sendFile);
- sendFile = -1;
+ if (sendPos->segFile >= 0)
+ close(sendPos->segFile);
+ sendPos->segFile = -1;
/* Send CopyDone */
pq_putmessage_noblock('c', NULL, 0);
@@ -2759,7 +2652,48 @@ XLogSendPhysical(void)
* calls.
*/
enlargeStringInfo(&output_message, nbytes);
+
+	/*
+	 * XLogRead() does not validate the data it reads, so the checks below are
+	 * done here.  If the cascading-walsender check closes the file, we come
+	 * back to this label and read the same range again.
+	 */
+retry:
+ XLogRead(&output_message.data[output_message.len], startptr, nbytes,
+ NULL, /* WalSndOpenSegment will determine TLI */
+ sendPos,
+ WalSndOpenSegment);
+
+ /*
+ * After reading into the buffer, check that what we read was valid. We do
+ * this after reading, because even though the segment was present when we
+ * opened it, it might get recycled or removed while we read it. The
+ * read() succeeds in that case, but the data we tried to read might
+ * already have been overwritten with new WAL records.
+ */
+ XLByteToSeg(startptr, segno, wal_segment_size);
+ CheckXLogRemoved(segno, ThisTimeLineID);
+
+ /*
+ * During recovery, the currently-open WAL file might be replaced with the
+ * file of the same name retrieved from archive. So we always need to
+ * check what we read was valid after reading into the buffer. If it's
+ * invalid, we try to open and read the file again.
+ */
+ if (am_cascading_walsender)
+ {
+ WalSnd *walsnd = MyWalSnd;
+ bool reload;
+
+ SpinLockAcquire(&walsnd->mutex);
+ reload = walsnd->needreload;
+ walsnd->needreload = false;
+ SpinLockRelease(&walsnd->mutex);
+
+ if (reload && sendPos->segFile >= 0)
+ {
+ close(sendPos->segFile);
+ sendPos->segFile = -1;
+
+ goto retry;
+ }
+ }
+
output_message.len += nbytes;
output_message.data[output_message.len] = '\0';
diff --git a/src/bin/pg_waldump/pg_waldump.c b/src/bin/pg_waldump/pg_waldump.c
index e106fb2ed1..7dd63dd735 100644
--- a/src/bin/pg_waldump/pg_waldump.c
+++ b/src/bin/pg_waldump/pg_waldump.c
@@ -26,9 +26,9 @@
#include "rmgrdesc.h"
-static const char *progname;
+/* Not static: the frontend code of XLogRead() in xlogreader.c uses these. */
+const char *progname;
-static int WalSegSz;
+int WalSegSz;
typedef struct XLogDumpPrivate
{
@@ -37,6 +37,7 @@ typedef struct XLogDumpPrivate
XLogRecPtr startptr;
XLogRecPtr endptr;
bool endptr_reached;
+	/* read position passed to XLogRead(); its dir is set in main() */
+ XLogReadPos *pos;
} XLogDumpPrivate;
typedef struct XLogDumpConfig
@@ -296,126 +297,45 @@ identify_target_directory(XLogDumpPrivate *private, char *directory,
fatal_error("could not find any WAL file");
}
-/*
- * Read count bytes from a segment file in the specified directory, for the
- * given timeline, containing the specified record pointer; store the data in
- * the passed buffer.
- */
+/*
+ * XLogOpenSegment callback for pg_waldump: open segment 'segNo' of timeline
+ * *tli in directory pos->dir, and make 'pos' describe the opened file.
+ *
+ * In follow mode the file may not exist yet, so keep retrying for a while
+ * before giving up.  Any failure is fatal.
+ */
static void
-XLogDumpXLogRead(const char *directory, TimeLineID timeline_id,
- XLogRecPtr startptr, char *buf, Size count)
+XLogDumpOpenSegment(XLogSegNo segNo, TimeLineID *tli, XLogReadPos *pos)
{
- char *p;
- XLogRecPtr recptr;
- Size nbytes;
-
- static int sendFile = -1;
- static XLogSegNo sendSegNo = 0;
- static uint32 sendOff = 0;
+	char		fname[MAXFNAMELEN];
+	int			tries;
- p = buf;
- recptr = startptr;
- nbytes = count;
+	XLogFileName(fname, *tli, segNo, WalSegSz);
- while (nbytes > 0)
+	/*
+	 * In follow mode there is a short period of time after the server has
+	 * written the end of the previous file before the new file is
+	 * available. So we loop for 5 seconds looking for the file to appear
+	 * before giving up.
+	 */
+	for (tries = 0; tries < 10; tries++)
{
- uint32 startoff;
- int segbytes;
- int readbytes;
-
- startoff = XLogSegmentOffset(recptr, WalSegSz);
-
- if (sendFile < 0 || !XLByteInSeg(recptr, sendSegNo, WalSegSz))
- {
- char fname[MAXFNAMELEN];
- int tries;
-
- /* Switch to another logfile segment */
- if (sendFile >= 0)
- close(sendFile);
-
- XLByteToSeg(recptr, sendSegNo, WalSegSz);
-
- XLogFileName(fname, timeline_id, sendSegNo, WalSegSz);
-
- /*
- * In follow mode there is a short period of time after the server
- * has written the end of the previous file before the new file is
- * available. So we loop for 5 seconds looking for the file to
- * appear before giving up.
- */
- for (tries = 0; tries < 10; tries++)
- {
- sendFile = open_file_in_directory(directory, fname);
- if (sendFile >= 0)
- break;
- if (errno == ENOENT)
- {
- int save_errno = errno;
-
- /* File not there yet, try again */
- pg_usleep(500 * 1000);
-
- errno = save_errno;
- continue;
- }
- /* Any other error, fall through and fail */
- break;
- }
-
- if (sendFile < 0)
- fatal_error("could not find file \"%s\": %s",
- fname, strerror(errno));
- sendOff = 0;
- }
-
- /* Need to seek in the file? */
- if (sendOff != startoff)
- {
- if (lseek(sendFile, (off_t) startoff, SEEK_SET) < 0)
- {
- int err = errno;
- char fname[MAXPGPATH];
-
- XLogFileName(fname, timeline_id, sendSegNo, WalSegSz);
-
- fatal_error("could not seek in log file %s to offset %u: %s",
- fname, startoff, strerror(err));
- }
- sendOff = startoff;
- }
-
- /* How many bytes are within this segment? */
- if (nbytes > (WalSegSz - startoff))
- segbytes = WalSegSz - startoff;
- else
- segbytes = nbytes;
-
- readbytes = read(sendFile, p, segbytes);
- if (readbytes <= 0)
+		pos->segFile = open_file_in_directory(pos->dir, fname);
+		if (pos->segFile >= 0)
+			break;
+		if (errno == ENOENT)
{
- int err = errno;
- char fname[MAXPGPATH];
int save_errno = errno;
- XLogFileName(fname, timeline_id, sendSegNo, WalSegSz);
- errno = save_errno;
+			/* File not there yet, try again */
+			pg_usleep(500 * 1000);
- if (readbytes < 0)
- fatal_error("could not read from log file %s, offset %u, length %d: %s",
- fname, sendOff, segbytes, strerror(err));
- else if (readbytes == 0)
- fatal_error("could not read from log file %s, offset %u: read %d of %zu",
- fname, sendOff, readbytes, (Size) segbytes);
+			errno = save_errno;
+			continue;
}
+		/* Any other error, fall through and fail */
+		break;
+	}
- /* Update state for read */
- recptr += readbytes;
+	if (pos->segFile < 0)
+		fatal_error("could not find file \"%s\": %s",
+					fname, strerror(errno));
- sendOff += readbytes;
- nbytes -= readbytes;
- p += readbytes;
- }
+	pos->segNo = segNo;
+	pos->segOff = 0;
+
+	/*
+	 * XLogRead() compares this with the requested timeline.  Leaving it unset
+	 * would make it close and reopen the segment on every call, and error
+	 * messages would report timeline 0.
+	 */
+	pos->tli = *tli;
}
/*
@@ -441,8 +361,8 @@ XLogDumpReadPage(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen,
}
}
- XLogDumpXLogRead(private->inpath, private->timeline, targetPagePtr,
- readBuff, count);
+	/* Read from private->timeline; XLogDumpOpenSegment() opens the files. */
+ XLogRead(readBuff, targetPagePtr, count, &private->timeline,
+ private->pos, XLogDumpOpenSegment);
return count;
}
@@ -852,6 +772,7 @@ main(int argc, char **argv)
private.startptr = InvalidXLogRecPtr;
private.endptr = InvalidXLogRecPtr;
private.endptr_reached = false;
+ private.pos = XLogReadInitPos();
config.bkp_details = false;
config.stop_after_records = -1;
@@ -1083,6 +1004,9 @@ main(int argc, char **argv)
else
identify_target_directory(&private, private.inpath, NULL);
+
+	/*
+	 * XLogDumpOpenSegment() only receives the XLogReadPos, not "private", so
+	 * the position needs its own reference to the directory to open segments
+	 * in.
+	 */
+ private.pos->dir = private.inpath;
+
/* we don't know what to print */
if (XLogRecPtrIsInvalid(private.startptr))
{
diff --git a/src/include/access/xlogreader.h b/src/include/access/xlogreader.h
index f3bae0bf49..9bddbd3042 100644
--- a/src/include/access/xlogreader.h
+++ b/src/include/access/xlogreader.h
@@ -219,6 +219,31 @@ extern void XLogReaderInvalReadState(XLogReaderState *state);
extern XLogRecPtr XLogFindNextRecord(XLogReaderState *state, XLogRecPtr RecPtr);
#endif /* FRONTEND */
+/*
+ * Position in XLOG file while reading it.
+ *
+ * XLogRead() uses this to remember which segment file is open between calls.
+ * Obtain a fresh one with XLogReadInitPos().
+ */
+typedef struct XLogReadPos
+{
+ int segFile; /* segment file descriptor, negative if none is open */
+ XLogSegNo segNo; /* segment number */
+ uint32 segOff; /* offset in the segment */
+ TimeLineID tli; /* timeline ID of the currently open file */
+
+ char *dir; /* directory (only needed by frontends) */
+} XLogReadPos;
+
+/*
+ * Callback to open the specified XLOG segment 'segNo' for reading and update
+ * the position accordingly.
+ *
+ * If 'tli' is not NULL, the segment of timeline *tli is to be opened;
+ * otherwise the callback chooses the timeline itself.  On return,
+ * pos->segFile must be a valid descriptor, and pos->segNo, pos->segOff (0)
+ * and pos->tli must describe the opened file.  XLogRead() reads from
+ * pos->segFile without checking it, and compares pos->tli with *tli to
+ * detect a timeline change.  The callback must not return on failure; it has
+ * to report the error (ereport(ERROR) in the backend) or exit instead.
+ */
+typedef void (*XLogOpenSegment) (XLogSegNo segNo, TimeLineID *tli,
+ XLogReadPos *pos);
+
+/* Allocate a new position with no segment open. */
+extern XLogReadPos *XLogReadInitPos(void);
+/* Read 'count' bytes of WAL starting at 'startptr' into 'buf'. */
+extern void XLogRead(char *buf, XLogRecPtr startptr, Size count,
+ TimeLineID *tli, XLogReadPos *pos,
+ XLogOpenSegment openSegment);
+
/* Functions for decoding an XLogRecord */
extern bool DecodeXLogRecord(XLogReaderState *state, XLogRecord *record,
diff --git a/src/include/replication/logical.h b/src/include/replication/logical.h
index 0a2a63a48c..59b29433eb 100644
--- a/src/include/replication/logical.h
+++ b/src/include/replication/logical.h
@@ -99,6 +99,7 @@ extern LogicalDecodingContext *CreateInitDecodingContext(char *plugin,
bool need_full_snapshot,
XLogRecPtr restart_lsn,
XLogPageReadCB read_page,
+ void *read_page_arg,
LogicalOutputPluginWriterPrepareWrite prepare_write,
LogicalOutputPluginWriterWrite do_write,
LogicalOutputPluginWriterUpdateProgress update_progress);
@@ -107,6 +108,7 @@ extern LogicalDecodingContext *CreateDecodingContext(
List *output_plugin_options,
bool fast_forward,
XLogPageReadCB read_page,
+ void *read_page_arg,
LogicalOutputPluginWriterPrepareWrite prepare_write,
LogicalOutputPluginWriterWrite do_write,
LogicalOutputPluginWriterUpdateProgress update_progress);