Tom Lane wrote:
Heikki Linnakangas <[EMAIL PROTECTED]> writes:
Tom Lane wrote:
As far as the ugliness in XLogRecordPageWithFreeSpace goes: couldn't you
just call XLogReadBufferWithFork with init = true, and then initialize
the page if PageIsNew?

With init=true, we don't even try to read the page from the disk (since 8.3), so all FSM pages accessed during recovery would be zeroed out. I don't think that's what you intended.

Ah, right.  Maybe the API change you suggested in the comment is the
way to go.

Done, patch attached. But while I was hacking that, I realized another problem:

Because changes to FSM pages are not WAL-logged, they can be "torn" if, at the time of a crash, one part of the page has been flushed to disk but another has not. The FSM code will recover from internally inconsistent pages, caused by torn pages or other errors, but we still have a problem if the FSM file is extended and the new page is torn. It can happen that the first part of the page, containing the page header, doesn't make it to disk, but other parts of the page do. ReadBuffer() checks that the page header is valid, so it will throw an error on a torn page like that. ReadBuffer() doesn't complain about a page that is all-zeros, but the page is not all-zeros in this scenario, because the later parts of it did reach disk.

The FSM would be perfectly happy to just initialize torn or otherwise damaged pages, so I think we should add yet another mode to ReadBuffer() to allow that. We could also treat read() errors as merely warnings in that mode, effectively the same as with zero_damaged_pages=on.

The ReadBuffer() interface is already pretty complex, with all the different variants. We should probably keep the good old ReadBuffer() the same, for the sake of simplicity in the callers, but try to reduce the number of other variants.

The current API is this:

Buffer ReadBuffer(Relation reln, BlockNumber blockNum);
Buffer ReadBufferWithFork(Relation reln, ForkNumber forkNum, BlockNumber blockNum);
Buffer ReadBufferWithStrategy(Relation reln, BlockNumber blockNum, BufferAccessStrategy strategy);
Buffer ReadOrZeroBuffer(Relation reln, ForkNumber forkNum, BlockNumber blockNum);
Buffer ReadBufferWithoutRelcache(RelFileNode rnode, bool isTemp, ForkNumber forkNum, BlockNumber blockNum, bool zeroPage);

Here's my proposal for new API:

typedef enum
{
  RBM_NORMAL,           /* checks header, ereport(ERROR) on errors */
  RBM_INIT,             /* just allocate a buffer, don't read from disk.
                         * Caller must initialize the page */
  RBM_INIT_ON_ERROR     /* read, but instead of ERRORing, return an
                         * all-zeros page */
} ReadBufferMode;

Buffer ReadBuffer(Relation reln, BlockNumber blockNum);
Buffer ReadBufferExt(Relation reln, ForkNumber forkNum, BlockNumber blockNum, BufferAccessStrategy strategy, ReadBufferMode mode);
Buffer ReadBufferWithoutRelcache(RelFileNode rnode, bool isTemp, ForkNumber forkNum, BlockNumber blockNum, ReadBufferMode mode);

Thoughts?

--
  Heikki Linnakangas
  EnterpriseDB   http://www.enterprisedb.com
*** src/backend/access/heap/heapam.c
--- src/backend/access/heap/heapam.c
***************
*** 54,59 ****
--- 54,60 ----
  #include "miscadmin.h"
  #include "pgstat.h"
  #include "storage/bufmgr.h"
+ #include "storage/freespace.h"
  #include "storage/lmgr.h"
  #include "storage/procarray.h"
  #include "storage/smgr.h"
***************
*** 4029,4034 **** heap_xlog_clean(XLogRecPtr lsn, XLogRecord *record, bool clean_move)
--- 4030,4036 ----
  	int			nredirected;
  	int			ndead;
  	int			nunused;
+ 	Size		freespace;
  
  	if (record->xl_info & XLR_BKP_BLOCK_1)
  		return;
***************
*** 4060,4065 **** heap_xlog_clean(XLogRecPtr lsn, XLogRecord *record, bool clean_move)
--- 4062,4069 ----
  							nowunused, nunused,
  							clean_move);
  
