Hello,
At Thu, 19 Jan 2017 18:37:31 +0900 (Tokyo Standard Time), Kyotaro HORIGUCHI
<[email protected]> wrote in
<[email protected]>
> > > - Delaying recycling a segment until the last partial record on it
> > > completes. This seems doable in page-wise (coarse resolution)
> > > but would cost additional reading of past xlog files (page
> > > header of past pages is required).
> >
> > Hm, yes. That looks like the least invasive way to go. At least that
> > looks more correct than the others.
>
> The attached patch does that. Usually it reads page headers only
> on segment boundaries, but once continuation record found (or
> failed to read the next page header, that is, the first record on
> the first page in the next segment has not been replicated), it
> becomes to happen on every page boundary until non-continuation
> page comes.
>
> I leave a debug info (at LOG level) in the attached file shown on
> every state change of keep pointer. At least for pgbench, the
> cost seems ignorable.
I revised it. It became neater and less invasive.
- Removed added keep from struct WalSnd. It is never referrenced
from other processes. It is static variable now.
- Restore keepPtr from replication slot on starting.
- Moved the main part to more appropriate position.
regards,
--
Kyotaro Horiguchi
NTT Open Source Software Center
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index f3082c3..0270474 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -185,6 +185,12 @@ static volatile sig_atomic_t replication_active = false;
static LogicalDecodingContext *logical_decoding_ctx = NULL;
static XLogRecPtr logical_startptr = InvalidXLogRecPtr;
+/*
+ * Segment keep pointer for physical slots. Has a valid value only when it
+ * differs from the current flush pointer.
+ */
+static XLogRecPtr keepPtr = InvalidXLogRecPtr;
+
/* Signal handlers */
static void WalSndSigHupHandler(SIGNAL_ARGS);
static void WalSndXLogSendHandler(SIGNAL_ARGS);
@@ -217,7 +223,7 @@ static void WalSndPrepareWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, Tran
static void WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write);
static XLogRecPtr WalSndWaitForWal(XLogRecPtr loc);
-static void XLogRead(char *buf, XLogRecPtr startptr, Size count);
+static bool XLogRead(char *buf, XLogRecPtr startptr, Size count, bool noutfoundok);
/* Initialize walsender process before entering the main command loop */
@@ -538,6 +544,9 @@ StartReplication(StartReplicationCmd *cmd)
ereport(ERROR,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
(errmsg("cannot use a logical replication slot for physical replication"))));
+
+ /* Restore keepPtr from replication slot */
+ keepPtr = MyReplicationSlot->data.restart_lsn;
}
/*
@@ -553,6 +562,10 @@ StartReplication(StartReplicationCmd *cmd)
else
FlushPtr = GetFlushRecPtr();
+ /* Set InvalidXLogRecPtr if catching up */
+ if (keepPtr == FlushPtr)
+ keepPtr = InvalidXLogRecPtr;
+
if (cmd->timeline != 0)
{
XLogRecPtr switchpoint;
@@ -774,7 +787,7 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req
count = flushptr - targetPagePtr;
/* now actually read the data, we know it's there */
- XLogRead(cur_page, targetPagePtr, XLOG_BLCKSZ);
+ XLogRead(cur_page, targetPagePtr, XLOG_BLCKSZ, false);
return count;
}
@@ -1551,7 +1564,7 @@ static void
ProcessStandbyReplyMessage(void)
{
XLogRecPtr writePtr,
- flushPtr,
+ flushPtr, oldFlushPtr,
applyPtr;
bool replyRequested;
@@ -1580,6 +1593,7 @@ ProcessStandbyReplyMessage(void)
WalSnd *walsnd = MyWalSnd;
SpinLockAcquire(&walsnd->mutex);
+ oldFlushPtr = walsnd->flush;
walsnd->write = writePtr;
walsnd->flush = flushPtr;
walsnd->apply = applyPtr;
@@ -1597,7 +1611,78 @@ ProcessStandbyReplyMessage(void)
if (SlotIsLogical(MyReplicationSlot))
LogicalConfirmReceivedLocation(flushPtr);
else
- PhysicalConfirmReceivedLocation(flushPtr);
+ {
+ /*
+ * On recovery, a continuation reocrd must be available from
+ * single WAL source. So physical replication slot should stay in
+ * the first segment for a continuation record spanning multiple
+ * segments. Since this doesn't look into individual record,
+ * keepPtr may stay a bit too behind.
+ *
+ * Since the objective is avoding to remove required segments,
+ * checking every segment is enough. But once keepPtr goes behind,
+ * check every page for quick restoration.
+ *
+ * keepPtr has a valid value only when it is behind flushPtr.
+ */
+ if (oldFlushPtr != InvalidXLogRecPtr &&
+ (keepPtr == InvalidXLogRecPtr ?
+ oldFlushPtr / XLOG_SEG_SIZE != flushPtr / XLOG_SEG_SIZE :
+ keepPtr / XLOG_BLCKSZ != flushPtr / XLOG_BLCKSZ))
+ {
+ XLogRecPtr rp;
+ XLogRecPtr oldKeepPtr = keepPtr; /* for debug */
+
+ if (keepPtr == InvalidXLogRecPtr)
+ keepPtr = oldFlushPtr;
+
+ rp = keepPtr - (keepPtr % XLOG_BLCKSZ);
+
+ /*
+ * We may have let the record at flushPtr sent, so it's worth
+ * looking
+ */
+ while (rp <= flushPtr)
+ {
+ XLogPageHeaderData header;
+
+ /*
+ * If the page header is not available for now, don't move
+ * keepPtr forward. We can read it by the next chance.
+ */
+ if(sentPtr - rp >= sizeof(XLogPageHeaderData))
+ {
+ bool found;
+ /*
+ * Fetch the page header of the next page. Move
+ * keepPtr forward only if when it is not a
+ * continuation page.
+ */
+ found = XLogRead((char *)&header, rp,
+ sizeof(XLogPageHeaderData), true);
+ if (found &&
+ (header.xlp_info & XLP_FIRST_IS_CONTRECORD) == 0)
+ keepPtr = rp;
+ }
+ rp += XLOG_BLCKSZ;
+ }
+
+ /*
+ * If keepPtr is on the same page with flushPtr, it means that
+ * we are catching up
+ */
+ if (keepPtr / XLOG_BLCKSZ == flushPtr / XLOG_BLCKSZ)
+ keepPtr = InvalidXLogRecPtr;
+
+ if (oldKeepPtr != keepPtr)
+ elog(LOG, "%lX => %lX / %lX",
+ oldKeepPtr, keepPtr, flushPtr);
+ }
+
+ /* keepPtr == InvalidXLogRecPtr means catching up */
+ PhysicalConfirmReceivedLocation(keepPtr != InvalidXLogRecPtr ?
+ keepPtr : flushPtr);
+ }
}
}
@@ -2019,6 +2104,7 @@ WalSndKill(int code, Datum arg)
/*
* Read 'count' bytes from WAL into 'buf', starting at location 'startptr'
+ * Returns false if the segment file is not found when notfoundok is true.
*
* XXX probably this should be improved to suck data directly from the
* WAL buffers when possible.
@@ -2028,8 +2114,8 @@ WalSndKill(int code, Datum arg)
* always be one descriptor left open until the process ends, but never
* more than one.
*/
-static void
-XLogRead(char *buf, XLogRecPtr startptr, Size count)
+static bool
+XLogRead(char *buf, XLogRecPtr startptr, Size count, bool notfoundok)
{
char *p;
XLogRecPtr recptr;
@@ -2106,10 +2192,15 @@ retry:
* removed or recycled.
*/
if (errno == ENOENT)
+ {
+ if (notfoundok)
+ return false;
+
ereport(ERROR,
(errcode_for_file_access(),
errmsg("requested WAL segment %s has already been removed",
XLogFileNameP(curFileTimeLine, sendSegNo))));
+ }
else
ereport(ERROR,
(errcode_for_file_access(),
@@ -2189,6 +2280,8 @@ retry:
goto retry;
}
}
+
+ return true;
}
/*
@@ -2393,7 +2486,7 @@ XLogSendPhysical(void)
* calls.
*/
enlargeStringInfo(&output_message, nbytes);
- XLogRead(&output_message.data[output_message.len], startptr, nbytes);
+ XLogRead(&output_message.data[output_message.len], startptr, nbytes, false);
output_message.len += nbytes;
output_message.data[output_message.len] = '\0';
--
Sent via pgsql-hackers mailing list ([email protected])
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers