I wrote:
> it did occur to me that there is a simple way to ameliorate this
> problem: we could rearrange the code in _bt_pagedel() so it checks for
> this case before entering its critical section.  Then, corruption of
> this kind is at least only an ERROR not a PANIC.

In particular, I propose the attached patch, which gets rid of
unnecessary PANIC cases in _bt_split as well as _bt_pagedel;
all of these get reported from the field every now and then.
In passing this also makes the error messages out of _bt_split
a bit more detailed.

FWIW, I think the original rationale for PANIC here was so we could
capture a core dump for study; but since no one has ever cooperated
by providing such a dump, it seems like not panicking is a better plan.

Barring objections, I plan to back-patch this as far as it will
conveniently go (looks like 8.2 or 8.3 or so).

                        regards, tom lane

Index: src/backend/access/nbtree/nbtinsert.c
===================================================================
RCS file: /cvsroot/pgsql/src/backend/access/nbtree/nbtinsert.c,v
retrieving revision 1.178
diff -c -r1.178 nbtinsert.c
*** src/backend/access/nbtree/nbtinsert.c	28 Mar 2010 09:27:01 -0000	1.178
--- src/backend/access/nbtree/nbtinsert.c	28 Aug 2010 23:31:44 -0000
***************
*** 74,82 ****
  static void _bt_checksplitloc(FindSplitData *state,
  				  OffsetNumber firstoldonright, bool newitemonleft,
  				  int dataitemstoleft, Size firstoldonrightsz);
! static void _bt_pgaddtup(Relation rel, Page page,
! 			 Size itemsize, IndexTuple itup,
! 			 OffsetNumber itup_off, const char *where);
  static bool _bt_isequal(TupleDesc itupdesc, Page page, OffsetNumber offnum,
  			int keysz, ScanKey scankey);
  static void _bt_vacuum_one_page(Relation rel, Buffer buffer, Relation heapRel);
--- 74,81 ----
  static void _bt_checksplitloc(FindSplitData *state,
  				  OffsetNumber firstoldonright, bool newitemonleft,
  				  int dataitemstoleft, Size firstoldonrightsz);
! static bool _bt_pgaddtup(Page page, Size itemsize, IndexTuple itup,
! 			 OffsetNumber itup_off);
  static bool _bt_isequal(TupleDesc itupdesc, Page page, OffsetNumber offnum,
  			int keysz, ScanKey scankey);
  static void _bt_vacuum_one_page(Relation rel, Buffer buffer, Relation heapRel);
***************
*** 753,759 ****
  		/* Do the update.  No ereport(ERROR) until changes are logged */
  		START_CRIT_SECTION();
  
! 		_bt_pgaddtup(rel, page, itemsz, itup, newitemoff, "page");
  
  		MarkBufferDirty(buf);
  
--- 752,760 ----
  		/* Do the update.  No ereport(ERROR) until changes are logged */
  		START_CRIT_SECTION();
  
! 		if (!_bt_pgaddtup(page, itemsz, itup, newitemoff))
! 			elog(PANIC, "failed to add new item to block %u in index \"%s\"",
! 				 itup_blkno, RelationGetRelationName(rel));
  
  		MarkBufferDirty(buf);
  
***************
*** 879,884 ****
--- 880,887 ----
  	Page		origpage;
  	Page		leftpage,
  				rightpage;
+ 	BlockNumber origpagenumber,
+ 				rightpagenumber;
  	BTPageOpaque ropaque,
  				lopaque,
  				oopaque;
***************
*** 894,904 ****
--- 897,923 ----
  	OffsetNumber i;
  	bool		isroot;
  
+ 	/* Acquire a new page to split into */
  	rbuf = _bt_getbuf(rel, P_NEW, BT_WRITE);
+ 
+ 	/*
+ 	 * origpage is the original page to be split.  leftpage is a temporary
+ 	 * buffer that receives the left-sibling data, which will be copied back
+ 	 * into origpage on success.  rightpage is the new page that receives
+ 	 * the right-sibling data.  If we fail before reaching the critical
+ 	 * section, origpage hasn't been modified and leftpage is only workspace.
+ 	 * In principle we shouldn't need to worry about rightpage either,
+ 	 * because it hasn't been linked into the btree page structure; but to
+ 	 * avoid leaving possibly-confusing junk behind, we are careful to reinit
+ 	 * rightpage to empty before throwing any error.
+ 	 */
  	origpage = BufferGetPage(buf);
  	leftpage = PageGetTempPage(origpage);
  	rightpage = BufferGetPage(rbuf);
  