+ 	freespace = PageGetHeapFreeSpace(page); /* needed to update FSM below */
+ 
  	/*
  	 * Note: we don't worry about updating the page's prunability hints.
  	 * At worst this will cause an extra prune cycle to occur soon.
***************
*** 4069,4074 **** heap_xlog_clean(XLogRecPtr lsn, XLogRecord *record, bool clean_move)
--- 4073,4087 ----
  	PageSetTLI(page, ThisTimeLineID);
  	MarkBufferDirty(buffer);
  	UnlockReleaseBuffer(buffer);
+ 
+ 	/*
+ 	 * Update the FSM as well.
+ 	 *
+ 	 * XXX: We don't get here if the page was restored from full page image.
+ 	 * We don't bother to update the FSM in that case, it doesn't need to be
+ 	 * totally accurate anyway.
+ 	 */
+ 	XLogRecordPageWithFreeSpace(xlrec->node, xlrec->block, freespace);
  }
  
  static void
***************
*** 4212,4226 **** heap_xlog_insert(XLogRecPtr lsn, XLogRecord *record)
  	HeapTupleHeader htup;
  	xl_heap_header xlhdr;
  	uint32		newlen;
  
  	if (record->xl_info & XLR_BKP_BLOCK_1)
  		return;
  
  	if (record->xl_info & XLOG_HEAP_INIT_PAGE)
  	{
! 		buffer = XLogReadBuffer(xlrec->target.node,
! 							 ItemPointerGetBlockNumber(&(xlrec->target.tid)),
! 								true);
  		Assert(BufferIsValid(buffer));
  		page = (Page) BufferGetPage(buffer);
  
--- 4225,4241 ----
  	HeapTupleHeader htup;
  	xl_heap_header xlhdr;
  	uint32		newlen;
+ 	Size		freespace;
+ 	BlockNumber	blkno;
  
  	if (record->xl_info & XLR_BKP_BLOCK_1)
  		return;
  
+ 	blkno = ItemPointerGetBlockNumber(&(xlrec->target.tid));
+ 
  	if (record->xl_info & XLOG_HEAP_INIT_PAGE)
  	{
! 		buffer = XLogReadBuffer(xlrec->target.node, blkno, true);
  		Assert(BufferIsValid(buffer));
  		page = (Page) BufferGetPage(buffer);
  
***************
*** 4228,4236 **** heap_xlog_insert(XLogRecPtr lsn, XLogRecord *record)
  	}
  	else
  	{
! 		buffer = XLogReadBuffer(xlrec->target.node,
! 							 ItemPointerGetBlockNumber(&(xlrec->target.tid)),
! 								false);
  		if (!BufferIsValid(buffer))
  			return;
  		page = (Page) BufferGetPage(buffer);
--- 4243,4249 ----
  	}
  	else
  	{
! 		buffer = XLogReadBuffer(xlrec->target.node, blkno, false);
  		if (!BufferIsValid(buffer))
  			return;
  		page = (Page) BufferGetPage(buffer);
***************
*** 4268,4277 **** heap_xlog_insert(XLogRecPtr lsn, XLogRecord *record)
--- 4281,4305 ----
  	offnum = PageAddItem(page, (Item) htup, newlen, offnum, true, true);
  	if (offnum == InvalidOffsetNumber)
  		elog(PANIC, "heap_insert_redo: failed to add tuple");
+ 
+ 	freespace = PageGetHeapFreeSpace(page); /* needed to update FSM below */
+ 
  	PageSetLSN(page, lsn);
  	PageSetTLI(page, ThisTimeLineID);
  	MarkBufferDirty(buffer);
  	UnlockReleaseBuffer(buffer);
+ 
+ 	/*
+ 	 * If the page is running low on free space, update the FSM as well.
+ 	 * Arbitrarily, our definition of "low" is less than 20%. We can't do
+ 	 * much better than that without knowing the fill-factor for the table.
+ 	 *
+ 	 * XXX: We don't get here if the page was restored from full page image.
+ 	 * We don't bother to update the FSM in that case, it doesn't need to be
+ 	 * totally accurate anyway.
+ 	 */
+ 	if (freespace < BLCKSZ / 5)
+ 		XLogRecordPageWithFreeSpace(xlrec->target.node, blkno, freespace);
  }
  
  /*
***************
*** 4296,4301 **** heap_xlog_update(XLogRecPtr lsn, XLogRecord *record, bool move, bool hot_update)
--- 4324,4330 ----
  	xl_heap_header xlhdr;
  	int			hsize;
  	uint32		newlen;
+ 	Size		freespace;
  
  	if (record->xl_info & XLR_BKP_BLOCK_1)
  	{
***************
*** 4453,4462 **** newsame:;
--- 4482,4513 ----
  	offnum = PageAddItem(page, (Item) htup, newlen, offnum, true, true);
  	if (offnum == InvalidOffsetNumber)
  		elog(PANIC, "heap_update_redo: failed to add tuple");
+ 
+ 	freespace = PageGetHeapFreeSpace(page); /* needed to update FSM below */
+ 
  	PageSetLSN(page, lsn);
  	PageSetTLI(page, ThisTimeLineID);
  	MarkBufferDirty(buffer);
  	UnlockReleaseBuffer(buffer);
+ 
+ 	/*
+ 	 * If the page is running low on free space, update the FSM as well.
+ 	 * Arbitrarily, our definition of "low" is less than 20%. We can't do
+ 	 * much better than that without knowing the fill-factor for the table.
+ 	 *
+ 	 * However, don't update the FSM on HOT updates, because after crash
+ 	 * recovery, either the old or the new tuple will certainly be dead and
+ 	 * prunable. After pruning, the page will have roughly as much free space
+ 	 * as it did before the update, assuming the new tuple is about the same
+ 	 * size as the old one.
+ 	 *
+ 	 * XXX: We don't get here if the page was restored from full page image.
+ 	 * We don't bother to update the FSM in that case, it doesn't need to be
+ 	 * totally accurate anyway.
+ 	 */
+ 	if (!hot_update && freespace < BLCKSZ / 5)
+ 		XLogRecordPageWithFreeSpace(xlrec->target.node,
+ 					ItemPointerGetBlockNumber(&(xlrec->newtid)), freespace);
  }
  
  static void
