The hot standby patch has some hacks to decide which full-page-images can be restored holding an exclusive lock and which ones need a vacuum-strength lock. It's not very pretty as is, as mentioned in comments too.

How about we refactor things so that redo-functions are responsible for calling RestoreBkpBlocks? The redo function can then pass an argument indicating what kind of lock is required. We should also change XLogReadBufferExtended so that it doesn't lock the page; the caller knows better what kind of lock it needs. That makes it more analogous to ReadBufferExtended too, although I think we should keep XLogReadBuffer() unchanged for now.

See attached patch. One shortfall of this patch is that you can pass only one lock-mode argument to RestoreBkpBlocks, but there can be multiple backup blocks in one WAL record. That's OK in current usage, though.

--
  Heikki Linnakangas
  EnterpriseDB   http://www.enterprisedb.com
*** src/backend/access/gin/ginxlog.c
--- src/backend/access/gin/ginxlog.c
***************
*** 438,443 **** gin_redo(XLogRecPtr lsn, XLogRecord *record)
--- 438,445 ----
  {
  	uint8		info = record->xl_info & ~XLR_INFO_MASK;
  
+ 	RestoreBkpBlocks(lsn, record, false);
+ 
  	topCtx = MemoryContextSwitchTo(opCtx);
  	switch (info)
  	{
*** src/backend/access/gist/gistxlog.c
--- src/backend/access/gist/gistxlog.c
***************
*** 395,400 **** gist_redo(XLogRecPtr lsn, XLogRecord *record)
--- 395,402 ----
  {
  	uint8		info = record->xl_info & ~XLR_INFO_MASK;
  
+ 	RestoreBkpBlocks(lsn, record, false);
+ 
  	MemoryContext oldCxt;
  
  	oldCxt = MemoryContextSwitchTo(opCtx);
*** src/backend/access/heap/heapam.c
--- src/backend/access/heap/heapam.c
***************
*** 4777,4782 **** heap_redo(XLogRecPtr lsn, XLogRecord *record)
--- 4777,4784 ----
  {
  	uint8		info = record->xl_info & ~XLR_INFO_MASK;
  
+ 	RestoreBkpBlocks(lsn, record, false);
+ 
  	switch (info & XLOG_HEAP_OPMASK)
  	{
  		case XLOG_HEAP_INSERT:
***************
*** 4816,4827 **** heap2_redo(XLogRecPtr lsn, XLogRecord *record)
--- 4818,4832 ----
  	switch (info & XLOG_HEAP_OPMASK)
  	{
  		case XLOG_HEAP2_FREEZE:
+ 			RestoreBkpBlocks(lsn, record, false);
  			heap_xlog_freeze(lsn, record);
  			break;
  		case XLOG_HEAP2_CLEAN:
+ 			RestoreBkpBlocks(lsn, record, true);
  			heap_xlog_clean(lsn, record, false);
  			break;
  		case XLOG_HEAP2_CLEAN_MOVE:
+ 			RestoreBkpBlocks(lsn, record, true);
  			heap_xlog_clean(lsn, record, true);
  			break;
  		default:
*** src/backend/access/nbtree/nbtxlog.c
--- src/backend/access/nbtree/nbtxlog.c
***************
*** 714,719 **** btree_redo(XLogRecPtr lsn, XLogRecord *record)
--- 714,721 ----
  {
  	uint8		info = record->xl_info & ~XLR_INFO_MASK;
  
+ 	RestoreBkpBlocks(lsn, record, false);
+ 
  	switch (info)
  	{
  		case XLOG_BTREE_INSERT_LEAF:
*** src/backend/access/transam/xlog.c
--- src/backend/access/transam/xlog.c
***************
*** 2934,2941 **** CleanupBackupHistory(void)
   * modifications of the page that appear in XLOG, rather than possibly
   * ignoring them as already applied, but that's not a huge drawback.
   */
! static void
! RestoreBkpBlocks(XLogRecord *record, XLogRecPtr lsn)
  {
  	Buffer		buffer;
  	Page		page;
--- 2934,2941 ----
   * modifications of the page that appear in XLOG, rather than possibly
   * ignoring them as already applied, but that's not a huge drawback.
   */
! void
! RestoreBkpBlocks(XLogRecPtr lsn, XLogRecord *record, bool cleanup)
  {
  	Buffer		buffer;
  	Page		page;
***************
*** 2943,2948 **** RestoreBkpBlocks(XLogRecord *record, XLogRecPtr lsn)
--- 2943,2951 ----
  	char	   *blk;
  	int			i;
  
+ 	if (!(record->xl_info & XLR_BKP_BLOCK_MASK))
+ 		return;
+ 
  	blk = (char *) XLogRecGetData(record) + record->xl_len;
  	for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++)
  	{
***************
*** 2955,2960 **** RestoreBkpBlocks(XLogRecord *record, XLogRecPtr lsn)
--- 2958,2968 ----
  		buffer = XLogReadBufferExtended(bkpb.node, bkpb.fork, bkpb.block,
  										RBM_ZERO);
  		Assert(BufferIsValid(buffer));
+ 		if (cleanup)
+ 			LockBufferForCleanup(buffer);
+ 		else
+ 			LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
+ 
  		page = (Page) BufferGetPage(buffer);
  
  		if (bkpb.hole_length == 0)
***************
*** 5210,5218 **** StartupXLOG(void)
  					TransactionIdAdvance(ShmemVariableCache->nextXid);
  				}
  
- 				if (record->xl_info & XLR_BKP_BLOCK_MASK)
- 					RestoreBkpBlocks(record, EndRecPtr);
- 
  				RmgrTable[record->xl_rmid].rm_redo(EndRecPtr, record);
  
  				/* Pop the error context stack */
--- 5218,5223 ----
*** src/backend/access/transam/xlogutils.c
--- src/backend/access/transam/xlogutils.c
***************
*** 217,224 **** XLogCheckInvalidPages(void)
  
  /*
   * XLogReadBuffer
!  *		A shorthand of XLogReadBufferExtended(), for reading from the main
!  *		fork.
   *
   * For historical reasons, instead of a ReadBufferMode argument, this only
   * supports RBM_ZERO (init == true) and RBM_NORMAL (init == false) modes.
--- 217,234 ----
  
  /*
   * XLogReadBuffer
!  *		Read a page during XLOG replay.
!  *
!  * This is a shorthand of XLogReadBufferExtended() followed by
!  * LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE), for reading from the main
!  * fork.
!  *
!  * (Getting the lock is not really necessary, since we
!  * expect that this is only used during single-process XLOG replay, but
!  * some subroutines such as MarkBufferDirty will complain if we don't.)
!  * XXX: but it will be with the hot standby patch.
!  *
!  * The returned buffer is exclusively-locked.
   *
   * For historical reasons, instead of a ReadBufferMode argument, this only
   * supports RBM_ZERO (init == true) and RBM_NORMAL (init == false) modes.
***************
*** 226,247 **** XLogCheckInvalidPages(void)
  Buffer
  XLogReadBuffer(RelFileNode rnode, BlockNumber blkno, bool init)
  {
! 	return XLogReadBufferExtended(rnode, MAIN_FORKNUM, blkno,
! 								  init ? RBM_ZERO : RBM_NORMAL);
  }
  
  /*
   * XLogReadBufferExtended
   *		Read a page during XLOG replay
   *
!  * This is functionally comparable to ReadBuffer followed by
!  * LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE): you get back a pinned
!  * and locked buffer.  (Getting the lock is not really necessary, since we
!  * expect that this is only used during single-process XLOG replay, but
!  * some subroutines such as MarkBufferDirty will complain if we don't.)
!  *
!  * There's some differences in the behavior wrt. the "mode" argument,
!  * compared to ReadBufferExtended:
   *
   * In RBM_NORMAL mode, if the page doesn't exist, or contains all-zeroes, we
   * return InvalidBuffer. In this case the caller should silently skip the
--- 236,256 ----
  Buffer
  XLogReadBuffer(RelFileNode rnode, BlockNumber blkno, bool init)
  {
! 	Buffer buf;
! 	buf = XLogReadBufferExtended(rnode, MAIN_FORKNUM, blkno,
! 								 init ? RBM_ZERO : RBM_NORMAL);
! 	if (BufferIsValid(buf))
! 		LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
! 
! 	return buf;
  }
  
  /*
   * XLogReadBufferExtended
   *		Read a page during XLOG replay
   *
!  * This is functionally comparable to ReadBufferExtended. There's some
!  * differences in the behavior wrt. the "mode" argument:
   *
   * In RBM_NORMAL mode, if the page doesn't exist, or contains all-zeroes, we
   * return InvalidBuffer. In this case the caller should silently skip the
***************
*** 306,321 **** XLogReadBufferExtended(RelFileNode rnode, ForkNumber forknum,
  		Assert(BufferGetBlockNumber(buffer) == blkno);
  	}
  
- 	LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
- 
  	if (mode == RBM_NORMAL)
  	{
  		/* check that page has been initialized */
  		Page		page = (Page) BufferGetPage(buffer);
  
  		if (PageIsNew(page))
  		{
- 			UnlockReleaseBuffer(buffer);
  			log_invalid_page(rnode, forknum, blkno, true);
  			return InvalidBuffer;
  		}
--- 315,331 ----
  		Assert(BufferGetBlockNumber(buffer) == blkno);
  	}
  
  	if (mode == RBM_NORMAL)
  	{
  		/* check that page has been initialized */
  		Page		page = (Page) BufferGetPage(buffer);
  
+ 		/*
+ 		 * We assume that PageIsNew works without a lock. During recovery,
+ 		 * no other backend should modify the buffer at the same time.
+ 		 */
  		if (PageIsNew(page))
  		{
  			log_invalid_page(rnode, forknum, blkno, true);
  			return InvalidBuffer;
  		}
*** src/backend/commands/sequence.c
--- src/backend/commands/sequence.c
***************
*** 1339,1344 **** seq_redo(XLogRecPtr lsn, XLogRecord *record)
--- 1339,1346 ----
  	xl_seq_rec *xlrec = (xl_seq_rec *) XLogRecGetData(record);
  	sequence_magic *sm;
  
+ 	RestoreBkpBlocks(lsn, record, false);
+ 
  	if (info != XLOG_SEQ_LOG)
  		elog(PANIC, "seq_redo: unknown op code %u", info);
  
*** src/backend/storage/freespace/freespace.c
--- src/backend/storage/freespace/freespace.c
***************
*** 212,217 **** XLogRecordPageWithFreeSpace(RelFileNode rnode, BlockNumber heapBlk,
--- 212,219 ----
  
  	/* If the page doesn't exist already, extend */
  	buf = XLogReadBufferExtended(rnode, FSM_FORKNUM, blkno, RBM_ZERO_ON_ERROR);
+ 	LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
+ 
  	page = BufferGetPage(buf);
  	if (PageIsNew(page))
  		PageInit(page, BLCKSZ, 0);
*** src/include/access/xlog.h
--- src/include/access/xlog.h
***************
*** 197,202 **** extern void XLogSetAsyncCommitLSN(XLogRecPtr record);
--- 197,204 ----
  extern void xlog_redo(XLogRecPtr lsn, XLogRecord *record);
  extern void xlog_desc(StringInfo buf, uint8 xl_info, char *rec);
  
+ extern void RestoreBkpBlocks(XLogRecPtr lsn, XLogRecord *record, bool cleanup);
+ 
  extern void UpdateControlFile(void);
  extern Size XLOGShmemSize(void);
  extern void XLOGShmemInit(void);
-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to