+ 	origpagenumber = BufferGetBlockNumber(buf);
+ 	rightpagenumber = BufferGetBlockNumber(rbuf);
+ 
  	_bt_pageinit(leftpage, BufferGetPageSize(buf));
  	/* rightpage was already initialized by _bt_getbuf */
  
***************
*** 923,930 ****
  	lopaque->btpo_flags &= ~(BTP_ROOT | BTP_SPLIT_END | BTP_HAS_GARBAGE);
  	ropaque->btpo_flags = lopaque->btpo_flags;
  	lopaque->btpo_prev = oopaque->btpo_prev;
! 	lopaque->btpo_next = BufferGetBlockNumber(rbuf);
! 	ropaque->btpo_prev = BufferGetBlockNumber(buf);
  	ropaque->btpo_next = oopaque->btpo_next;
  	lopaque->btpo.level = ropaque->btpo.level = oopaque->btpo.level;
  	/* Since we already have write-lock on both pages, ok to read cycleid */
--- 942,949 ----
  	lopaque->btpo_flags &= ~(BTP_ROOT | BTP_SPLIT_END | BTP_HAS_GARBAGE);
  	ropaque->btpo_flags = lopaque->btpo_flags;
  	lopaque->btpo_prev = oopaque->btpo_prev;
! 	lopaque->btpo_next = rightpagenumber;
! 	ropaque->btpo_prev = origpagenumber;
  	ropaque->btpo_next = oopaque->btpo_next;
  	lopaque->btpo.level = ropaque->btpo.level = oopaque->btpo.level;
  	/* Since we already have write-lock on both pages, ok to read cycleid */
***************
*** 947,955 ****
  		item = (IndexTuple) PageGetItem(origpage, itemid);
  		if (PageAddItem(rightpage, (Item) item, itemsz, rightoff,
  						false, false) == InvalidOffsetNumber)
! 			elog(PANIC, "failed to add hikey to the right sibling"
  				 " while splitting block %u of index \"%s\"",
! 				 BufferGetBlockNumber(buf), RelationGetRelationName(rel));
  		rightoff = OffsetNumberNext(rightoff);
  	}
  
--- 966,977 ----
  		item = (IndexTuple) PageGetItem(origpage, itemid);
  		if (PageAddItem(rightpage, (Item) item, itemsz, rightoff,
  						false, false) == InvalidOffsetNumber)
! 		{
! 			_bt_pageinit(rightpage, BufferGetPageSize(rbuf));
! 			elog(ERROR, "failed to add hikey to the right sibling"
  				 " while splitting block %u of index \"%s\"",
! 				 origpagenumber, RelationGetRelationName(rel));
! 		}
  		rightoff = OffsetNumberNext(rightoff);
  	}
  
***************
*** 974,982 ****
  	}
  	if (PageAddItem(leftpage, (Item) item, itemsz, leftoff,
  					false, false) == InvalidOffsetNumber)
! 		elog(PANIC, "failed to add hikey to the left sibling"
  			 " while splitting block %u of index \"%s\"",
! 			 BufferGetBlockNumber(buf), RelationGetRelationName(rel));
  	leftoff = OffsetNumberNext(leftoff);
  
  	/*
--- 996,1007 ----
  	}
  	if (PageAddItem(leftpage, (Item) item, itemsz, leftoff,
  					false, false) == InvalidOffsetNumber)
! 	{
! 		_bt_pageinit(rightpage, BufferGetPageSize(rbuf));
! 		elog(ERROR, "failed to add hikey to the left sibling"
  			 " while splitting block %u of index \"%s\"",
! 			 origpagenumber, RelationGetRelationName(rel));
! 	}
  	leftoff = OffsetNumberNext(leftoff);
  
  	/*
***************
*** 998,1011 ****
  		{
  			if (newitemonleft)
  			{
! 				_bt_pgaddtup(rel, leftpage, newitemsz, newitem, leftoff,
! 							 "left sibling");
  				leftoff = OffsetNumberNext(leftoff);
  			}
  			else
  			{
! 				_bt_pgaddtup(rel, rightpage, newitemsz, newitem, rightoff,
! 							 "right sibling");
  				rightoff = OffsetNumberNext(rightoff);
  			}
  		}
--- 1023,1046 ----
  		{
  			if (newitemonleft)
  			{
! 				if (!_bt_pgaddtup(leftpage, newitemsz, newitem, leftoff))
! 				{
! 					_bt_pageinit(rightpage, BufferGetPageSize(rbuf));
! 					elog(ERROR, "failed to add new item to the left sibling"
! 						 " while splitting block %u of index \"%s\"",
! 						 origpagenumber, RelationGetRelationName(rel));
! 				}
  				leftoff = OffsetNumberNext(leftoff);
  			}
  			else
  			{
! 				if (!_bt_pgaddtup(rightpage, newitemsz, newitem, rightoff))
! 				{
! 					_bt_pageinit(rightpage, BufferGetPageSize(rbuf));
! 					elog(ERROR, "failed to add new item to the right sibling"
! 						 " while splitting block %u of index \"%s\"",
! 						 origpagenumber, RelationGetRelationName(rel));
! 				}
  				rightoff = OffsetNumberNext(rightoff);
  			}
  		}
***************
*** 1013,1026 ****
  		/* decide which page to put it on */
  		if (i < firstright)
  		{
! 			_bt_pgaddtup(rel, leftpage, itemsz, item, leftoff,
! 						 "left sibling");
  			leftoff = OffsetNumberNext(leftoff);
  		}
  		else
  		{
! 			_bt_pgaddtup(rel, rightpage, itemsz, item, rightoff,
! 						 "right sibling");
  			rightoff = OffsetNumberNext(rightoff);
  		}
  	}
--- 1048,1071 ----
  		/* decide which page to put it on */
  		if (i < firstright)
  		{
! 			if (!_bt_pgaddtup(leftpage, itemsz, item, leftoff))
! 			{
! 				_bt_pageinit(rightpage, BufferGetPageSize(rbuf));
! 				elog(ERROR, "failed to add old item to the left sibling"
! 					 " while splitting block %u of index \"%s\"",
! 					 origpagenumber, RelationGetRelationName(rel));
! 			}
  			leftoff = OffsetNumberNext(leftoff);
  		}
  		else
  		{
! 			if (!_bt_pgaddtup(rightpage, itemsz, item, rightoff))
! 			{
! 				_bt_pageinit(rightpage, BufferGetPageSize(rbuf));
! 				elog(ERROR, "failed to add old item to the right sibling"
! 					 " while splitting block %u of index \"%s\"",
! 					 origpagenumber, RelationGetRelationName(rel));
! 			}
  			rightoff = OffsetNumberNext(rightoff);
  		}
  	}
***************
*** 1034,1041 ****
  		 * not be splitting the page).
  		 */
  		Assert(!newitemonleft);
! 		_bt_pgaddtup(rel, rightpage, newitemsz, newitem, rightoff,
! 					 "right sibling");
  		rightoff = OffsetNumberNext(rightoff);
  	}
  
--- 1079,1091 ----
  		 * not be splitting the page).
  		 */
  		Assert(!newitemonleft);
! 		if (!_bt_pgaddtup(rightpage, newitemsz, newitem, rightoff))
! 		{
! 			_bt_pageinit(rightpage, BufferGetPageSize(rbuf));
! 			elog(ERROR, "failed to add new item to the right sibling"
! 				 " while splitting block %u of index \"%s\"",
! 				 origpagenumber, RelationGetRelationName(rel));
! 		}
  		rightoff = OffsetNumberNext(rightoff);
  	}
  
***************
*** 1047,1062 ****
  	 * neighbors.
  	 */
  
! 	if (!P_RIGHTMOST(ropaque))
  	{
! 		sbuf = _bt_getbuf(rel, ropaque->btpo_next, BT_WRITE);
  		spage = BufferGetPage(sbuf);
  		sopaque = (BTPageOpaque) PageGetSpecialPointer(spage);
! 		if (sopaque->btpo_prev != ropaque->btpo_prev)
! 			elog(PANIC, "right sibling's left-link doesn't match: "
! 			   "block %u links to %u instead of expected %u in index \"%s\"",
! 				 ropaque->btpo_next, sopaque->btpo_prev, ropaque->btpo_prev,
  				 RelationGetRelationName(rel));
  
  		/*
  		 * Check to see if we can set the SPLIT_END flag in the right-hand
--- 1097,1115 ----
  	 * neighbors.
  	 */
  
! 	if (!P_RIGHTMOST(oopaque))
  	{
! 		sbuf = _bt_getbuf(rel, oopaque->btpo_next, BT_WRITE);
  		spage = BufferGetPage(sbuf);
  		sopaque = (BTPageOpaque) PageGetSpecialPointer(spage);
! 		if (sopaque->btpo_prev != origpagenumber)
! 		{
! 			_bt_pageinit(rightpage, BufferGetPageSize(rbuf));
! 			elog(ERROR, "right sibling's left-link doesn't match: "
! 				 "block %u links to %u instead of expected %u in index \"%s\"",
! 				 oopaque->btpo_next, sopaque->btpo_prev, origpagenumber,
  				 RelationGetRelationName(rel));
+ 		}
  
  		/*
  		 * Check to see if we can set the SPLIT_END flag in the right-hand
***************
*** 1081,1088 ****
  	 *
  	 * NO EREPORT(ERROR) till right sibling is updated.  We can get away with
  	 * not starting the critical section till here because we haven't been
! 	 * scribbling on the original page yet, and we don't care about the new
! 	 * sibling until it's linked into the btree.
  	 */
  	START_CRIT_SECTION();
  
--- 1134,1140 ----
  	 *
  	 * NO EREPORT(ERROR) till right sibling is updated.  We can get away with
  	 * not starting the critical section till here because we haven't been
! 	 * scribbling on the original page yet; see comments above.
  	 */
  	START_CRIT_SECTION();
  
***************
*** 1094,1112 ****
  	 * (in the page management code) that the center of a page always be
  	 * clean, and the most efficient way to guarantee this is just to compact
  	 * the data by reinserting it into a new left page.  (XXX the latter
! 	 * comment is probably obsolete.)
  	 *
  	 * We need to do this before writing the WAL record, so that XLogInsert
  	 * can WAL log an image of the page if necessary.
  	 */
  	PageRestoreTempPage(leftpage, origpage);
  
  	MarkBufferDirty(buf);
  	MarkBufferDirty(rbuf);
  
  	if (!P_RIGHTMOST(ropaque))
  	{
! 		sopaque->btpo_prev = BufferGetBlockNumber(rbuf);
  		MarkBufferDirty(sbuf);
  	}
  
--- 1146,1166 ----
  	 * (in the page management code) that the center of a page always be
  	 * clean, and the most efficient way to guarantee this is just to compact
  	 * the data by reinserting it into a new left page.  (XXX the latter
! 	 * comment is probably obsolete; but in any case it's good to not scribble
! 	 * on the original page until we enter the critical section.)
  	 *
  	 * We need to do this before writing the WAL record, so that XLogInsert
  	 * can WAL log an image of the page if necessary.
  	 */
  	PageRestoreTempPage(leftpage, origpage);
+ 	/* leftpage, lopaque must not be used below here */
  
  	MarkBufferDirty(buf);
  	MarkBufferDirty(rbuf);
  
  	if (!P_RIGHTMOST(ropaque))
  	{
! 		sopaque->btpo_prev = rightpagenumber;
  		MarkBufferDirty(sbuf);
  	}
  
***************
*** 1120,1127 ****
  		XLogRecData *lastrdata;
  
  		xlrec.node = rel->rd_node;
! 		xlrec.leftsib = BufferGetBlockNumber(buf);
! 		xlrec.rightsib = BufferGetBlockNumber(rbuf);
  		xlrec.rnext = ropaque->btpo_next;
  		xlrec.level = ropaque->btpo.level;
  		xlrec.firstright = firstright;
--- 1174,1181 ----
  		XLogRecData *lastrdata;
  
  		xlrec.node = rel->rd_node;
! 		xlrec.leftsib = origpagenumber;
! 		xlrec.rightsib = rightpagenumber;
  		xlrec.rnext = ropaque->btpo_next;
  		xlrec.level = ropaque->btpo.level;
  		xlrec.firstright = firstright;
***************
*** 1920,1932 ****
   *		we insert the tuples in order, so that the given itup_off does
   *		represent the final position of the tuple!
   */
! static void
! _bt_pgaddtup(Relation rel,
! 			 Page page,
  			 Size itemsize,
  			 IndexTuple itup,
! 			 OffsetNumber itup_off,
! 			 const char *where)
  {
  	BTPageOpaque opaque = (BTPageOpaque) PageGetSpecialPointer(page);
  	IndexTupleData trunctuple;
--- 1974,1984 ----
   *		we insert the tuples in order, so that the given itup_off does
   *		represent the final position of the tuple!
   */
! static bool
! _bt_pgaddtup(Page page,
  			 Size itemsize,
  			 IndexTuple itup,
! 			 OffsetNumber itup_off)
  {
  	BTPageOpaque opaque = (BTPageOpaque) PageGetSpecialPointer(page);
  	IndexTupleData trunctuple;
***************
*** 1941,1948 ****
  
  	if (PageAddItem(page, (Item) itup, itemsize, itup_off,
  					false, false) == InvalidOffsetNumber)
! 		elog(PANIC, "failed to add item to the %s in index \"%s\"",
! 			 where, RelationGetRelationName(rel));
  }
  
  /*
--- 1993,2001 ----
  
  	if (PageAddItem(page, (Item) itup, itemsize, itup_off,
  					false, false) == InvalidOffsetNumber)
! 		return false;
! 
! 	return true;
  }
  
  /*
Index: src/backend/access/nbtree/nbtpage.c
===================================================================
RCS file: /cvsroot/pgsql/src/backend/access/nbtree/nbtpage.c,v
retrieving revision 1.123
diff -c -r1.123 nbtpage.c
*** src/backend/access/nbtree/nbtpage.c	6 Jul 2010 19:18:55 -0000	1.123
--- src/backend/access/nbtree/nbtpage.c	28 Aug 2010 23:31:44 -0000
***************
*** 1253,1258 ****
--- 1253,1290 ----
  	}
  
  	/*
+ 	 * Check to make sure the index items we're about to delete/overwrite
+ 	 * contain what we expect.  This can fail if the index has become
+ 	 * corrupt for some reason.  We want to throw any error before entering
+ 	 * the critical section --- otherwise it'd be a PANIC.
+ 	 *
+ 	 * The test on the target item is just an Assert because _bt_getstackbuf
+ 	 * should have guaranteed it has the expected contents.  The test on the
+ 	 * next-child downlink is known to fail in the field, though.
+ 	 */
+ 	page = BufferGetPage(pbuf);
+ 	opaque = (BTPageOpaque) PageGetSpecialPointer(page);
+ 
+ #ifdef USE_ASSERT_CHECKING
+ 	itemid = PageGetItemId(page, poffset);
+ 	itup = (IndexTuple) PageGetItem(page, itemid);
+ 	Assert(ItemPointerGetBlockNumber(&(itup->t_tid)) == target);
+ #endif
+ 
+ 	if (!parent_half_dead)
+ 	{
+ 		OffsetNumber nextoffset;
+ 
+ 		nextoffset = OffsetNumberNext(poffset);
+ 		itemid = PageGetItemId(page, nextoffset);
+ 		itup = (IndexTuple) PageGetItem(page, itemid);
+ 		if (ItemPointerGetBlockNumber(&(itup->t_tid)) != rightsib)
+ 			elog(ERROR, "right sibling %u of block %u is not next child %u of block %u in index \"%s\"",
+ 				 rightsib, target, ItemPointerGetBlockNumber(&(itup->t_tid)),
+ 				 parent, RelationGetRelationName(rel));
+ 	}
+ 
+ 	/*
  	 * Here we begin doing the deletion.
  	 */
  
***************
*** 1265,1272 ****
  	 * to copy the right sibling's downlink over the target downlink, and then
  	 * delete the following item.
  	 */
- 	page = BufferGetPage(pbuf);
- 	opaque = (BTPageOpaque) PageGetSpecialPointer(page);
  	if (parent_half_dead)
  	{
  		PageIndexTupleDelete(page, poffset);
--- 1297,1302 ----
***************
*** 1278,1294 ****
  
  		itemid = PageGetItemId(page, poffset);
  		itup = (IndexTuple) PageGetItem(page, itemid);
- 		Assert(ItemPointerGetBlockNumber(&(itup->t_tid)) == target);
  		ItemPointerSet(&(itup->t_tid), rightsib, P_HIKEY);
  
  		nextoffset = OffsetNumberNext(poffset);
- 		/* This part is just for double-checking */
- 		itemid = PageGetItemId(page, nextoffset);
- 		itup = (IndexTuple) PageGetItem(page, itemid);
- 		if (ItemPointerGetBlockNumber(&(itup->t_tid)) != rightsib)
- 			elog(PANIC, "right sibling %u of block %u is not next child of %u in index \"%s\"",
- 				 rightsib, target, BufferGetBlockNumber(pbuf),
- 				 RelationGetRelationName(rel));
  		PageIndexTupleDelete(page, nextoffset);
  	}
  
--- 1308,1316 ----
-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to