*** src/backend/access/transam/xlog.c
--- src/backend/access/transam/xlog.c
***************
*** 2898,2904 **** RestoreBkpBlocks(XLogRecord *record, XLogRecPtr lsn)
  		blk += sizeof(BkpBlock);
  
  		buffer = XLogReadBufferWithFork(bkpb.node, bkpb.fork, bkpb.block,
! 										true);
  		Assert(BufferIsValid(buffer));
  		page = (Page) BufferGetPage(buffer);
  
--- 2898,2904 ----
  		blk += sizeof(BkpBlock);
  
  		buffer = XLogReadBufferWithFork(bkpb.node, bkpb.fork, bkpb.block,
! 										XLRBB_ZERO);
  		Assert(BufferIsValid(buffer));
  		page = (Page) BufferGetPage(buffer);
  
*** src/backend/access/transam/xlogutils.c
--- src/backend/access/transam/xlogutils.c
***************
*** 228,234 **** XLogCheckInvalidPages(void)
  Buffer
  XLogReadBuffer(RelFileNode rnode, BlockNumber blkno, bool init)
  {
! 	return XLogReadBufferWithFork(rnode, MAIN_FORKNUM, blkno, init);
  }
  
  /*
--- 228,235 ----
  Buffer
  XLogReadBuffer(RelFileNode rnode, BlockNumber blkno, bool init)
  {
! 	return XLogReadBufferWithFork(rnode, MAIN_FORKNUM, blkno,
! 								  init ? XLRBB_ZERO : XLRBB_EXISTS);
  }
  
  /*
***************
*** 238,244 **** XLogReadBuffer(RelFileNode rnode, BlockNumber blkno, bool init)
   */
  Buffer
  XLogReadBufferWithFork(RelFileNode rnode, ForkNumber forknum,
! 					   BlockNumber blkno, bool init)
  {
  	BlockNumber lastblock;
  	Buffer		buffer;
--- 239,245 ----
   */
  Buffer
  XLogReadBufferWithFork(RelFileNode rnode, ForkNumber forknum,
! 					   BlockNumber blkno, XLogReadBufferBehavior behavior)
  {
  	BlockNumber lastblock;
  	Buffer		buffer;
***************
*** 264,275 **** XLogReadBufferWithFork(RelFileNode rnode, ForkNumber forknum,
  	if (blkno < lastblock)
  	{
  		/* page exists in file */
! 		buffer = ReadBufferWithoutRelcache(rnode, false, forknum, blkno, init);
  	}
  	else
  	{
  		/* hm, page doesn't exist in file */
! 		if (!init)
  		{
  			log_invalid_page(rnode, forknum, blkno, false);
  			return InvalidBuffer;
--- 265,277 ----
  	if (blkno < lastblock)
  	{
  		/* page exists in file */
! 		buffer = ReadBufferWithoutRelcache(rnode, false, forknum, blkno,
! 										   behavior == XLRBB_ZERO);
  	}
  	else
  	{
  		/* hm, page doesn't exist in file */
! 		if (behavior == XLRBB_EXISTS)
  		{
  			log_invalid_page(rnode, forknum, blkno, false);
  			return InvalidBuffer;
***************
*** 291,297 **** XLogReadBufferWithFork(RelFileNode rnode, ForkNumber forknum,
  
  	LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
  
! 	if (!init)
  	{
  		/* check that page has been initialized */
  		Page		page = (Page) BufferGetPage(buffer);
--- 293,299 ----
  
  	LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
  
! 	if (behavior == XLRBB_EXISTS)
  	{
  		/* check that page has been initialized */
  		Page		page = (Page) BufferGetPage(buffer);
*** src/backend/storage/freespace/freespace.c
--- src/backend/storage/freespace/freespace.c
***************
*** 203,208 **** RecordPageWithFreeSpace(Relation rel, BlockNumber heapBlk, Size spaceAvail)
--- 203,238 ----
  }
  
  /*
+  * XLogRecordPageWithFreeSpace - like RecordPageWithFreeSpace, for use in
+  *		WAL replay
+  */
+ void
+ XLogRecordPageWithFreeSpace(RelFileNode rnode, BlockNumber heapBlk,
+ 							Size spaceAvail)
+ {
+ 	int			new_cat = fsm_space_avail_to_cat(spaceAvail);
+ 	FSMAddress	addr;
+ 	uint16		slot;
+ 	BlockNumber blkno;
+ 	Buffer		buf;
+ 	Page		page;
+ 
+ 	/* Get the location of the FSM byte representing the heap block */
+ 	addr = fsm_get_location(heapBlk, &slot);
+ 	blkno = fsm_logical_to_physical(addr);
+ 
+ 	/* If the page doesn't exist already, extend */
+ 	buf = XLogReadBufferWithFork(rnode, FSM_FORKNUM, blkno, XLRBB_CAN_EXTEND);
+ 	page = BufferGetPage(buf);
+ 	if (PageIsNew(page))
+ 		PageInit(page, BLCKSZ, 0);
+ 
+ 	if (fsm_set_avail(page, slot, new_cat))
+ 		MarkBufferDirty(buf);
+ 	UnlockReleaseBuffer(buf);
+ }
+ 
+ /*
   * GetRecordedFreePage - return the amount of free space on a particular page,
   *		according to the FSM.
   */
***************
*** 504,509 **** static Buffer
--- 534,540 ----
  fsm_readbuf(Relation rel, FSMAddress addr, bool extend)
  {
  	BlockNumber blkno = fsm_logical_to_physical(addr);
+ 	Buffer buf;
  
  	RelationOpenSmgr(rel);
  
***************
*** 518,524 **** fsm_readbuf(Relation rel, FSMAddress addr, bool extend)
  		else
  			return InvalidBuffer;
  	}
! 	return ReadBufferWithFork(rel, FSM_FORKNUM, blkno);
  }
  
  /*
--- 549,564 ----
  		else
  			return InvalidBuffer;
  	}
! 	buf = ReadBufferWithFork(rel, FSM_FORKNUM, blkno);
! 
! 	/*
! 	 * An all-zeroes page could be left over if a backend extends the
! 	 * relation but crashes before initializing the page.
! 	 */
! 	if (PageIsNew(BufferGetPage(buf)))
! 		PageInit(BufferGetPage(buf), BLCKSZ, 0);
! 
! 	return buf;
  }
  
  /*
***************
*** 768,773 **** fsm_redo_truncate(xl_fsm_truncate *xlrec)
--- 808,814 ----
  	uint16		first_removed_slot;
  	BlockNumber fsmblk;
  	Buffer		buf;
+ 	Page		page;
  
  	/* Get the location in the FSM of the first removed heap block */
  	first_removed_address = fsm_get_location(xlrec->nheapblocks,
***************
*** 779,801 **** fsm_redo_truncate(xl_fsm_truncate *xlrec)
  	 * replay of the smgr truncation record to remove completely unused
  	 * pages.
  	 */
! 	buf = XLogReadBufferWithFork(xlrec->node, FSM_FORKNUM, fsmblk, false);
! 	if (BufferIsValid(buf))
! 	{
! 		fsm_truncate_avail(BufferGetPage(buf), first_removed_slot);
! 		MarkBufferDirty(buf);
! 		UnlockReleaseBuffer(buf);
! 	}
! 	else
! 	{
! 		/*
! 		 * The page doesn't exist. Because FSM extensions are not WAL-logged,
! 		 * it's normal to have a truncation record for a page that doesn't
! 		 * exist. Tell xlogutils.c not to PANIC at the end of recovery
! 		 * because of the missing page
! 		 */
! 		XLogTruncateRelation(xlrec->node, FSM_FORKNUM, fsmblk);
! 	}
  }
  
  void
--- 820,834 ----
  	 * replay of the smgr truncation record to remove completely unused
  	 * pages.
  	 */
! 	buf = XLogReadBufferWithFork(xlrec->node, FSM_FORKNUM, fsmblk,
! 								 XLRBB_CAN_EXTEND);
! 	page = BufferGetPage(buf);
! 	if (PageIsNew(page))
! 		PageInit(page, BLCKSZ, 0);
! 
! 	fsm_truncate_avail(page, first_removed_slot);
! 	MarkBufferDirty(buf);
! 	UnlockReleaseBuffer(buf);
  }
  
  void
*** src/include/access/xlogutils.h
--- src/include/access/xlogutils.h
***************
*** 24,32 **** extern void XLogDropDatabase(Oid dbid);
  extern void XLogTruncateRelation(RelFileNode rnode, ForkNumber forkNum,
  								 BlockNumber nblocks);
  
  extern Buffer XLogReadBuffer(RelFileNode rnode, BlockNumber blkno, bool init);
  extern Buffer XLogReadBufferWithFork(RelFileNode rnode, ForkNumber forknum,
! 									 BlockNumber blkno, bool init);
  
  extern Relation CreateFakeRelcacheEntry(RelFileNode rnode);
  extern void FreeFakeRelcacheEntry(Relation fakerel);
--- 24,44 ----
  extern void XLogTruncateRelation(RelFileNode rnode, ForkNumber forkNum,
  								 BlockNumber nblocks);
  
+ /*
+  * XLogReadBuffer() behavior in case the requested page doesn't exist.
+  */
+ typedef enum XLogReadBufferBehavior
+ {
+ 	XLRBB_EXISTS,		/* complain if page doesn't exist */
+ 	XLRBB_ZERO,			/* never read, just allocate a buffer, the caller
+ 						 * will initialize the page */
+ 	XLRBB_CAN_EXTEND	/* extend relation if page doesn't exist */
+ } XLogReadBufferBehavior;
+ 
  extern Buffer XLogReadBuffer(RelFileNode rnode, BlockNumber blkno, bool init);
  extern Buffer XLogReadBufferWithFork(RelFileNode rnode, ForkNumber forknum,
! 									 BlockNumber blkno,
! 									 XLogReadBufferBehavior behavior);
  
  extern Relation CreateFakeRelcacheEntry(RelFileNode rnode);
  extern void FreeFakeRelcacheEntry(Relation fakerel);
*** src/include/storage/freespace.h
--- src/include/storage/freespace.h
***************
*** 27,32 **** extern BlockNumber RecordAndGetPageWithFreeSpace(Relation rel,
--- 27,34 ----
  							  Size spaceNeeded);
  extern void RecordPageWithFreeSpace(Relation rel, BlockNumber heapBlk,
  									Size spaceAvail);
+ extern void XLogRecordPageWithFreeSpace(RelFileNode rnode, BlockNumber heapBlk,
+ 										Size spaceAvail);
  
  extern void FreeSpaceMapTruncateRel(Relation rel, BlockNumber nblocks);
  extern void FreeSpaceMapVacuum(Relation rel);
-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to