Simon Riggs wrote:
> On Thu, 2010-03-18 at 23:27 +0900, Fujii Masao wrote:
>
>> I agree that this is a bigger problem. Since the standby always starts
>> walreceiver before replaying any WAL files in pg_xlog, walreceiver tries
>> to receive the WAL files following the REDO starting point even if they
>> have already been in pg_xlog. IOW, the same WAL files might be shipped
>> from the primary to the standby many times. This behavior is unsmart,
>> and should be addressed.
>
> We might also have written half a file many times. The files in pg_xlog
> are suspect whereas the files in the archive are not. If we have both we
> should prefer the archive.
Yep.
Here's a patch I've been playing with. The idea is that in standby mode,
the server keeps trying to make progress in the recovery by:
a) restoring files from archive
b) replaying files from pg_xlog
c) streaming from master
When recovery reaches an invalid WAL record, typically caused by a
half-written WAL file, it closes the file and moves to the next source.
If an error is found in a file restored from archive or in a portion
just streamed from master, however, a PANIC is thrown, because it's not
expected to have errors in the archive or in the master.
When a file is streamed from master, it's left in pg_xlog, so it's found
there after a standby restart, and recovery can progress to the same
point as before restart. It also means that you can copy partial WAL
files to pg_xlog at any time and have them replayed in a few seconds.
The code structure is a bit spaghetti-like, I'm afraid. Any suggestions
on how to improve that are welcome.
--
Heikki Linnakangas
EnterpriseDB http://www.enterprisedb.com
*** a/src/backend/access/transam/xlog.c
--- b/src/backend/access/transam/xlog.c
***************
*** 445,464 **** static uint32 openLogSeg = 0;
static uint32 openLogOff = 0;
/*
* These variables are used similarly to the ones above, but for reading
* the XLOG. Note, however, that readOff generally represents the offset
* of the page just read, not the seek position of the FD itself, which
* will be just past that page. readLen indicates how much of the current
! * page has been read into readBuf.
*/
static int readFile = -1;
static uint32 readId = 0;
static uint32 readSeg = 0;
static uint32 readOff = 0;
static uint32 readLen = 0;
! /* Is the currently open segment being streamed from primary? */
! static bool readStreamed = false;
/* Buffer for currently read page (XLOG_BLCKSZ bytes) */
static char *readBuf = NULL;
--- 445,477 ----
static uint32 openLogOff = 0;
/*
+ * Codes indicating where we got a WAL file from during recovery, or where
+ * to attempt to get one.
+ */
+ #define XLOG_FROM_ARCHIVE (1<<0) /* Restored using restore_command */
+ #define XLOG_FROM_PG_XLOG (1<<1) /* Existing file in pg_xlog */
+ #define XLOG_FROM_STREAM (1<<2) /* Streamed from master */
+
+ /*
* These variables are used similarly to the ones above, but for reading
* the XLOG. Note, however, that readOff generally represents the offset
* of the page just read, not the seek position of the FD itself, which
* will be just past that page. readLen indicates how much of the current
! * page has been read into readBuf, and readSource indicates where we got
! * the currently open file from.
*/
static int readFile = -1;
static uint32 readId = 0;
static uint32 readSeg = 0;
static uint32 readOff = 0;
static uint32 readLen = 0;
+ static int readSource = 0; /* XLOG_FROM_* code */
! /*
! * Keeps track of which sources we've tried to read the current WAL
! * record from and failed.
! */
! static int failedSources = 0;
/* Buffer for currently read page (XLOG_BLCKSZ bytes) */
static char *readBuf = NULL;
***************
*** 512,520 **** static bool InstallXLogFileSegment(uint32 *log, uint32 *seg, char *tmppath,
bool find_free, int *max_advance,
bool use_lock);
static int XLogFileRead(uint32 log, uint32 seg, int emode, TimeLineID tli,
! bool fromArchive, bool notexistOk);
static int XLogFileReadAnyTLI(uint32 log, uint32 seg, int emode,
! bool fromArchive);
static bool XLogPageRead(XLogRecPtr *RecPtr, int emode, bool fetching_ckpt,
bool randAccess);
static void XLogFileClose(void);
--- 525,533 ----
bool find_free, int *max_advance,
bool use_lock);
static int XLogFileRead(uint32 log, uint32 seg, int emode, TimeLineID tli,
! int source, bool notexistOk);
static int XLogFileReadAnyTLI(uint32 log, uint32 seg, int emode,
! int sources);
static bool XLogPageRead(XLogRecPtr *RecPtr, int emode, bool fetching_ckpt,
bool randAccess);
static void XLogFileClose(void);
***************
*** 2567,2573 **** XLogFileOpen(uint32 log, uint32 seg)
*/
static int
XLogFileRead(uint32 log, uint32 seg, int emode, TimeLineID tli,
! bool fromArchive, bool notfoundOk)
{
char xlogfname[MAXFNAMELEN];
char activitymsg[MAXFNAMELEN + 16];
--- 2580,2586 ----
*/
static int
XLogFileRead(uint32 log, uint32 seg, int emode, TimeLineID tli,
! int source, bool notfoundOk)
{
char xlogfname[MAXFNAMELEN];
char activitymsg[MAXFNAMELEN + 16];
***************
*** 2576,2598 **** XLogFileRead(uint32 log, uint32 seg, int emode, TimeLineID tli,
XLogFileName(xlogfname, tli, log, seg);
! if (fromArchive)
{
! /* Report recovery progress in PS display */
! snprintf(activitymsg, sizeof(activitymsg), "waiting for %s",
! xlogfname);
! set_ps_display(activitymsg, false);
! restoredFromArchive = RestoreArchivedFile(path, xlogfname,
! "RECOVERYXLOG",
! XLogSegSize);
! if (!restoredFromArchive)
! return -1;
! }
! else
! {
! XLogFilePath(path, tli, log, seg);
! restoredFromArchive = false;
}
fd = BasicOpenFile(path, O_RDONLY | PG_BINARY, 0);
--- 2589,2616 ----
XLogFileName(xlogfname, tli, log, seg);
! switch (source)
{
! case XLOG_FROM_ARCHIVE:
! /* Report recovery progress in PS display */
! snprintf(activitymsg, sizeof(activitymsg), "waiting for %s",
! xlogfname);
! set_ps_display(activitymsg, false);
! restoredFromArchive = RestoreArchivedFile(path, xlogfname,
! "RECOVERYXLOG",
! XLogSegSize);
! if (!restoredFromArchive)
! return -1;
! break;
!
! case XLOG_FROM_PG_XLOG:
! XLogFilePath(path, tli, log, seg);
! restoredFromArchive = false;
! break;
!
! default:
! elog(ERROR, "invalid XLogFileRead source %d", source);
}
fd = BasicOpenFile(path, O_RDONLY | PG_BINARY, 0);
***************
*** 2606,2611 **** XLogFileRead(uint32 log, uint32 seg, int emode, TimeLineID tli,
--- 2624,2631 ----
xlogfname);
set_ps_display(activitymsg, false);
+ readSource = source;
+
return fd;
}
if (errno != ENOENT || !notfoundOk) /* unexpected failure? */
***************
*** 2624,2630 **** XLogFileRead(uint32 log, uint32 seg, int emode, TimeLineID tli,
* searched in pg_xlog if not found in archive.
*/
static int
! XLogFileReadAnyTLI(uint32 log, uint32 seg, int emode, bool fromArchive)
{
char path[MAXPGPATH];
ListCell *cell;
--- 2644,2650 ----
* searched in pg_xlog if not found in archive.
*/
static int
! XLogFileReadAnyTLI(uint32 log, uint32 seg, int emode, int sources)
{
char path[MAXPGPATH];
ListCell *cell;
***************
*** 2647,2666 **** XLogFileReadAnyTLI(uint32 log, uint32 seg, int emode, bool fromArchive)
if (tli < curFileTLI)
break; /* don't bother looking at too-old TLIs */
! fd = XLogFileRead(log, seg, emode, tli, fromArchive, true);
! if (fd != -1)
! return fd;
! /*
! * If not in StandbyMode, fall back to searching pg_xlog. In
! * StandbyMode we're streaming segments from the primary to pg_xlog,
! * and we mustn't confuse the (possibly partial) segments in pg_xlog
! * with complete segments ready to be applied. We rather wait for the
! * records to arrive through streaming.
! */
! if (!StandbyMode && fromArchive)
{
! fd = XLogFileRead(log, seg, emode, tli, false, true);
if (fd != -1)
return fd;
}
--- 2667,2685 ----
if (tli < curFileTLI)
break; /* don't bother looking at too-old TLIs */
! if (sources & XLOG_FROM_ARCHIVE)
! {
! fd = XLogFileRead(log, seg, emode, tli, XLOG_FROM_ARCHIVE, true);
! if (fd != -1)
! {
! elog(DEBUG1, "got WAL segment from archive");
! return fd;
! }
! }
! if (sources & XLOG_FROM_PG_XLOG)
{
! fd = XLogFileRead(log, seg, emode, tli, XLOG_FROM_PG_XLOG, true);
if (fd != -1)
return fd;
}
***************
*** 3530,3545 **** ReadRecord(XLogRecPtr *RecPtr, int emode_arg, bool fetching_ckpt)
uint32 pageHeaderSize;
int emode;
- /*
- * We don't expect any invalid records during streaming recovery: we
- * should never hit the end of WAL because we wait for it to be streamed.
- * Therefore treat any broken WAL as PANIC, instead of failing over.
- */
- if (StandbyMode)
- emode = PANIC;
- else
- emode = emode_arg;
-
if (readBuf == NULL)
{
/*
--- 3549,3554 ----
***************
*** 3591,3600 **** ReadRecord(XLogRecPtr *RecPtr, int emode_arg, bool fetching_ckpt)
randAccess = true; /* allow curFileTLI to go backwards too */
}
/* Read the page containing the record */
! if (!XLogPageRead(RecPtr, emode, fetching_ckpt, randAccess))
return NULL;
pageHeaderSize = XLogPageHeaderSize((XLogPageHeader) readBuf);
targetRecOff = RecPtr->xrecoff % XLOG_BLCKSZ;
if (targetRecOff == 0)
--- 3600,3623 ----
randAccess = true; /* allow curFileTLI to go backwards too */
}
+ /* This is the first try to read this page. */
+ failedSources = 0;
+ retry:
/* Read the page containing the record */
! if (!XLogPageRead(RecPtr, emode_arg, fetching_ckpt, randAccess))
return NULL;
+ /*
+ * We don't expect any invalid records in archive or in records streamed
+ * from master: we should never hit the end of WAL because we wait for it
+ * to be streamed. Therefore treat any broken WAL as PANIC, instead of
+ * failing over.
+ */
+ if (readSource == XLOG_FROM_STREAM || readSource == XLOG_FROM_ARCHIVE)
+ emode = PANIC;
+ else
+ emode = emode_arg;
+
pageHeaderSize = XLogPageHeaderSize((XLogPageHeader) readBuf);
targetRecOff = RecPtr->xrecoff % XLOG_BLCKSZ;
if (targetRecOff == 0)
***************
*** 3828,3833 **** next_record_is_invalid:;
--- 3851,3864 ----
close(readFile);
readFile = -1;
}
+
+ /* In standby-mode, retry from another source */
+ if (StandbyMode)
+ {
+ failedSources |= readSource;
+ goto retry;
+ }
+
return NULL;
}
***************
*** 8698,8704 **** StartupProcessMain(void)
* as for waiting for the requested WAL record to arrive in standby mode.
*/
static bool
! XLogPageRead(XLogRecPtr *RecPtr, int emode, bool fetching_ckpt,
bool randAccess)
{
static XLogRecPtr receivedUpto = {0, 0};
--- 8729,8735 ----
* as for waiting for the requested WAL record to arrive in standby mode.
*/
static bool
! XLogPageRead(XLogRecPtr *RecPtr, int emode_arg, bool fetching_ckpt,
bool randAccess)
{
static XLogRecPtr receivedUpto = {0, 0};
***************
*** 8707,8719 **** XLogPageRead(XLogRecPtr *RecPtr, int emode, bool fetching_ckpt,
uint32 targetRecOff;
uint32 targetId;
uint32 targetSeg;
XLByteToSeg(*RecPtr, targetId, targetSeg);
targetPageOff = ((RecPtr->xrecoff % XLogSegSize) / XLOG_BLCKSZ) * XLOG_BLCKSZ;
targetRecOff = RecPtr->xrecoff % XLOG_BLCKSZ;
/* Fast exit if we have read the record in the current buffer already */
! if (targetId == readId && targetSeg == readSeg &&
targetPageOff == readOff && targetRecOff < readLen)
return true;
--- 8738,8752 ----
uint32 targetRecOff;
uint32 targetId;
uint32 targetSeg;
+ int emode;
+ static pg_time_t last_fail_time = 0;
XLByteToSeg(*RecPtr, targetId, targetSeg);
targetPageOff = ((RecPtr->xrecoff % XLogSegSize) / XLOG_BLCKSZ) * XLOG_BLCKSZ;
targetRecOff = RecPtr->xrecoff % XLOG_BLCKSZ;
/* Fast exit if we have read the record in the current buffer already */
! if (failedSources == 0 && targetId == readId && targetSeg == readSeg &&
targetPageOff == readOff && targetRecOff < readLen)
return true;
***************
*** 8725,8742 **** XLogPageRead(XLogRecPtr *RecPtr, int emode, bool fetching_ckpt,
{
close(readFile);
readFile = -1;
}
XLByteToSeg(*RecPtr, readId, readSeg);
/* See if we need to retrieve more data */
if (readFile < 0 ||
! (readStreamed && !XLByteLT(*RecPtr, receivedUpto)))
{
if (StandbyMode)
{
- bool last_restore_failed = false;
-
/*
* In standby mode, wait for the requested record to become
* available, either via restore_command succeeding to restore the
--- 8758,8775 ----
{
close(readFile);
readFile = -1;
+ readSource = 0;
}
XLByteToSeg(*RecPtr, readId, readSeg);
+ retry:
/* See if we need to retrieve more data */
if (readFile < 0 ||
! (readSource == XLOG_FROM_STREAM && !XLByteLT(*RecPtr, receivedUpto)))
{
if (StandbyMode)
{
/*
* In standby mode, wait for the requested record to become
* available, either via restore_command succeeding to restore the
***************
*** 8746,8751 **** XLogPageRead(XLogRecPtr *RecPtr, int emode, bool fetching_ckpt,
--- 8779,8786 ----
{
if (WalRcvInProgress())
{
+ failedSources = 0;
+
/*
* While walreceiver is active, wait for new WAL to arrive
* from primary.
***************
*** 8761,8775 **** XLogPageRead(XLogRecPtr *RecPtr, int emode, bool fetching_ckpt,
{
readFile =
XLogFileRead(readId, readSeg, PANIC,
! recoveryTargetTLI, false, false);
switched_segment = true;
! readStreamed = true;
}
break;
}
if (CheckForStandbyTrigger())
! goto next_record_is_invalid;
/*
* When streaming is active, we want to react quickly when
--- 8796,8811 ----
{
readFile =
XLogFileRead(readId, readSeg, PANIC,
! recoveryTargetTLI,
! XLOG_FROM_PG_XLOG, false);
switched_segment = true;
! readSource = XLOG_FROM_STREAM;
}
break;
}
if (CheckForStandbyTrigger())
! goto triggered;
/*
* When streaming is active, we want to react quickly when
***************
*** 8779,8784 **** XLogPageRead(XLogRecPtr *RecPtr, int emode, bool fetching_ckpt,
--- 8815,8823 ----
}
else
{
+ int sources;
+ pg_time_t now;
+
/*
* Until walreceiver manages to reconnect, poll the
* archive.
***************
*** 8791,8828 **** XLogPageRead(XLogRecPtr *RecPtr, int emode, bool fetching_ckpt,
/* Reset curFileTLI if random fetch. */
if (randAccess)
curFileTLI = 0;
! readFile = XLogFileReadAnyTLI(readId, readSeg, DEBUG2, true);
switched_segment = true;
- readStreamed = false;
if (readFile != -1)
{
- elog(DEBUG1, "got WAL segment from archive");
break;
}
/*
! * If we succeeded restoring some segments from archive
! * since the last connection attempt (or we haven't tried
! * streaming yet, retry immediately. But if we haven't,
! * assume the problem is persistent, so be less
! * aggressive.
*/
! if (last_restore_failed)
{
! /*
! * Check to see if the trigger file exists. Note that
! * we do this only after failure, so when you create
! * the trigger file, we still finish replaying as much
! * as we can before failover.
! */
! if (CheckForStandbyTrigger())
! goto next_record_is_invalid;
! pg_usleep(5000000L); /* 5 seconds */
}
! last_restore_failed = true;
/*
! * Nope, not found in archive. Try to stream it.
*
* If fetching_ckpt is TRUE, RecPtr points to the initial
* checkpoint location. In that case, we use RedoStartLSN
--- 8830,8877 ----
/* Reset curFileTLI if random fetch. */
if (randAccess)
curFileTLI = 0;
!
! /*
! * Try to restore the file from archive, or read an
! * existing file from pg_xlog.
! */
! sources = XLOG_FROM_ARCHIVE | XLOG_FROM_PG_XLOG;
! sources &= ~failedSources;
! readFile = XLogFileReadAnyTLI(readId, readSeg, DEBUG2,
! sources);
switched_segment = true;
if (readFile != -1)
{
break;
}
/*
! * Nope, not found in archive.
! */
!
! /*
! * Check to see if the trigger file exists. Note that
! * we do this only after failure, so when you create
! * the trigger file, we still finish replaying as much
! * as we can from archive and pg_xlog before failover.
*/
! if (CheckForStandbyTrigger())
! goto triggered;
!
! /*
! * Sleep if it hasn't been long since last attempt.
! */
! now = (pg_time_t) time(NULL);
! if ((now - last_fail_time) < 5)
{
! pg_usleep(1000000L * (5 - (now - last_fail_time)));
! now = (pg_time_t) time(NULL);
}
! last_fail_time = now;
/*
! * If primary_conninfo is set, launch walreceiver to
! * try to stream the missing WAL.
*
* If fetching_ckpt is TRUE, RecPtr points to the initial
* checkpoint location. In that case, we use RedoStartLSN
***************
*** 8847,8859 **** XLogPageRead(XLogRecPtr *RecPtr, int emode, bool fetching_ckpt,
/* In archive or crash recovery. */
if (readFile < 0)
{
/* Reset curFileTLI if random fetch. */
if (randAccess)
curFileTLI = 0;
! readFile = XLogFileReadAnyTLI(readId, readSeg, emode,
! InArchiveRecovery);
switched_segment = true;
- readStreamed = false;
if (readFile < 0)
return false;
}
--- 8896,8914 ----
/* In archive or crash recovery. */
if (readFile < 0)
{
+ int sources;
/* Reset curFileTLI if random fetch. */
if (randAccess)
curFileTLI = 0;
!
! sources = XLOG_FROM_PG_XLOG;
! if (InArchiveRecovery)
! sources |= XLOG_FROM_ARCHIVE;
! sources &= ~failedSources;
!
! readFile = XLogFileReadAnyTLI(readId, readSeg, emode_arg,
! sources);
switched_segment = true;
if (readFile < 0)
return false;
}
***************
*** 8861,8878 **** XLogPageRead(XLogRecPtr *RecPtr, int emode, bool fetching_ckpt,
}
/*
! * At this point, we have the right segment open and we know the requested
! * record is in it.
*/
Assert(readFile != -1);
/*
* If the current segment is being streamed from master, calculate how
* much of the current page we have received already. We know the
* requested record has been received, but this is for the benefit of
* future calls, to allow quick exit at the top of this function.
*/
! if (readStreamed)
{
if (RecPtr->xlogid != receivedUpto.xlogid ||
(RecPtr->xrecoff / XLOG_BLCKSZ) != (receivedUpto.xrecoff / XLOG_BLCKSZ))
--- 8916,8944 ----
}
/*
! * At this point, we have the right segment open and if we're streaming
! * we know the requested record is in it.
*/
Assert(readFile != -1);
/*
+ * We don't expect any invalid records in archive or in records streamed
+ * from master: we should never hit the end of WAL because we wait for it
+ * to be streamed. Therefore treat any broken WAL as PANIC, instead of
+ * failing over.
+ */
+ if (readSource == XLOG_FROM_STREAM || readSource == XLOG_FROM_ARCHIVE)
+ emode = PANIC;
+ else
+ emode = emode_arg;
+
+ /*
* If the current segment is being streamed from master, calculate how
* much of the current page we have received already. We know the
* requested record has been received, but this is for the benefit of
* future calls, to allow quick exit at the top of this function.
*/
! if (readSource == XLOG_FROM_STREAM)
{
if (RecPtr->xlogid != receivedUpto.xlogid ||
(RecPtr->xrecoff / XLOG_BLCKSZ) != (receivedUpto.xrecoff / XLOG_BLCKSZ))
***************
*** 8936,8946 **** XLogPageRead(XLogRecPtr *RecPtr, int emode, bool fetching_ckpt,
return true;
next_record_is_invalid:
if (readFile >= 0)
close(readFile);
readFile = -1;
- readStreamed = false;
readLen = 0;
return false;
}
--- 9002,9026 ----
return true;
next_record_is_invalid:
+ failedSources |= readSource;
+
+ if (readFile >= 0)
+ close(readFile);
+ readFile = -1;
+ readLen = 0;
+ readSource = 0;
+
+ if (StandbyMode)
+ goto retry;
+ else
+ return false;
+
+ triggered:
if (readFile >= 0)
close(readFile);
readFile = -1;
readLen = 0;
+ readSource = 0;
return false;
}
--
Sent via pgsql-hackers mailing list ([email protected])
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers