OK, here's an updated version...

1. Use IncrBufferRefCount() so that we can do unconditional
ReleaseBuffers elsewhere.  I'm not sure this is really any simpler,
and although IncrBufferRefCount() is pretty cheap, it's certainly not
as cheap as a NULL pointer test.

2. Consolidate a bunch of logic into a new function
RelationReadBuffer.  This simpifies the logic in
RelationGetBufferForTuple() considerably.

3. Make RelationGetBufferForTuple ignore relation->rd_block in favor
of bistate->last_pin whenever possible.  Changing this to also not
bother setting relation->rd_block didn't seem worthwhile, so I didn't.

...Robert

On Tue, Nov 4, 2008 at 4:18 PM, Tom Lane <[EMAIL PROTECTED]> wrote:
> "Robert Haas" <[EMAIL PROTECTED]> writes:
>>> 2. The logic changes in RelationGetBufferForTuple seem bizarre and
>>> overcomplicated.  ISTM that the buffer saved by the bistate ought to
>>> be about equivalent to relation->rd_targblock, ie, it's your first
>>> trial location and also a place to save the located buffer on the way
>>> out.  I'd suggest tossing that part of the patch and starting over.
>
>> Hmm, would that be safe in the presence of concurrent or recursive
>> bulk inserts into the same relation?
>
> As safe as it is now --- you're relying on the bistate to carry the
> query-local state.  Probably the best design is to just ignore
> rd_targblock when a bistate is provided, and use the bistate's buffer
> instead.
>
>                        regards, tom lane
>
Index: src/backend/access/heap/heapam.c
===================================================================
RCS file: /projects/cvsroot/pgsql/src/backend/access/heap/heapam.c,v
retrieving revision 1.268
diff -c -r1.268 heapam.c
*** src/backend/access/heap/heapam.c	31 Oct 2008 19:40:26 -0000	1.268
--- src/backend/access/heap/heapam.c	6 Nov 2008 03:25:19 -0000
***************
*** 1798,1803 ****
--- 1798,1827 ----
  	}
  }
  
+ /*
+  * GetBulkInsertState - set up for a bulk insert
+  */
+ BulkInsertState
+ GetBulkInsertState(void)
+ {
+ 	BulkInsertState bistate;
+ 
+ 	bistate = palloc(sizeof(struct BulkInsertStateData));
+ 	bistate->strategy = GetAccessStrategy(BAS_BULKWRITE);
+ 	bistate->last_pin = InvalidBuffer;
+ 	return bistate;
+ }
+ 
+ /*
+  * FreeBulkInsertState - clean up after finishing a bulk insert
+  */
+ void
+ FreeBulkInsertState(BulkInsertState bistate)
+ {
+ 	if (bistate->last_pin != InvalidBuffer)
+ 		ReleaseBuffer(bistate->last_pin);		
+ 	FreeAccessStrategy(bistate->strategy);
+ }
  
  /*
   *	heap_insert		- insert tuple into a heap
***************
*** 1805,1821 ****
   * The new tuple is stamped with current transaction ID and the specified
   * command ID.
   *
!  * If use_wal is false, the new tuple is not logged in WAL, even for a
!  * non-temp relation.  Safe usage of this behavior requires that we arrange
!  * that all new tuples go into new pages not containing any tuples from other
!  * transactions, and that the relation gets fsync'd before commit.
   * (See also heap_sync() comments)
   *
!  * use_fsm is passed directly to RelationGetBufferForTuple, which see for
!  * more info.
   *
!  * Note that use_wal and use_fsm will be applied when inserting into the
!  * heap's TOAST table, too, if the tuple requires any out-of-line data.
   *
   * The return value is the OID assigned to the tuple (either here or by the
   * caller), or InvalidOid if no OID.  The header fields of *tup are updated
--- 1829,1846 ----
   * The new tuple is stamped with current transaction ID and the specified
   * command ID.
   *
!  * If the HEAP_INSERT_SKIP_WAL option is supplied, the new tuple is not logged
!  * in WAL, even for a non-temp relation.  Safe usage of this behavior requires
!  * that we arrange that all new tuples go into new pages not containing any
!  * tuples from other transactions, and that the relation gets fsync'd before
!  * commit.
   * (See also heap_sync() comments)
   *
!  * The HEAP_INSERT_SKIP_FSM option is passed directly to
!  * RelationGetBufferForTuple, which see for more info.
   *
!  * Note that options will be applied when inserting into the heap's TOAST
!  * table, too, if the tuple requires any out-of-line data.
   *
   * The return value is the OID assigned to the tuple (either here or by the
   * caller), or InvalidOid if no OID.  The header fields of *tup are updated
***************
*** 1825,1831 ****
   */
  Oid
  heap_insert(Relation relation, HeapTuple tup, CommandId cid,
! 			bool use_wal, bool use_fsm)
  {
  	TransactionId xid = GetCurrentTransactionId();
  	HeapTuple	heaptup;
--- 1850,1856 ----
   */
  Oid
  heap_insert(Relation relation, HeapTuple tup, CommandId cid,
! 			int options, BulkInsertState bistate)
  {
  	TransactionId xid = GetCurrentTransactionId();
  	HeapTuple	heaptup;
***************
*** 1877,1890 ****
  		heaptup = tup;
  	}
  	else if (HeapTupleHasExternal(tup) || tup->t_len > TOAST_TUPLE_THRESHOLD)
! 		heaptup = toast_insert_or_update(relation, tup, NULL,
! 										 use_wal, use_fsm);
  	else
  		heaptup = tup;
  
  	/* Find buffer to insert this tuple into */
  	buffer = RelationGetBufferForTuple(relation, heaptup->t_len,
! 									   InvalidBuffer, use_fsm);
  
  	/* NO EREPORT(ERROR) from here till changes are logged */
  	START_CRIT_SECTION();
--- 1902,1914 ----
  		heaptup = tup;
  	}
  	else if (HeapTupleHasExternal(tup) || tup->t_len > TOAST_TUPLE_THRESHOLD)
