On Mon, 2008-10-20 at 13:54 +0100, Simon Riggs wrote:

> I'm looking at how to make queries safe during recovery, in the presence
> of concurrent changes to blocks. In particular, concurrent removal of
> rows that might be read by queries.
> 
> My thinking is 
> * we ignore LockRelationForExtension(). Normal queries never request it.
> All new blocks were created with that lock held and we are replaying
> changes serially, so we do not need to re-create that lock. We already
> do this, so no change.
> * re-create the Cleanup lock on blocks, when the original operation was
> performed while a Cleanup lock was held.
> 
> So the plan is to introduce a new XLogLockBufferForCleanup() function
> and then use it in all places where a cleanup lock was held during the
> write of the WAL record.
> 
> This means we will need to hold cleanup lock:
> 
> * while RM_HEAP2_ID records are applied (Freeze, Clean, CleanMove)
> 
> * while an XLOG_BTREE_DELETE was generated by VACUUM - this record type
> is not always generated by VACUUM. So split this WAL record into two
> types XLOG_BTREE_DELETE and XLOG_BTREE_VACUUM, so we can hold Cleanup
> lock while applying XLOG_BTREE_VACUUM. (This may not be required, but
> I'd rather do the full locking now and relax it later).
> 
> * Whenever we apply a backup block that performs the same function as
> any of the above WAL records. So we would hold Cleanup lock when
> applying WAL records of types
>   all RM_HEAP2_ID types
>   XLOG_BTREE_VACUUM
> 
> I'm most of the way through implementing the above and will post patch
> as a separate item to make it easier to inspect.

Here's the code to implement this portion.

I'm undecided as to whether this is too much code or too little. I'm
somewhat uncertain as to the locking requirements for the physical scan
during a vacuum. I've worked out various options if we need to change
this.

Also, the way I take a cleanup lock during RestoreBkpBlocks() seems very
ugly, but adding a lot of extra code to avoid it seems over the top too.

I'm integrating this into the rest of the patch now.

-- 
 Simon Riggs           www.2ndQuadrant.com
 PostgreSQL Training, Services and Support
Index: src/backend/access/heap/heapam.c
===================================================================
RCS file: /home/sriggs/pg/REPOSITORY/pgsql/src/backend/access/heap/heapam.c,v
retrieving revision 1.265
diff -c -r1.265 heapam.c
*** src/backend/access/heap/heapam.c	8 Oct 2008 01:14:44 -0000	1.265
--- src/backend/access/heap/heapam.c	22 Oct 2008 22:00:36 -0000
***************
*** 4033,4039 ****
  	if (record->xl_info & XLR_BKP_BLOCK_1)
  		return;
  
! 	buffer = XLogReadBuffer(xlrec->node, xlrec->block, false);
  	if (!BufferIsValid(buffer))
  		return;
  	page = (Page) BufferGetPage(buffer);
--- 4033,4039 ----
  	if (record->xl_info & XLR_BKP_BLOCK_1)
  		return;
  
! 	buffer = XLogReadBufferForCleanup(xlrec->node, xlrec->block, false);
  	if (!BufferIsValid(buffer))
  		return;
  	page = (Page) BufferGetPage(buffer);
***************
*** 4082,4088 ****
  	if (record->xl_info & XLR_BKP_BLOCK_1)
  		return;
  
! 	buffer = XLogReadBuffer(xlrec->node, xlrec->block, false);
  	if (!BufferIsValid(buffer))
  		return;
  	page = (Page) BufferGetPage(buffer);
--- 4082,4088 ----
  	if (record->xl_info & XLR_BKP_BLOCK_1)
  		return;
  
! 	buffer = XLogReadBufferForCleanup(xlrec->node, xlrec->block, false);
  	if (!BufferIsValid(buffer))
  		return;
  	page = (Page) BufferGetPage(buffer);
Index: src/backend/access/nbtree/nbtinsert.c
===================================================================
RCS file: /home/sriggs/pg/REPOSITORY/pgsql/src/backend/access/nbtree/nbtinsert.c,v
retrieving revision 1.167
diff -c -r1.167 nbtinsert.c
*** src/backend/access/nbtree/nbtinsert.c	11 Jun 2008 08:38:56 -0000	1.167
--- src/backend/access/nbtree/nbtinsert.c	22 Oct 2008 22:00:36 -0000
***************
*** 1924,1930 ****
  	}
  
  	if (ndeletable > 0)
! 		_bt_delitems(rel, buffer, deletable, ndeletable);
  
  	/*
  	 * Note: if we didn't find any LP_DEAD items, then the page's
--- 1924,1930 ----
  	}
  
  	if (ndeletable > 0)
! 		_bt_delitems(rel, buffer, deletable, ndeletable, false);
  
  	/*
  	 * Note: if we didn't find any LP_DEAD items, then the page's
Index: src/backend/access/nbtree/nbtpage.c
===================================================================
RCS file: /home/sriggs/pg/REPOSITORY/pgsql/src/backend/access/nbtree/nbtpage.c,v
retrieving revision 1.111
diff -c -r1.111 nbtpage.c
*** src/backend/access/nbtree/nbtpage.c	30 Sep 2008 10:52:10 -0000	1.111
--- src/backend/access/nbtree/nbtpage.c	22 Oct 2008 22:00:36 -0000
***************
*** 649,658 ****
   *
   * This routine assumes that the caller has pinned and locked the buffer.
   * Also, the given itemnos *must* appear in increasing order in the array.
   */
  void
  _bt_delitems(Relation rel, Buffer buf,
! 			 OffsetNumber *itemnos, int nitems)
  {
  	Page		page = BufferGetPage(buf);
  	BTPageOpaque opaque;
--- 649,660 ----
   *
   * This routine assumes that the caller has pinned and locked the buffer.
   * Also, the given itemnos *must* appear in increasing order in the array.
+  * If (needCleanupLock) we assume the lock is a super-exclusive lock,
+  * not just a normal block ExclusiveLock.
   */
  void
  _bt_delitems(Relation rel, Buffer buf,
! 			 OffsetNumber *itemnos, int nitems, bool needCleanupLock)
  {
  	Page		page = BufferGetPage(buf);
  	BTPageOpaque opaque;
***************
*** 715,721 ****
  		rdata[1].buffer_std = true;
  		rdata[1].next = NULL;
  
! 		recptr = XLogInsert(RM_BTREE_ID, XLOG_BTREE_DELETE, rdata);
  
  		PageSetLSN(page, recptr);
  		PageSetTLI(page, ThisTimeLineID);
--- 717,726 ----
  		rdata[1].buffer_std = true;
  		rdata[1].next = NULL;
  
! 		if (needCleanupLock)
! 			recptr = XLogInsert(RM_BTREE_ID, XLOG_BTREE_VACUUM, rdata);
! 		else
! 			recptr = XLogInsert(RM_BTREE_ID, XLOG_BTREE_DELETE, rdata);
  
  		PageSetLSN(page, recptr);
  		PageSetTLI(page, ThisTimeLineID);
Index: src/backend/access/nbtree/nbtree.c
===================================================================
RCS file: /home/sriggs/pg/REPOSITORY/pgsql/src/backend/access/nbtree/nbtree.c,v
retrieving revision 1.163
diff -c -r1.163 nbtree.c
*** src/backend/access/nbtree/nbtree.c	6 Oct 2008 08:04:11 -0000	1.163
--- src/backend/access/nbtree/nbtree.c	22 Oct 2008 22:00:36 -0000
***************
*** 857,863 ****
  		 */
  		if (ndeletable > 0)
  		{
! 			_bt_delitems(rel, buf, deletable, ndeletable);
  			stats->tuples_removed += ndeletable;
  			/* must recompute maxoff */
  			maxoff = PageGetMaxOffsetNumber(page);
--- 857,863 ----
  		 */
  		if (ndeletable > 0)
  		{
! 			_bt_delitems(rel, buf, deletable, ndeletable, true);
  			stats->tuples_removed += ndeletable;
  			/* must recompute maxoff */
  			maxoff = PageGetMaxOffsetNumber(page);
Index: src/backend/access/nbtree/nbtxlog.c
===================================================================
RCS file: /home/sriggs/pg/REPOSITORY/pgsql/src/backend/access/nbtree/nbtxlog.c,v
retrieving revision 1.52
diff -c -r1.52 nbtxlog.c
*** src/backend/access/nbtree/nbtxlog.c	12 Jun 2008 09:12:30 -0000	1.52
--- src/backend/access/nbtree/nbtxlog.c	22 Oct 2008 22:00:36 -0000
***************
*** 459,465 ****
  }
  
  static void
! btree_xlog_delete(XLogRecPtr lsn, XLogRecord *record)
  {
  	xl_btree_delete *xlrec;
  	Buffer		buffer;
--- 459,465 ----
  }
  
  static void
! btree_xlog_delete(XLogRecPtr lsn, XLogRecord *record, bool needCleanupLock)
  {
  	xl_btree_delete *xlrec;
  	Buffer		buffer;
***************
*** 470,476 ****
  		return;
  
  	xlrec = (xl_btree_delete *) XLogRecGetData(record);
! 	buffer = XLogReadBuffer(xlrec->node, xlrec->block, false);
  	if (!BufferIsValid(buffer))
  		return;
  	page = (Page) BufferGetPage(buffer);
--- 470,479 ----
  		return;
  
  	xlrec = (xl_btree_delete *) XLogRecGetData(record);
! 	if (needCleanupLock)
! 		buffer = XLogReadBufferForCleanup(xlrec->node, xlrec->block, false);
! 	else
! 		buffer = XLogReadBuffer(xlrec->node, xlrec->block, false);
  	if (!BufferIsValid(buffer))
  		return;
  	page = (Page) BufferGetPage(buffer);
***************
*** 737,744 ****
  		case XLOG_BTREE_SPLIT_R_ROOT:
  			btree_xlog_split(false, true, lsn, record);
  			break;
  		case XLOG_BTREE_DELETE:
! 			btree_xlog_delete(lsn, record);
  			break;
  		case XLOG_BTREE_DELETE_PAGE:
  		case XLOG_BTREE_DELETE_PAGE_META:
--- 740,750 ----
  		case XLOG_BTREE_SPLIT_R_ROOT:
  			btree_xlog_split(false, true, lsn, record);
  			break;
+ 		case XLOG_BTREE_VACUUM:
+ 			btree_xlog_delete(lsn, record, true);
+ 			break;
  		case XLOG_BTREE_DELETE:
! 			btree_xlog_delete(lsn, record, false);
  			break;
  		case XLOG_BTREE_DELETE_PAGE:
  		case XLOG_BTREE_DELETE_PAGE_META:
***************
*** 753,758 ****
--- 759,797 ----
  	}
  }
  
+ bool
+ btree_needs_cleanup_lock(uint8 info)
+ {
+ 	switch (info)
+ 	{
+ 		case XLOG_BTREE_VACUUM:
+ 			return true;
+ 			break;
+ 
+ 		case XLOG_BTREE_INSERT_LEAF:
+ 		case XLOG_BTREE_INSERT_UPPER:
+ 		case XLOG_BTREE_INSERT_META:
+ 		case XLOG_BTREE_SPLIT_L:
+ 		case XLOG_BTREE_SPLIT_R:
+ 		case XLOG_BTREE_SPLIT_L_ROOT:
+ 		case XLOG_BTREE_SPLIT_R_ROOT:
+ 		case XLOG_BTREE_DELETE:
+ 		case XLOG_BTREE_DELETE_PAGE:
+ 		case XLOG_BTREE_DELETE_PAGE_META:
+ 		case XLOG_BTREE_DELETE_PAGE_HALF:
+ 		case XLOG_BTREE_NEWROOT:
+ 			return false;
+ 			break;
+ 
+ 		default:
+ 			elog(PANIC, "btree_needs_cleanup_lock: unknown op code %u", info);
+ 	}
+ 
+ 	/* never reached */
+ 	return false;
+ 
+ }
+ 
  static void
  out_target(StringInfo buf, xl_btreetid *target)
  {
Index: src/backend/access/transam/xlog.c
===================================================================
RCS file: /home/sriggs/pg/REPOSITORY/pgsql/src/backend/access/transam/xlog.c,v
retrieving revision 1.319
diff -c -r1.319 xlog.c
*** src/backend/access/transam/xlog.c	23 Sep 2008 09:20:35 -0000	1.319
--- src/backend/access/transam/xlog.c	22 Oct 2008 22:03:33 -0000
***************
*** 2887,2892 ****
--- 2887,2905 ----
  	BkpBlock	bkpb;
  	char	   *blk;
  	int			i;
+ 	int			mode;
+ 	RmgrId		rmid = record->xl_rmid;
+ 	uint8		info = record->xl_info & ~XLR_INFO_MASK;
+ 
+ 	/*
+ 	 * What kind of lock do we need to apply the backup blocks?
+ 	 * Ugly hack around needing an additional RMgr API call.
+ 	 */
+ 	if (rmid == RM_HEAP2_ID || 
+ 		(rmid == RM_BTREE_ID && btree_needs_cleanup_lock(info)))
+ 		mode = BUFFER_LOCK_CLEANUP;
+ 	else
+ 		mode = BUFFER_LOCK_EXCLUSIVE;
  
  	blk = (char *) XLogRecGetData(record) + record->xl_len;
  	for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++)
***************
*** 2898,2904 ****
  		blk += sizeof(BkpBlock);
  
  		buffer = XLogReadBufferWithFork(bkpb.node, bkpb.fork, bkpb.block,
! 										true);
  		Assert(BufferIsValid(buffer));
  		page = (Page) BufferGetPage(buffer);
  
--- 2911,2917 ----
  		blk += sizeof(BkpBlock);
  
  		buffer = XLogReadBufferWithFork(bkpb.node, bkpb.fork, bkpb.block,
! 										true, mode);
  		Assert(BufferIsValid(buffer));
  		page = (Page) BufferGetPage(buffer);
  
Index: src/backend/access/transam/xlogutils.c
===================================================================
RCS file: /home/sriggs/pg/REPOSITORY/pgsql/src/backend/access/transam/xlogutils.c,v
retrieving revision 1.59
diff -c -r1.59 xlogutils.c
*** src/backend/access/transam/xlogutils.c	30 Sep 2008 10:52:11 -0000	1.59
--- src/backend/access/transam/xlogutils.c	22 Oct 2008 22:00:36 -0000
***************
*** 228,234 ****
  Buffer
  XLogReadBuffer(RelFileNode rnode, BlockNumber blkno, bool init)
  {
! 	return XLogReadBufferWithFork(rnode, MAIN_FORKNUM, blkno, init);
  }
  
  /*
--- 228,240 ----
  Buffer
  XLogReadBuffer(RelFileNode rnode, BlockNumber blkno, bool init)
  {
! 	return XLogReadBufferWithFork(rnode, MAIN_FORKNUM, blkno, init, BUFFER_LOCK_EXCLUSIVE);
! }
! 
! Buffer
! XLogReadBufferForCleanup(RelFileNode rnode, BlockNumber blkno, bool init)
! {
! 	return XLogReadBufferWithFork(rnode, MAIN_FORKNUM, blkno, init, BUFFER_LOCK_CLEANUP);
  }
  
  /*
***************
*** 238,244 ****
   */
  Buffer
  XLogReadBufferWithFork(RelFileNode rnode, ForkNumber forknum,
! 					   BlockNumber blkno, bool init)
  {
  	BlockNumber lastblock;
  	Buffer		buffer;
--- 244,250 ----
   */
  Buffer
  XLogReadBufferWithFork(RelFileNode rnode, ForkNumber forknum,
! 					   BlockNumber blkno, bool init, int mode)
  {
  	BlockNumber lastblock;
  	Buffer		buffer;
***************
*** 289,295 ****
  		Assert(BufferGetBlockNumber(buffer) == blkno);
  	}
  
! 	LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
  
  	if (!init)
  	{
--- 295,306 ----
  		Assert(BufferGetBlockNumber(buffer) == blkno);
  	}
  
! 	if (mode == BUFFER_LOCK_EXCLUSIVE)
! 		LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
! 	else if (mode == BUFFER_LOCK_CLEANUP)
! 		LockBufferForCleanup(buffer);
! 	else
! 		elog(FATAL, "Invalid buffer lock mode %d", mode);
  
  	if (!init)
  	{
Index: src/backend/optimizer/util/var.c
===================================================================
RCS file: /home/sriggs/pg/REPOSITORY/pgsql/src/backend/optimizer/util/var.c,v
retrieving revision 1.81
diff -c -r1.81 var.c
*** src/backend/optimizer/util/var.c	21 Oct 2008 20:42:53 -0000	1.81
--- src/backend/optimizer/util/var.c	22 Oct 2008 19:47:21 -0000
***************
*** 698,703 ****
--- 698,706 ----
  {
  	if (node == NULL)
  		return NULL;
+ 
+ 	Assert(!IsA(node, AppendRelInfo));
+ 
  	if (IsA(node, Var))
  	{
  		Var		   *var = (Var *) node;
Index: src/backend/storage/buffer/bufmgr.c
===================================================================
RCS file: /home/sriggs/pg/REPOSITORY/pgsql/src/backend/storage/buffer/bufmgr.c,v
retrieving revision 1.239
diff -c -r1.239 bufmgr.c
*** src/backend/storage/buffer/bufmgr.c	20 Oct 2008 21:11:15 -0000	1.239
--- src/backend/storage/buffer/bufmgr.c	22 Oct 2008 22:00:36 -0000
***************
*** 2364,2369 ****
--- 2364,2370 ----
  
  	Assert(BufferIsValid(buffer));
  	Assert(PinCountWaitBuf == NULL);
+ 	Assert(!IsRecoveryProcessingMode() || InRecovery);
  
  	if (BufferIsLocal(buffer))
  	{
Index: src/backend/storage/freespace/freespace.c
===================================================================
RCS file: /home/sriggs/pg/REPOSITORY/pgsql/src/backend/storage/freespace/freespace.c,v
retrieving revision 1.64
diff -c -r1.64 freespace.c
*** src/backend/storage/freespace/freespace.c	1 Oct 2008 14:59:23 -0000	1.64
--- src/backend/storage/freespace/freespace.c	22 Oct 2008 22:00:36 -0000
***************
*** 779,785 ****
  	 * replay of the smgr truncation record to remove completely unused
  	 * pages.
  	 */
! 	buf = XLogReadBufferWithFork(xlrec->node, FSM_FORKNUM, fsmblk, false);
  	if (BufferIsValid(buf))
  	{
  		fsm_truncate_avail(BufferGetPage(buf), first_removed_slot);
--- 779,786 ----
  	 * replay of the smgr truncation record to remove completely unused
  	 * pages.
  	 */
! 	buf = XLogReadBufferWithFork(xlrec->node, FSM_FORKNUM, fsmblk, 
! 											false, BUFFER_LOCK_CLEANUP);
  	if (BufferIsValid(buf))
  	{
  		fsm_truncate_avail(BufferGetPage(buf), first_removed_slot);
Index: src/include/access/nbtree.h
===================================================================
RCS file: /home/sriggs/pg/REPOSITORY/pgsql/src/include/access/nbtree.h,v
retrieving revision 1.121
diff -c -r1.121 nbtree.h
*** src/include/access/nbtree.h	13 Jul 2008 20:45:47 -0000	1.121
--- src/include/access/nbtree.h	22 Oct 2008 22:05:35 -0000
***************
*** 214,225 ****
  #define XLOG_BTREE_SPLIT_R		0x40	/* as above, new item on right */
  #define XLOG_BTREE_SPLIT_L_ROOT 0x50	/* add tuple with split of root */
  #define XLOG_BTREE_SPLIT_R_ROOT 0x60	/* as above, new item on right */
! #define XLOG_BTREE_DELETE		0x70	/* delete leaf index tuple */
  #define XLOG_BTREE_DELETE_PAGE	0x80	/* delete an entire page */
  #define XLOG_BTREE_DELETE_PAGE_META 0x90		/* same, and update metapage */
  #define XLOG_BTREE_NEWROOT		0xA0	/* new root page */
  #define XLOG_BTREE_DELETE_PAGE_HALF 0xB0		/* page deletion that makes
  												 * parent half-dead */
  
  /*
   * All that we need to find changed index tuple
--- 214,226 ----
  #define XLOG_BTREE_SPLIT_R		0x40	/* as above, new item on right */
  #define XLOG_BTREE_SPLIT_L_ROOT 0x50	/* add tuple with split of root */
  #define XLOG_BTREE_SPLIT_R_ROOT 0x60	/* as above, new item on right */
! #define XLOG_BTREE_DELETE		0x70	/* delete leaf index tuples for a page */
  #define XLOG_BTREE_DELETE_PAGE	0x80	/* delete an entire page */
  #define XLOG_BTREE_DELETE_PAGE_META 0x90		/* same, and update metapage */
  #define XLOG_BTREE_NEWROOT		0xA0	/* new root page */
  #define XLOG_BTREE_DELETE_PAGE_HALF 0xB0		/* page deletion that makes
  												 * parent half-dead */
+ #define XLOG_BTREE_VACUUM		0xC0	/* delete entries on a page during vacuum */
  
  /*
   * All that we need to find changed index tuple
***************
*** 537,543 ****
  extern void _bt_pageinit(Page page, Size size);
  extern bool _bt_page_recyclable(Page page);
  extern void _bt_delitems(Relation rel, Buffer buf,
! 			 OffsetNumber *itemnos, int nitems);
  extern int _bt_pagedel(Relation rel, Buffer buf,
  			BTStack stack, bool vacuum_full);
  
--- 538,544 ----
  extern void _bt_pageinit(Page page, Size size);
  extern bool _bt_page_recyclable(Page page);
  extern void _bt_delitems(Relation rel, Buffer buf,
! 			 OffsetNumber *itemnos, int nitems, bool needCleanupLock);
  extern int _bt_pagedel(Relation rel, Buffer buf,
  			BTStack stack, bool vacuum_full);
  
Index: src/include/access/xlogutils.h
===================================================================
RCS file: /home/sriggs/pg/REPOSITORY/pgsql/src/include/access/xlogutils.h,v
retrieving revision 1.26
diff -c -r1.26 xlogutils.h
*** src/include/access/xlogutils.h	11 Aug 2008 11:05:11 -0000	1.26
--- src/include/access/xlogutils.h	22 Oct 2008 22:06:41 -0000
***************
*** 25,34 ****
  								 BlockNumber nblocks);
  
  extern Buffer XLogReadBuffer(RelFileNode rnode, BlockNumber blkno, bool init);
  extern Buffer XLogReadBufferWithFork(RelFileNode rnode, ForkNumber forknum,
! 									 BlockNumber blkno, bool init);
  
  extern Relation CreateFakeRelcacheEntry(RelFileNode rnode);
  extern void FreeFakeRelcacheEntry(Relation fakerel);
  
  #endif
--- 25,39 ----
  								 BlockNumber nblocks);
  
  extern Buffer XLogReadBuffer(RelFileNode rnode, BlockNumber blkno, bool init);
+ extern Buffer XLogReadBufferForCleanup(RelFileNode rnode, BlockNumber blkno, bool init);
  extern Buffer XLogReadBufferWithFork(RelFileNode rnode, ForkNumber forknum,
! 									 BlockNumber blkno, bool init, int mode);
  
  extern Relation CreateFakeRelcacheEntry(RelFileNode rnode);
  extern void FreeFakeRelcacheEntry(Relation fakerel);
  
+ /* include this here to avoid including nbtree.h */
+ extern bool btree_needs_cleanup_lock(uint8 info);
+ 
+ 
  #endif
Index: src/include/storage/bufmgr.h
===================================================================
RCS file: /home/sriggs/pg/REPOSITORY/pgsql/src/include/storage/bufmgr.h,v
retrieving revision 1.115
diff -c -r1.115 bufmgr.h
*** src/include/storage/bufmgr.h	11 Aug 2008 11:05:11 -0000	1.115
--- src/include/storage/bufmgr.h	22 Oct 2008 22:00:36 -0000
***************
*** 58,63 ****
--- 58,66 ----
  #define BUFFER_LOCK_SHARE		1
  #define BUFFER_LOCK_EXCLUSIVE	2
  
+ /* Not used by LockBuffer, but is used by XLogReadBuffer... */
+ #define BUFFER_LOCK_CLEANUP	3
+ 
  /*
   * These routines are beaten on quite heavily, hence the macroization.
   */
-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to