Simon Riggs wrote:
> On Thu, 2010-03-18 at 23:27 +0900, Fujii Masao wrote:
> 
>> I agree that this is a bigger problem. Since the standby always starts
>> walreceiver before replaying any WAL files in pg_xlog, walreceiver tries
>> to receive the WAL files following the REDO starting point even if they
>> have already been in pg_xlog. IOW, the same WAL files might be shipped
>> from the primary to the standby many times. This behavior is unsmart,
>> and should be addressed.
> 
> We might also have written half a file many times. The files in pg_xlog
> are suspect whereas the files in the archive are not. If we have both we
> should prefer the archive.

Yep.

Here's a patch I've been playing with. The idea is that in standby mode,
the server keeps trying to make progress in recovery by cycling through
these sources, in this order:

a) restoring files from archive
b) replaying files from pg_xlog
c) streaming from master

When recovery reaches an invalid WAL record, typically caused by a
half-written WAL file, it closes the file and moves on to the next source.
However, if an error is found in a file restored from the archive, or in a
portion just streamed from the master, a PANIC is thrown, because errors
are not expected in the archive or on the master.

When a file is streamed from the master, it's left in pg_xlog, so it's found
there after a standby restart, and recovery can progress to the same
point as before the restart. It also means that you can copy partial WAL
files to pg_xlog at any time and have them replayed within a few seconds.

The code structure is a bit spaghetti-like, I'm afraid. Any suggestions
on how to improve it are welcome.

-- 
  Heikki Linnakangas
  EnterpriseDB   http://www.enterprisedb.com
*** a/src/backend/access/transam/xlog.c
--- b/src/backend/access/transam/xlog.c
***************
*** 445,464 **** static uint32 openLogSeg = 0;
  static uint32 openLogOff = 0;
  
  /*
   * These variables are used similarly to the ones above, but for reading
   * the XLOG.  Note, however, that readOff generally represents the offset
   * of the page just read, not the seek position of the FD itself, which
   * will be just past that page. readLen indicates how much of the current
!  * page has been read into readBuf.
   */
  static int	readFile = -1;
  static uint32 readId = 0;
  static uint32 readSeg = 0;
  static uint32 readOff = 0;
  static uint32 readLen = 0;
  
! /* Is the currently open segment being streamed from primary? */
! static bool readStreamed = false;
  
  /* Buffer for currently read page (XLOG_BLCKSZ bytes) */
  static char *readBuf = NULL;
--- 445,477 ----
  static uint32 openLogOff = 0;
  
  /*
+  * Codes indicating where we got a WAL file from during recovery, or where
+  * to attempt to get one.
+  */
+ #define XLOG_FROM_ARCHIVE		(1<<0)	/* Restored using restore_command */
+ #define XLOG_FROM_PG_XLOG		(1<<1)	/* Existing file in pg_xlog */
+ #define XLOG_FROM_STREAM		(1<<2)	/* Streamed from master */
+ 
+ /*
   * These variables are used similarly to the ones above, but for reading
   * the XLOG.  Note, however, that readOff generally represents the offset
   * of the page just read, not the seek position of the FD itself, which
   * will be just past that page. readLen indicates how much of the current
!  * page has been read into readBuf, and readSource indicates where we got
!  * the currently open file from.
   */
  static int	readFile = -1;
  static uint32 readId = 0;
  static uint32 readSeg = 0;
  static uint32 readOff = 0;
  static uint32 readLen = 0;
+ static int readSource = 0;		/* XLOG_FROM_* code */
  
! /*
!  * Keeps track of which sources we've tried to read the current WAL
!  * record from and failed.
!  */
! static int failedSources = 0;
  
  /* Buffer for currently read page (XLOG_BLCKSZ bytes) */
  static char *readBuf = NULL;
***************
*** 512,520 **** static bool InstallXLogFileSegment(uint32 *log, uint32 *seg, char *tmppath,
  					   bool find_free, int *max_advance,
  					   bool use_lock);
  static int XLogFileRead(uint32 log, uint32 seg, int emode, TimeLineID tli,
! 			 bool fromArchive, bool notexistOk);
  static int XLogFileReadAnyTLI(uint32 log, uint32 seg, int emode,
! 				   bool fromArchive);
  static bool XLogPageRead(XLogRecPtr *RecPtr, int emode, bool fetching_ckpt,
  			 bool randAccess);
  static void XLogFileClose(void);
