Hello,

Per discussion in thread [1], I propose the following patch to give another adjustment to the xlogreader API. This results in a small but not insignificant net reduction of lines of code. What this patch does is adjust the signature of these new xlogreader callbacks, making the API simpler. The changes are:
* The segment_open callback installs the FD in xlogreader state itself, instead of passing the FD back. This was suggested by Kyotaro Horiguchi in that thread [2].
* We no longer pass segcxt to segment_open; it's in XLogReaderState, which is already an argument.
* We no longer pass seg/segcxt to WALRead; instead, that function takes them from XLogReaderState, which is already an argument. (This means XLogSendPhysical has to drink more of the fake_xlogreader kool-aid.)

I claim the reason to do it now instead of pg14 is to make it simpler for third-party xlogreader callers to adjust. (Some might be thinking that I do this to avoid an API change later, but my gut tells me that we'll adjust xlogreader again in pg14 for the encryption stuff and other reasons, so.)

[1] https://postgr.es/m/20200406025651.fpzdb5yyb7qyh...@alap3.anarazel.de
[2] https://postgr.es/m/20200508.114228.963995144765118400.horikyota....@gmail.com

-- Álvaro Herrera Developer, https://www.PostgreSQL.org/
diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c index 7cee8b92c9..4b6f4ada6e 100644 --- a/src/backend/access/transam/xlogreader.c +++ b/src/backend/access/transam/xlogreader.c @@ -1050,8 +1050,6 @@ err: * Read 'count' bytes into 'buf', starting at location 'startptr', from WAL * fetched from timeline 'tli'. * - * 'seg/segcxt' identify the last segment used. - * * Returns true if succeeded, false if an error occurs, in which case * 'errinfo' receives error details. * @@ -1061,7 +1059,6 @@ err: bool WALRead(XLogReaderState *state, char *buf, XLogRecPtr startptr, Size count, TimeLineID tli, - WALOpenSegment *seg, WALSegmentContext *segcxt, WALReadError *errinfo) { char *p; @@ -1078,34 +1075,34 @@ WALRead(XLogReaderState *state, int segbytes; int readbytes; - startoff = XLogSegmentOffset(recptr, segcxt->ws_segsize); + startoff = XLogSegmentOffset(recptr, state->segcxt.ws_segsize); /* * If the data we want is not in a segment we have open, close what we * have (if anything) and open the next one, using the caller's * provided openSegment callback. */ - if (seg->ws_file < 0 || - !XLByteInSeg(recptr, seg->ws_segno, segcxt->ws_segsize) || - tli != seg->ws_tli) + if (state->seg.ws_file < 0 || + !XLByteInSeg(recptr, state->seg.ws_segno, state->segcxt.ws_segsize) || + tli != state->seg.ws_tli) { XLogSegNo nextSegNo; - if (seg->ws_file >= 0) + if (state->seg.ws_file >= 0) state->routine.segment_close(state); - XLByteToSeg(recptr, nextSegNo, segcxt->ws_segsize); - seg->ws_file = state->routine.segment_open(state, nextSegNo, - segcxt, &tli); + XLByteToSeg(recptr, nextSegNo, state->segcxt.ws_segsize); + state->routine.segment_open(state, nextSegNo, &tli); + Assert(state->seg.ws_file >= 0); /* shouldn't happen */ /* Update the current segment info. */ - seg->ws_tli = tli; - seg->ws_segno = nextSegNo; + state->seg.ws_tli = tli; + state->seg.ws_segno = nextSegNo; } /* How many bytes are within this segment? 
*/ - if (nbytes > (segcxt->ws_segsize - startoff)) - segbytes = segcxt->ws_segsize - startoff; + if (nbytes > (state->segcxt.ws_segsize - startoff)) + segbytes = state->segcxt.ws_segsize - startoff; else segbytes = nbytes; @@ -1115,7 +1112,7 @@ WALRead(XLogReaderState *state, /* Reset errno first; eases reporting non-errno-affecting errors */ errno = 0; - readbytes = pg_pread(seg->ws_file, p, segbytes, (off_t) startoff); + readbytes = pg_pread(state->seg.ws_file, p, segbytes, (off_t) startoff); #ifndef FRONTEND pgstat_report_wait_end(); @@ -1127,7 +1124,7 @@ WALRead(XLogReaderState *state, errinfo->wre_req = segbytes; errinfo->wre_read = readbytes; errinfo->wre_off = startoff; - errinfo->wre_seg = *seg; + errinfo->wre_seg = state->seg; return false; } diff --git a/src/backend/access/transam/xlogutils.c b/src/backend/access/transam/xlogutils.c index bbd801513a..0fc27215df 100644 --- a/src/backend/access/transam/xlogutils.c +++ b/src/backend/access/transam/xlogutils.c @@ -784,31 +784,28 @@ XLogReadDetermineTimeline(XLogReaderState *state, XLogRecPtr wantPage, uint32 wa } /* XLogReaderRoutine->segment_open callback for local pg_wal files */ -int +void wal_segment_open(XLogReaderState *state, XLogSegNo nextSegNo, - WALSegmentContext *segcxt, TimeLineID *tli_p) + TimeLineID *tli_p) { TimeLineID tli = *tli_p; char path[MAXPGPATH]; - int fd; - XLogFilePath(path, tli, nextSegNo, segcxt->ws_segsize); - fd = BasicOpenFile(path, O_RDONLY | PG_BINARY); - if (fd >= 0) - return fd; - - if (errno == ENOENT) - ereport(ERROR, - (errcode_for_file_access(), - errmsg("requested WAL segment %s has already been removed", - path))); - else - ereport(ERROR, - (errcode_for_file_access(), - errmsg("could not open file \"%s\": %m", - path))); - - return -1; /* keep compiler quiet */ + XLogFilePath(path, tli, nextSegNo, state->segcxt.ws_segsize); + state->seg.ws_file = BasicOpenFile(path, O_RDONLY | PG_BINARY); + if (state->seg.ws_file < 0) + { + if (errno == ENOENT) + ereport(ERROR, + 
(errcode_for_file_access(), + errmsg("requested WAL segment %s has already been removed", + path))); + else + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not open file \"%s\": %m", + path))); + } } /* stock XLogReaderRoutine->segment_close callback */ @@ -947,7 +944,6 @@ read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, * zero-padded up to the page boundary if it's incomplete. */ if (!WALRead(state, cur_page, targetPagePtr, XLOG_BLCKSZ, tli, - &state->seg, &state->segcxt, &errinfo)) WALReadRaiseError(&errinfo); diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index 9f14b99231..63e1efc778 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -248,8 +248,8 @@ static void LagTrackerWrite(XLogRecPtr lsn, TimestampTz local_flush_time); static TimeOffset LagTrackerRead(int head, XLogRecPtr lsn, TimestampTz now); static bool TransactionIdInRecentPast(TransactionId xid, uint32 epoch); -static int WalSndSegmentOpen(XLogReaderState *state, XLogSegNo nextSegNo, - WALSegmentContext *segcxt, TimeLineID *tli_p); +static void WalSndSegmentOpen(XLogReaderState *state, XLogSegNo nextSegNo, + TimeLineID *tli_p); static void UpdateSpillStats(LogicalDecodingContext *ctx); @@ -840,8 +840,6 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req sendSeg->ws_tli, /* Pass the current TLI because only * WalSndSegmentOpen controls whether new * TLI is needed. 
*/ - sendSeg, - sendCxt, &errinfo)) WALReadRaiseError(&errinfo); @@ -2447,13 +2445,11 @@ WalSndKill(int code, Datum arg) } /* XLogReaderRoutine->segment_open callback */ -static int -WalSndSegmentOpen(XLogReaderState *state, - XLogSegNo nextSegNo, WALSegmentContext *segcxt, +static void +WalSndSegmentOpen(XLogReaderState *state, XLogSegNo nextSegNo, TimeLineID *tli_p) { char path[MAXPGPATH]; - int fd; /*------- * When reading from a historic timeline, and there is a timeline switch @@ -2484,15 +2480,15 @@ WalSndSegmentOpen(XLogReaderState *state, { XLogSegNo endSegNo; - XLByteToSeg(sendTimeLineValidUpto, endSegNo, segcxt->ws_segsize); + XLByteToSeg(sendTimeLineValidUpto, endSegNo, state->segcxt.ws_segsize); if (sendSeg->ws_segno == endSegNo) *tli_p = sendTimeLineNextTLI; } - XLogFilePath(path, *tli_p, nextSegNo, segcxt->ws_segsize); - fd = BasicOpenFile(path, O_RDONLY | PG_BINARY); - if (fd >= 0) - return fd; + XLogFilePath(path, *tli_p, nextSegNo, state->segcxt.ws_segsize); + state->seg.ws_file = BasicOpenFile(path, O_RDONLY | PG_BINARY); + if (state->seg.ws_file >= 0) + return; /* * If the file is not found, assume it's because the standby asked for a @@ -2515,7 +2511,6 @@ WalSndSegmentOpen(XLogReaderState *state, (errcode_for_file_access(), errmsg("could not open file \"%s\": %m", path))); - return -1; /* keep compiler quiet */ } /* @@ -2760,6 +2755,8 @@ XLogSendPhysical(void) enlargeStringInfo(&output_message, nbytes); retry: + fake_xlogreader.seg = *sendSeg; + fake_xlogreader.segcxt = *sendCxt; if (!WALRead(&fake_xlogreader, &output_message.data[output_message.len], startptr, @@ -2767,8 +2764,6 @@ retry: sendSeg->ws_tli, /* Pass the current TLI because only * WalSndSegmentOpen controls whether new * TLI is needed. 
*/ - sendSeg, - sendCxt, &errinfo)) WALReadRaiseError(&errinfo); diff --git a/src/bin/pg_waldump/pg_waldump.c b/src/bin/pg_waldump/pg_waldump.c index e29f65500f..d1a0678935 100644 --- a/src/bin/pg_waldump/pg_waldump.c +++ b/src/bin/pg_waldump/pg_waldump.c @@ -280,17 +280,15 @@ identify_target_directory(char *directory, char *fname) } /* pg_waldump's XLogReaderRoutine->segment_open callback */ -static int -WALDumpOpenSegment(XLogReaderState *state, - XLogSegNo nextSegNo, WALSegmentContext *segcxt, +static void +WALDumpOpenSegment(XLogReaderState *state, XLogSegNo nextSegNo, TimeLineID *tli_p) { TimeLineID tli = *tli_p; char fname[MAXPGPATH]; - int fd; int tries; - XLogFileName(fname, tli, nextSegNo, segcxt->ws_segsize); + XLogFileName(fname, tli, nextSegNo, state->segcxt.ws_segsize); /* * In follow mode there is a short period of time after the server has @@ -300,9 +298,9 @@ WALDumpOpenSegment(XLogReaderState *state, */ for (tries = 0; tries < 10; tries++) { - fd = open_file_in_directory(segcxt->ws_dir, fname); - if (fd >= 0) - return fd; + state->seg.ws_file = open_file_in_directory(state->segcxt.ws_dir, fname); + if (state->seg.ws_file >= 0) + return; if (errno == ENOENT) { int save_errno = errno; @@ -318,7 +316,6 @@ WALDumpOpenSegment(XLogReaderState *state, } fatal_error("could not find file \"%s\": %m", fname); - return -1; /* keep compiler quiet */ } /* @@ -356,7 +353,6 @@ WALDumpReadPage(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen, } if (!WALRead(state, readBuff, targetPagePtr, count, private->timeline, - &state->seg, &state->segcxt, &errinfo)) { WALOpenSegment *seg = &errinfo.wre_seg; diff --git a/src/include/access/xlogreader.h b/src/include/access/xlogreader.h index 80cf62acb7..c21b0ba972 100644 --- a/src/include/access/xlogreader.h +++ b/src/include/access/xlogreader.h @@ -63,10 +63,9 @@ typedef int (*XLogPageReadCB) (XLogReaderState *xlogreader, int reqLen, XLogRecPtr targetRecPtr, char *readBuf); -typedef int (*WALSegmentOpenCB) 
(XLogReaderState *xlogreader, - XLogSegNo nextSegNo, - WALSegmentContext *segcxt, - TimeLineID *tli_p); +typedef void (*WALSegmentOpenCB) (XLogReaderState *xlogreader, + XLogSegNo nextSegNo, + TimeLineID *tli_p); typedef void (*WALSegmentCloseCB) (XLogReaderState *xlogreader); typedef struct XLogReaderRoutine @@ -94,21 +93,16 @@ typedef struct XLogReaderRoutine XLogPageReadCB page_read; /* - * Callback to open the specified WAL segment for reading. The file - * descriptor of the opened segment shall be returned. In case of + * Callback to open the specified WAL segment for reading. ->seg.ws_file + * shall be set to the file descriptor of the opened segment. In case of * failure, an error shall be raised by the callback and it shall not * return. * * "nextSegNo" is the number of the segment to be opened. * - * "segcxt" is additional information about the segment. - * * "tli_p" is an input/output argument. WALRead() uses it to pass the * timeline in which the new segment should be found, but the callback can * use it to return the TLI that it actually opened. - * - * BasicOpenFile() is the preferred way to open the segment file in - * backend code, whereas open(2) should be used in frontend. 
*/ WALSegmentOpenCB segment_open; @@ -301,9 +295,7 @@ typedef struct WALReadError extern bool WALRead(XLogReaderState *state, char *buf, XLogRecPtr startptr, Size count, - TimeLineID tli, WALOpenSegment *seg, - WALSegmentContext *segcxt, - WALReadError *errinfo); + TimeLineID tli, WALReadError *errinfo); /* Functions for decoding an XLogRecord */ diff --git a/src/include/access/xlogutils.h b/src/include/access/xlogutils.h index 68ce815476..e59b6cf3a9 100644 --- a/src/include/access/xlogutils.h +++ b/src/include/access/xlogutils.h @@ -50,9 +50,8 @@ extern void FreeFakeRelcacheEntry(Relation fakerel); extern int read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen, XLogRecPtr targetRecPtr, char *cur_page); -extern int wal_segment_open(XLogReaderState *state, +extern void wal_segment_open(XLogReaderState *state, XLogSegNo nextSegNo, - WALSegmentContext *segcxt, TimeLineID *tli_p); extern void wal_segment_close(XLogReaderState *state);