! 		heaptup = toast_insert_or_update(relation, tup, NULL, options);
  	else
  		heaptup = tup;
  
  	/* Find buffer to insert this tuple into */
  	buffer = RelationGetBufferForTuple(relation, heaptup->t_len,
! 									   InvalidBuffer, options, bistate);
  
  	/* NO EREPORT(ERROR) from here till changes are logged */
  	START_CRIT_SECTION();
***************
*** 1905,1911 ****
  	MarkBufferDirty(buffer);
  
  	/* XLOG stuff */
! 	if (use_wal && !relation->rd_istemp)
  	{
  		xl_heap_insert xlrec;
  		xl_heap_header xlhdr;
--- 1929,1935 ----
  	MarkBufferDirty(buffer);
  
  	/* XLOG stuff */
! 	if ((options & HEAP_INSERT_SKIP_WAL) == 0 && !relation->rd_istemp)
  	{
  		xl_heap_insert xlrec;
  		xl_heap_header xlhdr;
***************
*** 2000,2006 ****
  Oid
  simple_heap_insert(Relation relation, HeapTuple tup)
  {
! 	return heap_insert(relation, tup, GetCurrentCommandId(true), true, true);
  }
  
  /*
--- 2024,2030 ----
  Oid
  simple_heap_insert(Relation relation, HeapTuple tup)
  {
! 	return heap_insert(relation, tup, GetCurrentCommandId(true), 0, NULL);
  }
  
  /*
***************
*** 2595,2602 ****
  		if (need_toast)
  		{
  			/* Note we always use WAL and FSM during updates */
! 			heaptup = toast_insert_or_update(relation, newtup, &oldtup,
! 											 true, true);
  			newtupsize = MAXALIGN(heaptup->t_len);
  		}
  		else
--- 2619,2625 ----
  		if (need_toast)
  		{
  			/* Note we always use WAL and FSM during updates */
! 			heaptup = toast_insert_or_update(relation, newtup, &oldtup, 0);
  			newtupsize = MAXALIGN(heaptup->t_len);
  		}
  		else
***************
*** 2623,2629 ****
  		{
  			/* Assume there's no chance to put heaptup on same page. */
  			newbuf = RelationGetBufferForTuple(relation, heaptup->t_len,
! 											   buffer, true);
  		}
  		else
  		{
--- 2646,2652 ----
  		{
  			/* Assume there's no chance to put heaptup on same page. */
  			newbuf = RelationGetBufferForTuple(relation, heaptup->t_len,
! 											   buffer, 0, NULL);
  		}
  		else
  		{
***************
*** 2640,2646 ****
  				 */
  				LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
  				newbuf = RelationGetBufferForTuple(relation, heaptup->t_len,
! 												   buffer, true);
  			}
  			else
  			{
--- 2663,2669 ----
  				 */
  				LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
  				newbuf = RelationGetBufferForTuple(relation, heaptup->t_len,
! 												   buffer, 0, NULL);
  			}
  			else
  			{
Index: src/backend/access/heap/hio.c
===================================================================
RCS file: /projects/cvsroot/pgsql/src/backend/access/heap/hio.c,v
retrieving revision 1.73
diff -c -r1.73 hio.c
*** src/backend/access/heap/hio.c	30 Sep 2008 10:52:10 -0000	1.73
--- src/backend/access/heap/hio.c	6 Nov 2008 03:25:20 -0000
***************
*** 15,20 ****
--- 15,21 ----
  
  #include "postgres.h"
  
+ #include "access/heapam.h"
  #include "access/hio.h"
  #include "storage/bufmgr.h"
  #include "storage/freespace.h"
***************
*** 56,61 ****
--- 57,90 ----
  	((HeapTupleHeader) item)->t_ctid = tuple->t_self;
  }
  
+ 
+ /*
+  * Read in a buffer and update bulk insert state object if necessary.
+  */
+ static Buffer
+ RelationReadBuffer(Relation relation, BlockNumber targetBlock,
+ 	BulkInsertState bistate)
+ {
+ 	Buffer buffer;
+ 
+ 	if (!bistate)
+ 		return ReadBuffer(relation, targetBlock);
+ 	if (bistate->last_pin != InvalidBuffer)
+ 	{
+ 		if (BufferGetBlockNumber(bistate->last_pin) == targetBlock)
+ 		{
+ 			IncrBufferRefCount(bistate->last_pin);
+ 			return bistate->last_pin;
+ 		}
+ 		ReleaseBuffer(bistate->last_pin);
+ 	}
+ 	buffer = ReadBufferExtended(relation, MAIN_FORKNUM, targetBlock,
+ 		RBM_NORMAL, bistate->strategy);
+ 	bistate->last_pin = buffer;
+ 	IncrBufferRefCount(buffer);
+ 	return buffer;
+ }
+ 
  /*
   * RelationGetBufferForTuple
   *
***************
*** 80,92 ****
   *	happen if space is freed in that page after heap_update finds there's not
   *	enough there).	In that case, the page will be pinned and locked only once.
   *
!  *	If use_fsm is true (the normal case), we use FSM to help us find free
!  *	space.	If use_fsm is false, we always append a new empty page to the
!  *	end of the relation if the tuple won't fit on the current target page.
   *	This can save some cycles when we know the relation is new and doesn't
   *	contain useful amounts of free space.
   *
!  *	The use_fsm = false case is also useful for non-WAL-logged additions to a
   *	relation, if the caller holds exclusive lock and is careful to invalidate
   *	relation->rd_targblock before the first insertion --- that ensures that
   *	all insertions will occur into newly added pages and not be intermixed
--- 109,121 ----
   *	happen if space is freed in that page after heap_update finds there's not
   *	enough there).	In that case, the page will be pinned and locked only once.
   *
!  *	We normally use FSM to help us find free space.	 However,
!  *	if HEAP_INSERT_SKIP_FSM is specified, we just append a new empty page to
!  *	the end of the relation if the tuple won't fit on the current target page.
   *	This can save some cycles when we know the relation is new and doesn't
   *	contain useful amounts of free space.
   *
!  *	HEAP_INSERT_SKIP_FSM is also useful for non-WAL-logged additions to a
   *	relation, if the caller holds exclusive lock and is careful to invalidate
   *	relation->rd_targblock before the first insertion --- that ensures that
   *	all insertions will occur into newly added pages and not be intermixed
***************
*** 104,110 ****
   */
  Buffer
  RelationGetBufferForTuple(Relation relation, Size len,
! 						  Buffer otherBuffer, bool use_fsm)
  {
  	Buffer		buffer = InvalidBuffer;
  	Page		page;
--- 133,140 ----
   */
  Buffer
  RelationGetBufferForTuple(Relation relation, Size len,
! 						  Buffer otherBuffer, int options,
! 						  BulkInsertState bistate)
  {
  	Buffer		buffer = InvalidBuffer;
  	Page		page;
***************
*** 113,121 ****
--- 143,155 ----
  	BlockNumber targetBlock,
  				otherBlock;
  	bool		needLock;
+ 	bool		use_fsm = (options & HEAP_INSERT_SKIP_FSM) == 0;
  
  	len = MAXALIGN(len);		/* be conservative */
  
+ 	/* Bulk insert is not supported for updates, only inserts. */
+ 	Assert(!bistate || otherBuffer == InvalidBuffer);
+ 
  	/*
  	 * If we're gonna fail for oversize tuple, do it right away
  	 */
***************
*** 137,155 ****
  
  	/*
  	 * We first try to put the tuple on the same page we last inserted a tuple
! 	 * on, as cached in the relcache entry.  If that doesn't work, we ask the
! 	 * shared Free Space Map to locate a suitable page.  Since the FSM's info
! 	 * might be out of date, we have to be prepared to loop around and retry
! 	 * multiple times.	(To insure this isn't an infinite loop, we must update
! 	 * the FSM with the correct amount of free space on each page that proves
! 	 * not to be suitable.)  If the FSM has no record of a page with enough
! 	 * free space, we give up and extend the relation.
  	 *
  	 * When use_fsm is false, we either put the tuple onto the existing target
  	 * page or extend the relation.
  	 */
  	if (len + saveFreeSpace <= MaxHeapTupleSize)
! 		targetBlock = relation->rd_targblock;
  	else
  	{
  		/* can't fit, don't screw up FSM request tracking by trying */
--- 171,191 ----
  
  	/*
  	 * We first try to put the tuple on the same page we last inserted a tuple
! 	 * on, as cached in the BulkInsertState or relcache entry.  If that
! 	 * doesn't work, we ask the shared Free Space Map to locate a suitable
! 	 * page.  Since the FSM's info might be out of date, we have to be prepared
! 	 * to loop around and retry multiple times.	(To insure this isn't an
! 	 * infinite loop, we must update the FSM with the correct amount of free
! 	 * space on each page that proves not to be suitable.)  If the FSM has no
! 	 * record of a page with enough free space, we give up and extend the
! 	 * relation.
  	 *
  	 * When use_fsm is false, we either put the tuple onto the existing target
  	 * page or extend the relation.
  	 */
  	if (len + saveFreeSpace <= MaxHeapTupleSize)
! 		targetBlock = bistate && bistate->last_pin != InvalidBuffer ?
! 			BufferGetBlockNumber(bistate->last_pin) : relation->rd_targblock;
  	else
  	{
  		/* can't fit, don't screw up FSM request tracking by trying */
***************
*** 189,195 ****
  		if (otherBuffer == InvalidBuffer)
  		{
  			/* easy case */
! 			buffer = ReadBuffer(relation, targetBlock);
  			LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
  		}
  		else if (otherBlock == targetBlock)
--- 225,231 ----
  		if (otherBuffer == InvalidBuffer)
  		{
  			/* easy case */
! 			buffer = RelationReadBuffer(relation, targetBlock, bistate);
  			LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
  		}
  		else if (otherBlock == targetBlock)
***************
*** 274,280 ****
  	 * it worth keeping an accurate file length in shared memory someplace,
  	 * rather than relying on the kernel to do it for us?
  	 */
! 	buffer = ReadBuffer(relation, P_NEW);
  
  	/*
  	 * We can be certain that locking the otherBuffer first is OK, since it
--- 310,316 ----
  	 * it worth keeping an accurate file length in shared memory someplace,
  	 * rather than relying on the kernel to do it for us?
  	 */
! 	buffer = RelationReadBuffer(relation, P_NEW, bistate);
  
  	/*
  	 * We can be certain that locking the otherBuffer first is OK, since it
Index: src/backend/access/heap/rewriteheap.c
===================================================================
RCS file: /projects/cvsroot/pgsql/src/backend/access/heap/rewriteheap.c,v
retrieving revision 1.15
diff -c -r1.15 rewriteheap.c
*** src/backend/access/heap/rewriteheap.c	11 Aug 2008 11:05:10 -0000	1.15
--- src/backend/access/heap/rewriteheap.c	6 Nov 2008 03:25:20 -0000
***************
*** 575,581 ****
  	}
  	else if (HeapTupleHasExternal(tup) || tup->t_len > TOAST_TUPLE_THRESHOLD)
  		heaptup = toast_insert_or_update(state->rs_new_rel, tup, NULL,
! 										 state->rs_use_wal, false);
  	else
  		heaptup = tup;
  
--- 575,583 ----
  	}
  	else if (HeapTupleHasExternal(tup) || tup->t_len > TOAST_TUPLE_THRESHOLD)
  		heaptup = toast_insert_or_update(state->rs_new_rel, tup, NULL,
! 										 HEAP_INSERT_SKIP_FSM |
! 										 (state->rs_use_wal ?
! 									     0 : HEAP_INSERT_SKIP_WAL));
  	else
  		heaptup = tup;
  
Index: src/backend/access/heap/tuptoaster.c
===================================================================
RCS file: /projects/cvsroot/pgsql/src/backend/access/heap/tuptoaster.c,v
retrieving revision 1.90
diff -c -r1.90 tuptoaster.c
*** src/backend/access/heap/tuptoaster.c	2 Nov 2008 01:45:27 -0000	1.90
--- src/backend/access/heap/tuptoaster.c	6 Nov 2008 03:25:21 -0000
***************
*** 74,81 ****
  
  
  static void toast_delete_datum(Relation rel, Datum value);
! static Datum toast_save_datum(Relation rel, Datum value,
! 				 bool use_wal, bool use_fsm);
  static struct varlena *toast_fetch_datum(struct varlena * attr);
  static struct varlena *toast_fetch_datum_slice(struct varlena * attr,
  						int32 sliceoffset, int32 length);
--- 74,80 ----
  
  
  static void toast_delete_datum(Relation rel, Datum value);
! static Datum toast_save_datum(Relation rel, Datum value, int options);
  static struct varlena *toast_fetch_datum(struct varlena * attr);
  static struct varlena *toast_fetch_datum_slice(struct varlena * attr,
  						int32 sliceoffset, int32 length);
***************
*** 400,406 ****
   * Inputs:
   *	newtup: the candidate new tuple to be inserted
   *	oldtup: the old row version for UPDATE, or NULL for INSERT
!  *	use_wal, use_fsm: flags to be passed to heap_insert() for toast rows
   * Result:
   *	either newtup if no toasting is needed, or a palloc'd modified tuple
   *	that is what should actually get stored
--- 399,405 ----
   * Inputs:
   *	newtup: the candidate new tuple to be inserted
   *	oldtup: the old row version for UPDATE, or NULL for INSERT
!  *	options: options to be passed to heap_insert() for toast rows
   * Result:
   *	either newtup if no toasting is needed, or a palloc'd modified tuple
   *	that is what should actually get stored
***************
*** 411,417 ****
   */
  HeapTuple
  toast_insert_or_update(Relation rel, HeapTuple newtup, HeapTuple oldtup,
! 					   bool use_wal, bool use_fsm)
  {
  	HeapTuple	result_tuple;
  	TupleDesc	tupleDesc;
--- 410,416 ----
   */
  HeapTuple
  toast_insert_or_update(Relation rel, HeapTuple newtup, HeapTuple oldtup,
! 					   int options)
  {
  	HeapTuple	result_tuple;
  	TupleDesc	tupleDesc;
***************
*** 677,684 ****
  		{
  			old_value = toast_values[i];
  			toast_action[i] = 'p';
! 			toast_values[i] = toast_save_datum(rel, toast_values[i],
! 											   use_wal, use_fsm);
  			if (toast_free[i])
  				pfree(DatumGetPointer(old_value));
  			toast_free[i] = true;
--- 676,682 ----
  		{
  			old_value = toast_values[i];
  			toast_action[i] = 'p';
! 			toast_values[i] = toast_save_datum(rel, toast_values[i], options);
  			if (toast_free[i])
  				pfree(DatumGetPointer(old_value));
  			toast_free[i] = true;
***************
*** 728,735 ****
  		i = biggest_attno;
  		old_value = toast_values[i];
  		toast_action[i] = 'p';
! 		toast_values[i] = toast_save_datum(rel, toast_values[i],
! 										   use_wal, use_fsm);
  		if (toast_free[i])
  			pfree(DatumGetPointer(old_value));
  		toast_free[i] = true;
--- 726,732 ----
  		i = biggest_attno;
  		old_value = toast_values[i];
  		toast_action[i] = 'p';
! 		toast_values[i] = toast_save_datum(rel, toast_values[i], options);
  		if (toast_free[i])
  			pfree(DatumGetPointer(old_value));
  		toast_free[i] = true;
***************
*** 838,845 ****
  		i = biggest_attno;
  		old_value = toast_values[i];
  		toast_action[i] = 'p';
! 		toast_values[i] = toast_save_datum(rel, toast_values[i],
! 										   use_wal, use_fsm);
  		if (toast_free[i])
  			pfree(DatumGetPointer(old_value));
  		toast_free[i] = true;
--- 835,841 ----
  		i = biggest_attno;
  		old_value = toast_values[i];
  		toast_action[i] = 'p';
! 		toast_values[i] = toast_save_datum(rel, toast_values[i], options);
  		if (toast_free[i])
  			pfree(DatumGetPointer(old_value));
  		toast_free[i] = true;
***************
*** 1120,1127 ****
   * ----------
   */
  static Datum
! toast_save_datum(Relation rel, Datum value,
! 				 bool use_wal, bool use_fsm)
  {
  	Relation	toastrel;
  	Relation	toastidx;
--- 1116,1122 ----
   * ----------
   */
  static Datum
! toast_save_datum(Relation rel, Datum value, int options)
  {
  	Relation	toastrel;
  	Relation	toastidx;
***************
*** 1218,1224 ****
  		memcpy(VARDATA(&chunk_data), data_p, chunk_size);
  		toasttup = heap_form_tuple(toasttupDesc, t_values, t_isnull);
  
! 		heap_insert(toastrel, toasttup, mycid, use_wal, use_fsm);
  
  		/*
  		 * Create the index entry.	We cheat a little here by not using
--- 1213,1219 ----
  		memcpy(VARDATA(&chunk_data), data_p, chunk_size);
  		toasttup = heap_form_tuple(toasttupDesc, t_values, t_isnull);
  
! 		heap_insert(toastrel, toasttup, mycid, options, NULL);
  
  		/*
  		 * Create the index entry.	We cheat a little here by not using
Index: src/backend/commands/copy.c
===================================================================
RCS file: /projects/cvsroot/pgsql/src/backend/commands/copy.c,v
retrieving revision 1.300
diff -c -r1.300 copy.c
*** src/backend/commands/copy.c	2 Nov 2008 01:45:27 -0000	1.300
--- src/backend/commands/copy.c	6 Nov 2008 03:25:23 -0000
***************
*** 1653,1660 ****
  	MemoryContext oldcontext = CurrentMemoryContext;
  	ErrorContextCallback errcontext;
  	CommandId	mycid = GetCurrentCommandId(true);
! 	bool		use_wal = true; /* by default, use WAL logging */
! 	bool		use_fsm = true; /* by default, use FSM for free space */
  
  	Assert(cstate->rel);
  
--- 1653,1660 ----
  	MemoryContext oldcontext = CurrentMemoryContext;
  	ErrorContextCallback errcontext;
  	CommandId	mycid = GetCurrentCommandId(true);
! 	int options = 0;
! 	BulkInsertState bistate;
  
  	Assert(cstate->rel);
  
***************
*** 1707,1715 ****
  	if (cstate->rel->rd_createSubid != InvalidSubTransactionId ||
  		cstate->rel->rd_newRelfilenodeSubid != InvalidSubTransactionId)
  	{
! 		use_fsm = false;
  		if (!XLogArchivingActive())
! 			use_wal = false;
  	}
  
  	if (pipe)
--- 1707,1715 ----
  	if (cstate->rel->rd_createSubid != InvalidSubTransactionId ||
  		cstate->rel->rd_newRelfilenodeSubid != InvalidSubTransactionId)
  	{
! 		options |= HEAP_INSERT_SKIP_FSM;
  		if (!XLogArchivingActive())
! 			options |= HEAP_INSERT_SKIP_WAL;
  	}
  
  	if (pipe)
***************
*** 1886,1891 ****
--- 1886,1893 ----
  	cstate->cur_attname = NULL;
  	cstate->cur_attval = NULL;
  
+ 	bistate = GetBulkInsertState();
+ 
  	/* Set up callback to identify error line number */
  	errcontext.callback = copy_in_error_callback;
  	errcontext.arg = (void *) cstate;
***************
*** 2108,2114 ****
  				ExecConstraints(resultRelInfo, slot, estate);
  
  			/* OK, store the tuple and create index entries for it */
! 			heap_insert(cstate->rel, tuple, mycid, use_wal, use_fsm);
  
  			if (resultRelInfo->ri_NumIndices > 0)
  				ExecInsertIndexTuples(slot, &(tuple->t_self), estate, false);
--- 2110,2116 ----
  				ExecConstraints(resultRelInfo, slot, estate);
  
  			/* OK, store the tuple and create index entries for it */
! 			heap_insert(cstate->rel, tuple, mycid, options, bistate);
  
  			if (resultRelInfo->ri_NumIndices > 0)
  				ExecInsertIndexTuples(slot, &(tuple->t_self), estate, false);
***************
*** 2126,2131 ****
--- 2128,2134 ----
  	}
  
  	/* Done, clean up */
+ 	FreeBulkInsertState(bistate);
  	error_context_stack = errcontext.previous;
  
  	MemoryContextSwitchTo(oldcontext);
***************
*** 2164,2170 ****
  	 * If we skipped writing WAL, then we need to sync the heap (but not
  	 * indexes since those use WAL anyway)
  	 */
! 	if (!use_wal)
  		heap_sync(cstate->rel);
  }
  
--- 2167,2173 ----
  	 * If we skipped writing WAL, then we need to sync the heap (but not
  	 * indexes since those use WAL anyway)
  	 */
! 	if ((options & HEAP_INSERT_SKIP_WAL) != 0)
  		heap_sync(cstate->rel);
  }
  
Index: src/backend/executor/execMain.c
===================================================================
RCS file: /projects/cvsroot/pgsql/src/backend/executor/execMain.c,v
retrieving revision 1.314
diff -c -r1.314 execMain.c
*** src/backend/executor/execMain.c	31 Oct 2008 21:07:54 -0000	1.314
--- src/backend/executor/execMain.c	6 Nov 2008 03:25:25 -0000
***************
*** 1623,1630 ****
  	 * t_self field.
  	 */
  	newId = heap_insert(resultRelationDesc, tuple,
! 						estate->es_output_cid,
! 						true, true);
  
  	IncrAppended();
  	(estate->es_processed)++;
--- 1623,1629 ----
  	 * t_self field.
  	 */
  	newId = heap_insert(resultRelationDesc, tuple,
! 						estate->es_output_cid, 0, NULL);
  
  	IncrAppended();
  	(estate->es_processed)++;
***************
*** 2621,2627 ****
  	DestReceiver pub;			/* publicly-known function pointers */
  	EState	   *estate;			/* EState we are working with */
  	Relation	rel;			/* Relation to write to */
! 	bool		use_wal;		/* do we need to WAL-log our writes? */
  } DR_intorel;
  
  /*
--- 2620,2627 ----
  	DestReceiver pub;			/* publicly-known function pointers */
  	EState	   *estate;			/* EState we are working with */
  	Relation	rel;			/* Relation to write to */
! 	int			hi_options;		/* heap_insert performance options */
! 	BulkInsertState bistate;	/* bulk insert state */
  } DR_intorel;
  
  /*
***************
*** 2753,2763 ****
  	myState = (DR_intorel *) queryDesc->dest;
  	Assert(myState->pub.mydest == DestIntoRel);
  	myState->estate = estate;
  
  	/*
! 	 * We can skip WAL-logging the insertions, unless PITR is in use.
  	 */
! 	myState->use_wal = XLogArchivingActive();
  	myState->rel = intoRelationDesc;
  
  	/* use_wal off requires rd_targblock be initially invalid */
--- 2753,2766 ----
  	myState = (DR_intorel *) queryDesc->dest;
  	Assert(myState->pub.mydest == DestIntoRel);
  	myState->estate = estate;
+ 	myState->bistate = GetBulkInsertState();
  
  	/*
! 	 * We can skip WAL-logging the insertions, unless PITR is in use.  We
! 	 * can skip the FSM in any case.
  	 */
! 	myState->hi_options = HEAP_INSERT_SKIP_FSM |
! 		(XLogArchivingActive() ? 0 : HEAP_INSERT_SKIP_WAL);
  	myState->rel = intoRelationDesc;
  
  	/* use_wal off requires rd_targblock be initially invalid */
***************
*** 2775,2782 ****
  	/* OpenIntoRel might never have gotten called */
  	if (myState && myState->pub.mydest == DestIntoRel && myState->rel)
  	{
  		/* If we skipped using WAL, must heap_sync before commit */
! 		if (!myState->use_wal)
  			heap_sync(myState->rel);
  
  		/* close rel, but keep lock until commit */
--- 2778,2787 ----
  	/* OpenIntoRel might never have gotten called */
  	if (myState && myState->pub.mydest == DestIntoRel && myState->rel)
  	{
+ 		FreeBulkInsertState(myState->bistate);
+ 
  		/* If we skipped using WAL, must heap_sync before commit */
! 		if ((myState->hi_options & HEAP_INSERT_SKIP_WAL) != 0)
  			heap_sync(myState->rel);
  
  		/* close rel, but keep lock until commit */
***************
*** 2834,2841 ****
  	heap_insert(myState->rel,
  				tuple,
  				myState->estate->es_output_cid,
! 				myState->use_wal,
! 				false);			/* never any point in using FSM */
  
  	/* We know this is a newly created relation, so there are no indexes */
  
--- 2839,2846 ----
  	heap_insert(myState->rel,
  				tuple,
  				myState->estate->es_output_cid,
! 				myState->hi_options,
! 				myState->bistate);
  
  	/* We know this is a newly created relation, so there are no indexes */
  
Index: src/backend/storage/buffer/README
===================================================================
RCS file: /projects/cvsroot/pgsql/src/backend/storage/buffer/README,v
retrieving revision 1.14
diff -c -r1.14 README
*** src/backend/storage/buffer/README	21 Mar 2008 13:23:28 -0000	1.14
--- src/backend/storage/buffer/README	6 Nov 2008 03:25:26 -0000
***************
*** 235,240 ****
--- 235,242 ----
  buffer, resulting in excessive WAL flushing.  Allowing VACUUM to update
  256KB between WAL flushes should be more efficient.
  
+ Beginning in 8.4, COPY IN and CREATE TABLE AS SELECT also uses a ring buffer,
+ in order to avoid trashing the entire buffer arena.
  
  Background Writer's Processing
  ------------------------------
Index: src/backend/storage/buffer/freelist.c
===================================================================
RCS file: /projects/cvsroot/pgsql/src/backend/storage/buffer/freelist.c,v
retrieving revision 1.64
diff -c -r1.64 freelist.c
*** src/backend/storage/buffer/freelist.c	1 Jan 2008 19:45:51 -0000	1.64
--- src/backend/storage/buffer/freelist.c	6 Nov 2008 03:25:26 -0000
***************
*** 387,392 ****
--- 387,395 ----
  		case BAS_VACUUM:
  			ring_size = 256 * 1024 / BLCKSZ;
  			break;
+ 		case BAS_BULKWRITE:
+ 			ring_size = 256 * 1024 / BLCKSZ;
+ 			break;
  
  		default:
  			elog(ERROR, "unrecognized buffer access strategy: %d",
Index: src/include/access/heapam.h
===================================================================
RCS file: /projects/cvsroot/pgsql/src/include/access/heapam.h,v
retrieving revision 1.139
diff -c -r1.139 heapam.h
*** src/include/access/heapam.h	8 Oct 2008 01:14:44 -0000	1.139
--- src/include/access/heapam.h	6 Nov 2008 03:25:27 -0000
***************
*** 31,36 ****
--- 31,37 ----
  	LockTupleExclusive
  } LockTupleMode;
  
+ typedef struct BulkInsertStateData *BulkInsertState;
  
  /* ----------------
   *		function prototypes for heap access method
***************
*** 86,93 ****
  					ItemPointer tid);
  extern void setLastTid(const ItemPointer tid);
  
  extern Oid heap_insert(Relation relation, HeapTuple tup, CommandId cid,
! 			bool use_wal, bool use_fsm);
  extern HTSU_Result heap_delete(Relation relation, ItemPointer tid,
  			ItemPointer ctid, TransactionId *update_xmax,
  			CommandId cid, Snapshot crosscheck, bool wait);
--- 87,99 ----
  					ItemPointer tid);
  extern void setLastTid(const ItemPointer tid);
  
+ #define HEAP_INSERT_SKIP_WAL   0x0001
+ #define HEAP_INSERT_SKIP_FSM   0x0002
+ 
  extern Oid heap_insert(Relation relation, HeapTuple tup, CommandId cid,
! 			int options, BulkInsertState bistate);
! BulkInsertState GetBulkInsertState(void);
! void FreeBulkInsertState(BulkInsertState);
  extern HTSU_Result heap_delete(Relation relation, ItemPointer tid,
  			ItemPointer ctid, TransactionId *update_xmax,
  			CommandId cid, Snapshot crosscheck, bool wait);
Index: src/include/access/hio.h
===================================================================
RCS file: /projects/cvsroot/pgsql/src/include/access/hio.h,v
retrieving revision 1.36
diff -c -r1.36 hio.h
*** src/include/access/hio.h	19 Jun 2008 00:46:06 -0000	1.36
--- src/include/access/hio.h	6 Nov 2008 03:25:27 -0000
***************
*** 18,26 ****
  #include "utils/relcache.h"
  #include "storage/buf.h"
  
  extern void RelationPutHeapTuple(Relation relation, Buffer buffer,
  					 HeapTuple tuple);
  extern Buffer RelationGetBufferForTuple(Relation relation, Size len,
! 						  Buffer otherBuffer, bool use_fsm);
  
  #endif   /* HIO_H */
--- 18,33 ----
  #include "utils/relcache.h"
  #include "storage/buf.h"
  
+ /* private to access/heap/heapam.c and access/heap/hio.c */
+ struct BulkInsertStateData {
+ 	BufferAccessStrategy strategy;
+ 	Buffer last_pin;
+ };
+ 
  extern void RelationPutHeapTuple(Relation relation, Buffer buffer,
  					 HeapTuple tuple);
  extern Buffer RelationGetBufferForTuple(Relation relation, Size len,
! 						  Buffer otherBuffer, int options,
! 						  BulkInsertState bistate);
  
  #endif   /* HIO_H */
Index: src/include/access/tuptoaster.h
===================================================================
RCS file: /projects/cvsroot/pgsql/src/include/access/tuptoaster.h,v
retrieving revision 1.41
diff -c -r1.41 tuptoaster.h
*** src/include/access/tuptoaster.h	13 Jul 2008 20:45:47 -0000	1.41
--- src/include/access/tuptoaster.h	6 Nov 2008 03:25:27 -0000
***************
*** 93,99 ****
   */
  extern HeapTuple toast_insert_or_update(Relation rel,
  					   HeapTuple newtup, HeapTuple oldtup,
! 					   bool use_wal, bool use_fsm);
  
  /* ----------
   * toast_delete -
--- 93,99 ----
   */
  extern HeapTuple toast_insert_or_update(Relation rel,
  					   HeapTuple newtup, HeapTuple oldtup,
! 					   int options);
  
  /* ----------
   * toast_delete -
Index: src/include/storage/bufmgr.h
===================================================================
RCS file: /projects/cvsroot/pgsql/src/include/storage/bufmgr.h,v
retrieving revision 1.116
diff -c -r1.116 bufmgr.h
*** src/include/storage/bufmgr.h	31 Oct 2008 15:05:00 -0000	1.116
--- src/include/storage/bufmgr.h	6 Nov 2008 03:25:28 -0000
***************
*** 28,34 ****
  	BAS_NORMAL,					/* Normal random access */
  	BAS_BULKREAD,				/* Large read-only scan (hint bit updates are
  								 * ok) */
! 	BAS_VACUUM					/* VACUUM */
  } BufferAccessStrategyType;
  
  /* Possible modes for ReadBufferExtended() */
--- 28,35 ----
  	BAS_NORMAL,					/* Normal random access */
  	BAS_BULKREAD,				/* Large read-only scan (hint bit updates are
  								 * ok) */
! 	BAS_VACUUM,					/* VACUUM */
! 	BAS_BULKWRITE				/* Large multi-block write (e.g. COPY IN) */
  } BufferAccessStrategyType;
  
  /* Possible modes for ReadBufferExtended() */
-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to