--- 525,533 ----
  					   bool find_free, int *max_advance,
  					   bool use_lock);
  static int XLogFileRead(uint32 log, uint32 seg, int emode, TimeLineID tli,
! 			 int source, bool notexistOk);
  static int XLogFileReadAnyTLI(uint32 log, uint32 seg, int emode,
! 				   int sources);
  static bool XLogPageRead(XLogRecPtr *RecPtr, int emode, bool fetching_ckpt,
  			 bool randAccess);
  static void XLogFileClose(void);
***************
*** 2567,2573 **** XLogFileOpen(uint32 log, uint32 seg)
   */
  static int
  XLogFileRead(uint32 log, uint32 seg, int emode, TimeLineID tli,
! 			 bool fromArchive, bool notfoundOk)
  {
  	char		xlogfname[MAXFNAMELEN];
  	char		activitymsg[MAXFNAMELEN + 16];
--- 2580,2586 ----
   */
  static int
  XLogFileRead(uint32 log, uint32 seg, int emode, TimeLineID tli,
! 			 int source, bool notfoundOk)
  {
  	char		xlogfname[MAXFNAMELEN];
  	char		activitymsg[MAXFNAMELEN + 16];
***************
*** 2576,2598 **** XLogFileRead(uint32 log, uint32 seg, int emode, TimeLineID tli,
  
  	XLogFileName(xlogfname, tli, log, seg);
  
! 	if (fromArchive)
  	{
! 		/* Report recovery progress in PS display */
! 		snprintf(activitymsg, sizeof(activitymsg), "waiting for %s",
! 				 xlogfname);
! 		set_ps_display(activitymsg, false);
  
! 		restoredFromArchive = RestoreArchivedFile(path, xlogfname,
! 												  "RECOVERYXLOG",
! 												  XLogSegSize);
! 		if (!restoredFromArchive)
! 			return -1;
! 	}
! 	else
! 	{
! 		XLogFilePath(path, tli, log, seg);
! 		restoredFromArchive = false;
  	}
  
  	fd = BasicOpenFile(path, O_RDONLY | PG_BINARY, 0);
--- 2589,2616 ----
  
  	XLogFileName(xlogfname, tli, log, seg);
  
! 	switch (source)
  	{
! 		case XLOG_FROM_ARCHIVE:
! 			/* Report recovery progress in PS display */
! 			snprintf(activitymsg, sizeof(activitymsg), "waiting for %s",
! 					 xlogfname);
! 			set_ps_display(activitymsg, false);
  
! 			restoredFromArchive = RestoreArchivedFile(path, xlogfname,
! 													  "RECOVERYXLOG",
! 													  XLogSegSize);
! 			if (!restoredFromArchive)
! 				return -1;
! 			break;
! 
! 		case XLOG_FROM_PG_XLOG:
! 			XLogFilePath(path, tli, log, seg);
! 			restoredFromArchive = false;
! 			break;
! 
! 		default:
! 			elog(ERROR, "invalid XLogFileRead source %d", source);
  	}
  
  	fd = BasicOpenFile(path, O_RDONLY | PG_BINARY, 0);
***************
*** 2606,2611 **** XLogFileRead(uint32 log, uint32 seg, int emode, TimeLineID tli,
--- 2624,2631 ----
  				 xlogfname);
  		set_ps_display(activitymsg, false);
  
+ 		readSource = source;
+ 
  		return fd;
  	}
  	if (errno != ENOENT || !notfoundOk) /* unexpected failure? */
***************
*** 2624,2630 **** XLogFileRead(uint32 log, uint32 seg, int emode, TimeLineID tli,
   * searched in pg_xlog if not found in archive.
   */
  static int
! XLogFileReadAnyTLI(uint32 log, uint32 seg, int emode, bool fromArchive)
  {
  	char		path[MAXPGPATH];
  	ListCell   *cell;
--- 2644,2650 ----
   * searched in pg_xlog if not found in archive.
   */
  static int
! XLogFileReadAnyTLI(uint32 log, uint32 seg, int emode, int sources)
  {
  	char		path[MAXPGPATH];
  	ListCell   *cell;
***************
*** 2647,2666 **** XLogFileReadAnyTLI(uint32 log, uint32 seg, int emode, bool fromArchive)
  		if (tli < curFileTLI)
  			break;				/* don't bother looking at too-old TLIs */
  
! 		fd = XLogFileRead(log, seg, emode, tli, fromArchive, true);
! 		if (fd != -1)
! 			return fd;
  
! 		/*
! 		 * If not in StandbyMode, fall back to searching pg_xlog. In
! 		 * StandbyMode we're streaming segments from the primary to pg_xlog,
! 		 * and we mustn't confuse the (possibly partial) segments in pg_xlog
! 		 * with complete segments ready to be applied. We rather wait for the
! 		 * records to arrive through streaming.
! 		 */
! 		if (!StandbyMode && fromArchive)
  		{
! 			fd = XLogFileRead(log, seg, emode, tli, false, true);
  			if (fd != -1)
  				return fd;
  		}
--- 2667,2685 ----
  		if (tli < curFileTLI)
  			break;				/* don't bother looking at too-old TLIs */
  
! 		if (sources & XLOG_FROM_ARCHIVE)
! 		{
! 			fd = XLogFileRead(log, seg, emode, tli, XLOG_FROM_ARCHIVE, true);
! 			if (fd != -1)
! 			{
! 				elog(DEBUG1, "got WAL segment from archive");
! 				return fd;
! 			}
! 		}
  
! 		if (sources & XLOG_FROM_PG_XLOG)
  		{
! 			fd = XLogFileRead(log, seg, emode, tli, XLOG_FROM_PG_XLOG, true);
  			if (fd != -1)
  				return fd;
  		}
***************
*** 3530,3545 **** ReadRecord(XLogRecPtr *RecPtr, int emode_arg, bool fetching_ckpt)
  	uint32		pageHeaderSize;
  	int			emode;
  
- 	/*
- 	 * We don't expect any invalid records during streaming recovery: we
- 	 * should never hit the end of WAL because we wait for it to be streamed.
- 	 * Therefore treat any broken WAL as PANIC, instead of failing over.
- 	 */
- 	if (StandbyMode)
- 		emode = PANIC;
- 	else
- 		emode = emode_arg;
- 
  	if (readBuf == NULL)
  	{
  		/*
--- 3549,3554 ----
***************
*** 3591,3600 **** ReadRecord(XLogRecPtr *RecPtr, int emode_arg, bool fetching_ckpt)
  		randAccess = true;		/* allow curFileTLI to go backwards too */
  	}
  
  	/* Read the page containing the record */
! 	if (!XLogPageRead(RecPtr, emode, fetching_ckpt, randAccess))
  		return NULL;
  
  	pageHeaderSize = XLogPageHeaderSize((XLogPageHeader) readBuf);
  	targetRecOff = RecPtr->xrecoff % XLOG_BLCKSZ;
  	if (targetRecOff == 0)
--- 3600,3623 ----
  		randAccess = true;		/* allow curFileTLI to go backwards too */
  	}
  
+ 	/* This is the first try read this page. */
+ 	failedSources = 0;
+ retry:
  	/* Read the page containing the record */
! 	if (!XLogPageRead(RecPtr, emode_arg, fetching_ckpt, randAccess))
  		return NULL;
  
+ 	/*
+ 	 * We don't expect any invalid records in archive or in records streamed
+ 	 * from master: we should never hit the end of WAL because we wait for it
+ 	 * to be streamed. Therefore treat any broken WAL as PANIC, instead of
+ 	 * failing over.
+ 	 */
+ 	if (readSource == XLOG_FROM_STREAM || readSource == XLOG_FROM_ARCHIVE)
+ 		emode = PANIC;
+ 	else
+ 		emode = emode_arg;
+ 
  	pageHeaderSize = XLogPageHeaderSize((XLogPageHeader) readBuf);
  	targetRecOff = RecPtr->xrecoff % XLOG_BLCKSZ;
  	if (targetRecOff == 0)
***************
*** 3828,3833 **** next_record_is_invalid:;
--- 3851,3864 ----
  		close(readFile);
  		readFile = -1;
  	}
+ 
+ 	/* In standby-mode, retry from another source */
+ 	if (StandbyMode)
+ 	{
+ 		failedSources |= readSource;
+ 		goto retry;
+ 	}
+ 
  	return NULL;
  }
  
***************
*** 8698,8704 **** StartupProcessMain(void)
   * as for waiting for the requested WAL record to arrive in standby mode.
   */
  static bool
! XLogPageRead(XLogRecPtr *RecPtr, int emode, bool fetching_ckpt,
  			 bool randAccess)
  {
  	static XLogRecPtr receivedUpto = {0, 0};
--- 8729,8735 ----
   * as for waiting for the requested WAL record to arrive in standby mode.
   */
  static bool
! XLogPageRead(XLogRecPtr *RecPtr, int emode_arg, bool fetching_ckpt,
  			 bool randAccess)
  {
  	static XLogRecPtr receivedUpto = {0, 0};
***************
*** 8707,8719 **** XLogPageRead(XLogRecPtr *RecPtr, int emode, bool fetching_ckpt,
  	uint32		targetRecOff;
  	uint32		targetId;
  	uint32		targetSeg;
  
  	XLByteToSeg(*RecPtr, targetId, targetSeg);
  	targetPageOff = ((RecPtr->xrecoff % XLogSegSize) / XLOG_BLCKSZ) * XLOG_BLCKSZ;
  	targetRecOff = RecPtr->xrecoff % XLOG_BLCKSZ;
  
  	/* Fast exit if we have read the record in the current buffer already */
! 	if (targetId == readId && targetSeg == readSeg &&
  		targetPageOff == readOff && targetRecOff < readLen)
  		return true;
  
--- 8738,8752 ----
  	uint32		targetRecOff;
  	uint32		targetId;
  	uint32		targetSeg;
+ 	int			emode;
+ 	static pg_time_t last_fail_time = 0;
  
  	XLByteToSeg(*RecPtr, targetId, targetSeg);
  	targetPageOff = ((RecPtr->xrecoff % XLogSegSize) / XLOG_BLCKSZ) * XLOG_BLCKSZ;
  	targetRecOff = RecPtr->xrecoff % XLOG_BLCKSZ;
  
  	/* Fast exit if we have read the record in the current buffer already */
! 	if (failedSources == 0 && targetId == readId && targetSeg == readSeg &&
  		targetPageOff == readOff && targetRecOff < readLen)
  		return true;
  
***************
*** 8725,8742 **** XLogPageRead(XLogRecPtr *RecPtr, int emode, bool fetching_ckpt,
  	{
  		close(readFile);
  		readFile = -1;
  	}
  
  	XLByteToSeg(*RecPtr, readId, readSeg);
  
  	/* See if we need to retrieve more data */
  	if (readFile < 0 ||
! 		(readStreamed && !XLByteLT(*RecPtr, receivedUpto)))
  	{
  		if (StandbyMode)
  		{
- 			bool		last_restore_failed = false;
- 
  			/*
  			 * In standby mode, wait for the requested record to become
  			 * available, either via restore_command succeeding to restore the
--- 8758,8775 ----
  	{
  		close(readFile);
  		readFile = -1;
+ 		readSource = 0;
  	}
  
  	XLByteToSeg(*RecPtr, readId, readSeg);
  
+ retry:
  	/* See if we need to retrieve more data */
  	if (readFile < 0 ||
! 		(readSource == XLOG_FROM_STREAM && !XLByteLT(*RecPtr, receivedUpto)))
  	{
  		if (StandbyMode)
  		{
  			/*
  			 * In standby mode, wait for the requested record to become
  			 * available, either via restore_command succeeding to restore the
***************
*** 8746,8751 **** XLogPageRead(XLogRecPtr *RecPtr, int emode, bool fetching_ckpt,
--- 8779,8786 ----
  			{
  				if (WalRcvInProgress())
  				{
+ 					failedSources = 0;
+ 
  					/*
  					 * While walreceiver is active, wait for new WAL to arrive
  					 * from primary.
***************
*** 8761,8775 **** XLogPageRead(XLogRecPtr *RecPtr, int emode, bool fetching_ckpt,
  						{
  							readFile =
  								XLogFileRead(readId, readSeg, PANIC,
! 											 recoveryTargetTLI, false, false);
  							switched_segment = true;
! 							readStreamed = true;
  						}
  						break;
  					}
  
  					if (CheckForStandbyTrigger())
! 						goto next_record_is_invalid;
  
  					/*
  					 * When streaming is active, we want to react quickly when
--- 8796,8811 ----
  						{
  							readFile =
  								XLogFileRead(readId, readSeg, PANIC,
! 											 recoveryTargetTLI,
! 											 XLOG_FROM_PG_XLOG, false);
  							switched_segment = true;
! 							readSource = XLOG_FROM_STREAM;
  						}
  						break;
  					}
  
  					if (CheckForStandbyTrigger())
! 						goto triggered;
  
  					/*
  					 * When streaming is active, we want to react quickly when
***************
*** 8779,8784 **** XLogPageRead(XLogRecPtr *RecPtr, int emode, bool fetching_ckpt,
--- 8815,8823 ----
  				}
  				else
  				{
+ 					int sources;
+ 					pg_time_t now;
+ 
  					/*
  					 * Until walreceiver manages to reconnect, poll the
  					 * archive.
***************
*** 8791,8828 **** XLogPageRead(XLogRecPtr *RecPtr, int emode, bool fetching_ckpt,
  					/* Reset curFileTLI if random fetch. */
  					if (randAccess)
  						curFileTLI = 0;
! 					readFile = XLogFileReadAnyTLI(readId, readSeg, DEBUG2, true);
  					switched_segment = true;
- 					readStreamed = false;
  					if (readFile != -1)
  					{
- 						elog(DEBUG1, "got WAL segment from archive");
  						break;
  					}
  
  					/*
! 					 * If we succeeded restoring some segments from archive
! 					 * since the last connection attempt (or we haven't tried
! 					 * streaming yet, retry immediately. But if we haven't,
! 					 * assume the problem is persistent, so be less
! 					 * aggressive.
  					 */
! 					if (last_restore_failed)
  					{
! 						/*
! 						 * Check to see if the trigger file exists. Note that
! 						 * we do this only after failure, so when you create
! 						 * the trigger file, we still finish replaying as much
! 						 * as we can before failover.
! 						 */
! 						if (CheckForStandbyTrigger())
! 							goto next_record_is_invalid;
! 						pg_usleep(5000000L);	/* 5 seconds */
  					}
! 					last_restore_failed = true;
  
  					/*
! 					 * Nope, not found in archive. Try to stream it.
  					 *
  					 * If fetching_ckpt is TRUE, RecPtr points to the initial
  					 * checkpoint location. In that case, we use RedoStartLSN
--- 8830,8877 ----
  					/* Reset curFileTLI if random fetch. */
  					if (randAccess)
  						curFileTLI = 0;
! 
! 					/*
! 					 * Try to restore the file from archive, or read an
! 					 * existing file from pg_xlog.
! 					 */
! 					sources = XLOG_FROM_ARCHIVE | XLOG_FROM_PG_XLOG;
! 					sources &= ~failedSources;
! 					readFile = XLogFileReadAnyTLI(readId, readSeg, DEBUG2,
! 												  sources);
  					switched_segment = true;
  					if (readFile != -1)
  					{
  						break;
  					}
  
  					/*
! 					 * Nope, not found in archive.
! 					 */
! 
! 					/*
! 					 * Check to see if the trigger file exists. Note that
! 					 * we do this only after failure, so when you create
! 					 * the trigger file, we still finish replaying as much
! 					 * as we can from archive and pg_xlog before failover.
  					 */
! 					if (CheckForStandbyTrigger())
! 						goto triggered;
! 
! 					/*
! 					 * Sleep if it hasn't been long since last attempt.
! 					 */
! 					now = (pg_time_t) time(NULL);
! 					if ((now - last_fail_time) < 5)
  					{
! 						pg_usleep(1000000L * (5 - (now - last_fail_time)));
! 						now = (pg_time_t) time(NULL);
  					}
! 					last_fail_time = now;
  
  					/*
! 					 * If primary_conninfo is set, launch walreceiver to
! 					 * try to stream the missing WAL.
  					 *
  					 * If fetching_ckpt is TRUE, RecPtr points to the initial
  					 * checkpoint location. In that case, we use RedoStartLSN
***************
*** 8847,8859 **** XLogPageRead(XLogRecPtr *RecPtr, int emode, bool fetching_ckpt,
  			/* In archive or crash recovery. */
  			if (readFile < 0)
  			{
  				/* Reset curFileTLI if random fetch. */
  				if (randAccess)
  					curFileTLI = 0;
! 				readFile = XLogFileReadAnyTLI(readId, readSeg, emode,
! 											  InArchiveRecovery);
  				switched_segment = true;
- 				readStreamed = false;
  				if (readFile < 0)
  					return false;
  			}
--- 8896,8914 ----
  			/* In archive or crash recovery. */
  			if (readFile < 0)
  			{
+ 				int sources;
  				/* Reset curFileTLI if random fetch. */
  				if (randAccess)
  					curFileTLI = 0;
! 
! 				sources = XLOG_FROM_PG_XLOG;
! 				if (InArchiveRecovery)
! 					sources |= XLOG_FROM_ARCHIVE;
! 				sources &= ~failedSources;
! 
! 				readFile = XLogFileReadAnyTLI(readId, readSeg, emode_arg,
! 											  sources);
  				switched_segment = true;
  				if (readFile < 0)
  					return false;
  			}
***************
*** 8861,8878 **** XLogPageRead(XLogRecPtr *RecPtr, int emode, bool fetching_ckpt,
  	}
  
  	/*
! 	 * At this point, we have the right segment open and we know the requested
! 	 * record is in it.
  	 */
  	Assert(readFile != -1);
  
  	/*
  	 * If the current segment is being streamed from master, calculate how
  	 * much of the current page we have received already. We know the
  	 * requested record has been received, but this is for the benefit of
  	 * future calls, to allow quick exit at the top of this function.
  	 */
! 	if (readStreamed)
  	{
  		if (RecPtr->xlogid != receivedUpto.xlogid ||
  			(RecPtr->xrecoff / XLOG_BLCKSZ) != (receivedUpto.xrecoff / XLOG_BLCKSZ))
--- 8916,8944 ----
  	}
  
  	/*
! 	 * At this point, we have the right segment open and if we're streaming
! 	 * we know the requested record is in it.
  	 */
  	Assert(readFile != -1);
  
  	/*
+ 	 * We don't expect any invalid records in archive or in records streamed
+ 	 * from master: we should never hit the end of WAL because we wait for it
+ 	 * to be streamed. Therefore treat any broken WAL as PANIC, instead of
+ 	 * failing over.
+ 	 */
+ 	if (readSource == XLOG_FROM_STREAM || readSource == XLOG_FROM_ARCHIVE)
+ 		emode = PANIC;
+ 	else
+ 		emode = emode_arg;
+ 
+ 	/*
  	 * If the current segment is being streamed from master, calculate how
  	 * much of the current page we have received already. We know the
  	 * requested record has been received, but this is for the benefit of
  	 * future calls, to allow quick exit at the top of this function.
  	 */
! 	if (readSource == XLOG_FROM_STREAM)
  	{
  		if (RecPtr->xlogid != receivedUpto.xlogid ||
  			(RecPtr->xrecoff / XLOG_BLCKSZ) != (receivedUpto.xrecoff / XLOG_BLCKSZ))
***************
*** 8936,8946 **** XLogPageRead(XLogRecPtr *RecPtr, int emode, bool fetching_ckpt,
  	return true;
  
  next_record_is_invalid:
  	if (readFile >= 0)
  		close(readFile);
  	readFile = -1;
- 	readStreamed = false;
  	readLen = 0;
  
  	return false;
  }
--- 9002,9026 ----
  	return true;
  
  next_record_is_invalid:
+ 	failedSources |= readSource;
+ 
+ 	if (readFile >= 0)
+ 		close(readFile);
+ 	readFile = -1;
+ 	readLen = 0;
+ 	readSource = 0;
+ 
+ 	if (StandbyMode)
+ 		goto retry;
+ 	else
+ 		return false;
+ 
+ triggered:
  	if (readFile >= 0)
  		close(readFile);
  	readFile = -1;
  	readLen = 0;
+ 	readSource = 0;
  
  	return false;
  }
-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to