Jaime Casanova wrote:
On 5/18/07, Heikki Linnakangas <[EMAIL PROTECTED]> wrote:
Jaime Casanova wrote:
>
> the patch doesn't apply in cvs... you'll need to update it...

Oh, here you are.

The implementation has changed a bit since August. I thought I had
submitted an updated version in the winter but couldn't find it. Anyway,
I updated and dusted off the source tree, tidied up the comments a
little bit, and fixed some inconsistencies in pg_proc entries that made
opr_sanity fail.


this one doesn't apply either... there are problems with nbtinsert.c and pg_am.h

Ah, sorry about that. For some reason my source tree was checked out from the 8.2 branch, instead of CVS HEAD.

Here you are. Thanks for looking at this!

--
  Heikki Linnakangas
  EnterpriseDB   http://www.enterprisedb.com
Index: doc/src/sgml/catalogs.sgml
===================================================================
RCS file: /home/hlinnaka/pgcvsrepository/pgsql/doc/src/sgml/catalogs.sgml,v
retrieving revision 2.152
diff -c -r2.152 catalogs.sgml
*** doc/src/sgml/catalogs.sgml	15 May 2007 19:13:54 -0000	2.152
--- doc/src/sgml/catalogs.sgml	19 May 2007 16:23:49 -0000
***************
*** 517,522 ****
--- 517,536 ----
        <entry>Function to parse and validate <structfield>reloptions</> for an index</entry>
       </row>
  
+      <row>
+       <entry><structfield>amprepareinsert</structfield></entry>
+       <entry><type>regproc</type></entry>
+       <entry><literal><link linkend="catalog-pg-proc"><structname>pg_proc</structname></link>.oid</literal></entry>
+       <entry>Performs the first phase of a two-phase index insert, returning a suggestion of where in the heap to put the new tuple</entry>
+      </row>
+ 
+      <row>
+       <entry><structfield>amfinishinsert</structfield></entry>
+       <entry><type>regproc</type></entry>
+       <entry><literal><link linkend="catalog-pg-proc"><structname>pg_proc</structname></link>.oid</literal></entry>
+       <entry>Finishes an index insert started with amprepareinsert</entry>
+      </row>
+ 
      </tbody>
     </tgroup>
    </table>
Index: src/backend/access/heap/heapam.c
===================================================================
RCS file: /home/hlinnaka/pgcvsrepository/pgsql/src/backend/access/heap/heapam.c,v
retrieving revision 1.232
diff -c -r1.232 heapam.c
*** src/backend/access/heap/heapam.c	8 Apr 2007 01:26:27 -0000	1.232
--- src/backend/access/heap/heapam.c	19 May 2007 16:45:14 -0000
***************
*** 1368,1373 ****
--- 1368,1377 ----
   * Note that use_wal and use_fsm will be applied when inserting into the
   * heap's TOAST table, too, if the tuple requires any out-of-line data.
   *
+  * If suggested_blk is a valid block number, the tuple is inserted into
+  * that block if it has enough free space. Otherwise a block is chosen
+  * as if suggested_blk had not been given.
+  *
   * The return value is the OID assigned to the tuple (either here or by the
   * caller), or InvalidOid if no OID.  The header fields of *tup are updated
   * to match the stored tuple; in particular tup->t_self receives the actual
***************
*** 1376,1382 ****
   */
  Oid
  heap_insert(Relation relation, HeapTuple tup, CommandId cid,
! 			bool use_wal, bool use_fsm)
  {
  	TransactionId xid = GetCurrentTransactionId();
  	HeapTuple	heaptup;
--- 1380,1386 ----
   */
  Oid
  heap_insert(Relation relation, HeapTuple tup, CommandId cid,
! 			bool use_wal, bool use_fsm, BlockNumber suggested_blk)
  {
  	TransactionId xid = GetCurrentTransactionId();
  	HeapTuple	heaptup;
***************
*** 1432,1440 ****
  	else
  		heaptup = tup;
  
! 	/* Find buffer to insert this tuple into */
! 	buffer = RelationGetBufferForTuple(relation, heaptup->t_len,
! 									   InvalidBuffer, use_fsm);
  
  	/* NO EREPORT(ERROR) from here till changes are logged */
  	START_CRIT_SECTION();
--- 1436,1478 ----
  	else
  		heaptup = tup;
  
! 	/* Find buffer to insert this tuple into. Try the suggested block first
! 	 * if caller gave one.
! 	 */
! 	if (suggested_blk != InvalidBlockNumber)
! 	{
! 		Buffer suggested_buf;
! 		Page pageHeader;
! 		Size pageFreeSpace;
! 
! 		suggested_buf = ReadBuffer(relation, suggested_blk);
! 		pageHeader = (Page) BufferGetPage(suggested_buf);
! 
! 		LockBuffer(suggested_buf, BUFFER_LOCK_EXCLUSIVE);
! 
! 		/* Don't subtract fillfactor from the free space. That space is
! 		 * reserved exactly for situations like this: keeping updated and
! 		 * inserted tuples close to other tuples with similar values.
! 		 */
! 		pageFreeSpace = PageGetFreeSpace(pageHeader);
! 
! 		if (heaptup->t_len <= pageFreeSpace)
! 			buffer = suggested_buf;
! 		else
! 		{
! 			/* Page was full. Release lock and pin and get another block
! 			 * as if suggested_blk was not given. 
! 			 */
! 			LockBuffer(suggested_buf, BUFFER_LOCK_UNLOCK);
! 			ReleaseBuffer(suggested_buf);
! 
! 			buffer = RelationGetBufferForTuple(relation, heaptup->t_len,
! 											   InvalidBuffer, use_fsm);
! 		}
! 	} 
! 	else
! 		buffer = RelationGetBufferForTuple(relation, heaptup->t_len,
! 										   InvalidBuffer, use_fsm);
  
  	/* NO EREPORT(ERROR) from here till changes are logged */
  	START_CRIT_SECTION();
***************
*** 1544,1550 ****
  Oid
  simple_heap_insert(Relation relation, HeapTuple tup)
  {
! 	return heap_insert(relation, tup, GetCurrentCommandId(), true, true);
  }
  
  /*
--- 1582,1589 ----
  Oid
  simple_heap_insert(Relation relation, HeapTuple tup)
  {
! 	return heap_insert(relation, tup, GetCurrentCommandId(), true, 
! 					   true, InvalidBlockNumber);
  }
  
  /*
Index: src/backend/access/heap/tuptoaster.c
===================================================================
RCS file: /home/hlinnaka/pgcvsrepository/pgsql/src/backend/access/heap/tuptoaster.c,v
retrieving revision 1.74
diff -c -r1.74 tuptoaster.c
*** src/backend/access/heap/tuptoaster.c	6 Apr 2007 04:21:41 -0000	1.74
--- src/backend/access/heap/tuptoaster.c	19 May 2007 16:45:39 -0000
***************
*** 1146,1152 ****
  		if (!HeapTupleIsValid(toasttup))
  			elog(ERROR, "failed to build TOAST tuple");
  
! 		heap_insert(toastrel, toasttup, mycid, use_wal, use_fsm);
  
  		/*
  		 * Create the index entry.	We cheat a little here by not using
--- 1146,1153 ----
  		if (!HeapTupleIsValid(toasttup))
  			elog(ERROR, "failed to build TOAST tuple");
  
! 		heap_insert(toastrel, toasttup, mycid, use_wal, use_fsm, 
! 					InvalidBlockNumber);
  
  		/*
  		 * Create the index entry.	We cheat a little here by not using
Index: src/backend/access/index/indexam.c
===================================================================
RCS file: /home/hlinnaka/pgcvsrepository/pgsql/src/backend/access/index/indexam.c,v
retrieving revision 1.97
diff -c -r1.97 indexam.c
*** src/backend/access/index/indexam.c	5 Jan 2007 22:19:23 -0000	1.97
--- src/backend/access/index/indexam.c	19 May 2007 16:23:57 -0000
***************
*** 18,23 ****
--- 18,25 ----
   *		index_rescan	- restart a scan of an index
   *		index_endscan	- end a scan
   *		index_insert	- insert an index tuple into a relation
+  *		index_prepareinsert	- get desired insert location for a heap tuple
+  *		index_finishinsert	- insert a previously prepared index tuple
   *		index_markpos	- mark a scan position
   *		index_restrpos	- restore a scan position
   *		index_getnext	- get the next tuple from a scan
***************
*** 202,207 ****
--- 204,269 ----
  									  BoolGetDatum(check_uniqueness)));
  }
  
+ /* ----------------
+  *		index_prepareinsert - get desired insert location for a heap tuple
+  *
+  * The returned BlockNumber is the *heap* page that is the best place
+  * to insert the given tuple to, according to the index am. The best
+  * place is one that maintains the cluster order.
+  *
+  * opaque should be passed to a later index_finishinsert to finish the
+  * insert.
+  * ----------------
+  */
+ BlockNumber
+ index_prepareinsert(Relation indexRelation,
+ 					Datum *values,
+ 					bool *isnull,
+ 					Relation heapRelation,
+ 					bool check_uniqueness,
+ 					void **opaque)
+ {
+ 	FmgrInfo   *procedure;
+ 
+ 	RELATION_CHECKS;
+ 	GET_REL_PROCEDURE(amprepareinsert);
+ 
+ 	/*
+ 	 * have the am's prepareinsert proc do all the work.
+ 	 */
+ 	return DatumGetUInt32(FunctionCall6(procedure,
+ 										PointerGetDatum(indexRelation),
+ 										PointerGetDatum(values),
+ 										PointerGetDatum(isnull),
+ 										PointerGetDatum(heapRelation),
+ 										BoolGetDatum(check_uniqueness),
+ 										PointerGetDatum(opaque)));
+ }
+ 
+ /* ----------------
+  *		index_finishinsert - insert a previously prepared index tuple
+  *
+  * Finishes an insert operation initiated by an earlier call to
+  * index_prepareinsert. 
+  * ----------------
+  */
+ bool
+ index_finishinsert(Relation indexRelation,
+ 				   ItemPointer heap_t_ctid, void *opaque)
+ {
+ 	FmgrInfo   *procedure;
+ 
+ 	RELATION_CHECKS;
+ 	GET_REL_PROCEDURE(amfinishinsert);
+ 
+ 	/*
+ 	 * have the am's finishinsert proc do all the work.
+ 	 */
+ 	return DatumGetBool(FunctionCall2(procedure,
+ 									  PointerGetDatum(heap_t_ctid),
+ 									  PointerGetDatum(opaque)));
+ }
+ 
  /*
   * index_beginscan - start a scan of an index with amgettuple
   *
Index: src/backend/access/nbtree/nbtinsert.c
===================================================================
RCS file: /home/hlinnaka/pgcvsrepository/pgsql/src/backend/access/nbtree/nbtinsert.c,v
retrieving revision 1.156
diff -c -r1.156 nbtinsert.c
*** src/backend/access/nbtree/nbtinsert.c	11 Apr 2007 20:47:37 -0000	1.156
--- src/backend/access/nbtree/nbtinsert.c	19 May 2007 18:12:25 -0000
***************
*** 96,114 ****
  	/* we need an insertion scan key to do our search, so build one */
  	itup_scankey = _bt_mkscankey(rel, itup);
  
- top:
  	/* find the first page containing this key */
  	stack = _bt_search(rel, natts, itup_scankey, false, &buf, BT_WRITE);
  
  	offset = InvalidOffsetNumber;
  
! 	/* trade in our read lock for a write lock */
  	LockBuffer(buf, BUFFER_LOCK_UNLOCK);
  	LockBuffer(buf, BT_WRITE);
  
  	/*
  	 * If the page was split between the time that we surrendered our read
! 	 * lock and acquired our write lock, then this page may no longer be the
  	 * right place for the key we want to insert.  In this case, we need to
  	 * move right in the tree.	See Lehman and Yao for an excruciatingly
  	 * precise description.
--- 96,224 ----
  	/* we need an insertion scan key to do our search, so build one */
  	itup_scankey = _bt_mkscankey(rel, itup);
  
  	/* find the first page containing this key */
  	stack = _bt_search(rel, natts, itup_scankey, false, &buf, BT_WRITE);
  
  	offset = InvalidOffsetNumber;
  
! 	/* release our read lock. _bt_finishinsert will relock the page in
! 	 * exclusive mode. 
! 	 */
  	LockBuffer(buf, BUFFER_LOCK_UNLOCK);
+ 
+ 	_bt_finishinsert(rel, heapRel, index_is_unique, itup, 
+ 					 itup_scankey, stack, buf);
+ }
+ 
+ /*
+  *	_bt_prepareinsert() -- Find the insert location for a new tuple
+  *
+  * Descends the tree and finds the location for a new index tuple.
+  * As a hint to the executor, returns the heap block number the previous
+  * index tuple at that location points to. By inserting the heap tuple
+  * to that block, the heap will stay better clustered than by inserting
+  * to a random block.
+  *
+  * The leaf page is pinned and a reference to it, among other information
+  * needed to finish the insert, is stored in opaquePtr.
+  */
+ BlockNumber
+ _bt_prepareinsert(Relation rel, IndexTuple itup, bool index_is_unique, 
+ 				  Relation heapRel, BTInsertInfo *opaquePtr)
+ {
+ 	int			natts = rel->rd_rel->relnatts;
+ 	OffsetNumber offset;
+ 	Page		page;
+ 	BTPageOpaque opaque;
+ 
+ 	ScanKey		itup_scankey;
+ 	BTStack		stack;
+ 	Buffer		buf;
+ 	BlockNumber suggestion = InvalidBlockNumber;
+ 	BTInsertInfo insert_opaque;
+ 
+ 	/* we need an insertion scan key to do our search, so build one */
+ 	itup_scankey = _bt_mkscankey(rel, itup);
+ 
+ 	/* find the first page containing this key */
+ 	stack = _bt_search(rel, natts, itup_scankey, false, &buf, BT_READ);
+ 	if(!BufferIsValid(buf))
+ 	{
+ 		/* The index was completely empty. No suggestion then. */
+ 		*opaquePtr = NULL;
+ 		return InvalidBlockNumber;
+ 	}
+ 
+ 	page = BufferGetPage(buf);
+ 	opaque = (BTPageOpaque) PageGetSpecialPointer(page);
+ 
+ 	/* Find the location in the page where the new index tuple would go to. */
+ 
+ 	offset = _bt_binsrch(rel, buf, natts, itup_scankey, false);
+ 	if (offset > PageGetMaxOffsetNumber(page))
+ 	{
+ 		/* _bt_binsrch returned a pointer to end-of-page. It means that
+ 		 * there were no equal items on the page, and the new item should
+ 		 * be inserted as the last tuple of the page. There could be equal
+ 		 * items on the next page, however.
+ 		 *
+ 		 * At the moment, we just ignore the potential equal items on the
+ 		 * right, and pretend there aren't any. We could instead walk right
+ 		 * to the next page to check that, but let's keep it simple for now.
+ 		 */
+ 		offset = OffsetNumberPrev(offset);
+ 	}
+ 	if(offset < P_FIRSTDATAKEY(opaque))
+ 	{
+ 		/* We landed on an empty page. We could step left or right until
+ 		 * we find some items, but let's keep it simple for now. 
+ 		 */
+ 	} else {
+ 		/* We're now positioned at the index tuple that we're interested in. */
+ 		ItemId iid = PageGetItemId(page, offset);
+ 		IndexTuple curitup = (IndexTuple) PageGetItem(page, iid);
+ 
+ 		suggestion = ItemPointerGetBlockNumber(&curitup->t_tid);
+ 	}
+ 
+ 	/* Release the read lock. _bt_finishinsert will later reacquire it in 
+ 	 * exclusive mode. Keeping the buffer locked would be deadlock-prone
+ 	 * as well; who knows what the caller is going to do, and what pages to
+ 	 * lock, before calling finishinsert.
+ 	 */
+ 	LockBuffer(buf, BUFFER_LOCK_UNLOCK);
+ 
+ 	/* Return a struct with all information needed to finish this insert. */
+ 	insert_opaque = *opaquePtr = palloc(sizeof(struct BTInsertInfoData));
+ 	insert_opaque->rel = rel;
+ 	insert_opaque->heapRel = heapRel;
+ 	insert_opaque->index_is_unique = index_is_unique;
+ 	insert_opaque->itup = itup;
+ 	insert_opaque->itup_scankey = itup_scankey;
+ 	insert_opaque->stack = stack;
+ 	insert_opaque->buf = buf;
+ 
+ 	return suggestion;
+ }
+ 
+ /*
+  *	_bt_finishinsert() -- Finish an insert prepared with prepareinsert
+  */
+ void
+ _bt_finishinsert(Relation rel, Relation heapRel, bool index_is_unique,
+ 				 IndexTuple itup, ScanKey itup_scankey,
+ 				 BTStack stack, Buffer buf)
+ {
+ 	int			natts = rel->rd_rel->relnatts;
+ 	OffsetNumber offset = InvalidOffsetNumber;
+ 
  	LockBuffer(buf, BT_WRITE);
  
+ top:
+ 
  	/*
  	 * If the page was split between the time that we surrendered our read
! 	 * lock in _bt_prepareinsert or _bt_doinsert, and acquired our write lock, then this page may no longer be the
  	 * right place for the key we want to insert.  In this case, we need to
  	 * move right in the tree.	See Lehman and Yao for an excruciatingly
  	 * precise description.
***************
*** 146,151 ****
--- 256,269 ----
  			XactLockTableWait(xwait);
  			/* start over... */
  			_bt_freestack(stack);
+ 
+ 			/* find the first page containing this key */
+ 			stack = _bt_search(rel, natts, itup_scankey, false, &buf, BT_WRITE);
+ 
+ 			/* trade in our read lock for a write lock */
+ 			LockBuffer(buf, BUFFER_LOCK_UNLOCK);
+ 			LockBuffer(buf, BT_WRITE);
+ 
  			goto top;
  		}
  	}
***************
*** 157,162 ****
--- 275,281 ----
  	/* be tidy */
  	_bt_freestack(stack);
  	_bt_freeskey(itup_scankey);
+ 	pfree(itup);
  }
  
  /*
Index: src/backend/access/nbtree/nbtree.c
===================================================================
RCS file: /home/hlinnaka/pgcvsrepository/pgsql/src/backend/access/nbtree/nbtree.c,v
retrieving revision 1.154
diff -c -r1.154 nbtree.c
*** src/backend/access/nbtree/nbtree.c	5 Jan 2007 22:19:23 -0000	1.154
--- src/backend/access/nbtree/nbtree.c	19 May 2007 16:23:58 -0000
***************
*** 223,229 ****
  
  	_bt_doinsert(rel, itup, checkUnique, heapRel);
  
! 	pfree(itup);
  
  	PG_RETURN_BOOL(true);
  }
--- 223,278 ----
  
  	_bt_doinsert(rel, itup, checkUnique, heapRel);
  
! 	PG_RETURN_BOOL(true);
! }
! 
! /*
!  *	btprepareinsert() -- find the best place in the heap to put a new tuple.
!  *
!  *		This uses the same logic as btinsert to find the place where the index
!  *		tuple would go if this was a btinsert call.
!  */
! Datum
! btprepareinsert(PG_FUNCTION_ARGS)
! {
! 	Relation	rel = (Relation) PG_GETARG_POINTER(0);
! 	Datum	   *values = (Datum *) PG_GETARG_POINTER(1);
! 	bool	   *isnull = (bool *) PG_GETARG_POINTER(2);
! 	Relation	heapRel = (Relation) PG_GETARG_POINTER(3);
! 	bool		checkUnique = PG_GETARG_BOOL(4);
! 	void	  **opaquePtr = (void **) PG_GETARG_POINTER(5);
! 	IndexTuple	itup;
! 	BlockNumber suggestion;
! 
! 	/* generate an index tuple */
! 	itup = index_form_tuple(RelationGetDescr(rel), values, isnull);
! 
! 	suggestion =_bt_prepareinsert(rel, itup, checkUnique, heapRel,  
! 								  (BTInsertInfo *) opaquePtr);
! 
! 	PG_RETURN_UINT32(suggestion);
! }
! 
! /*
!  *	btfinishinsert() -- finish insert
!  */
! Datum
! btfinishinsert(PG_FUNCTION_ARGS)
! {
! 	ItemPointer ht_ctid = (ItemPointer) PG_GETARG_POINTER(0);
! 	BTInsertInfo opaque = (void *) PG_GETARG_POINTER(1);
! 
! 	opaque->itup->t_tid = *ht_ctid;
! 
! 	_bt_finishinsert(opaque->rel,
! 					 opaque->heapRel,
! 					 opaque->index_is_unique,
! 					 opaque->itup,
! 					 opaque->itup_scankey,
! 					 opaque->stack,
! 					 opaque->buf);
! 
! 	pfree(opaque);
  
  	PG_RETURN_BOOL(true);
  }
Index: src/backend/commands/copy.c
===================================================================
RCS file: /home/hlinnaka/pgcvsrepository/pgsql/src/backend/commands/copy.c,v
retrieving revision 1.283
diff -c -r1.283 copy.c
*** src/backend/commands/copy.c	27 Apr 2007 22:05:46 -0000	1.283
--- src/backend/commands/copy.c	19 May 2007 17:14:53 -0000
***************
*** 2109,2115 ****
  				ExecConstraints(resultRelInfo, slot, estate);
  
  			/* OK, store the tuple and create index entries for it */
! 			heap_insert(cstate->rel, tuple, mycid, use_wal, use_fsm);
  
  			if (resultRelInfo->ri_NumIndices > 0)
  				ExecInsertIndexTuples(slot, &(tuple->t_self), estate, false);
--- 2109,2116 ----
  				ExecConstraints(resultRelInfo, slot, estate);
  
  			/* OK, store the tuple and create index entries for it */
! 			heap_insert(cstate->rel, tuple, mycid, use_wal, use_fsm, 
! 						InvalidBlockNumber);
  
  			if (resultRelInfo->ri_NumIndices > 0)
  				ExecInsertIndexTuples(slot, &(tuple->t_self), estate, false);
Index: src/backend/executor/execMain.c
===================================================================
RCS file: /home/hlinnaka/pgcvsrepository/pgsql/src/backend/executor/execMain.c,v
retrieving revision 1.293
diff -c -r1.293 execMain.c
*** src/backend/executor/execMain.c	27 Apr 2007 22:05:47 -0000	1.293
--- src/backend/executor/execMain.c	19 May 2007 16:24:01 -0000
***************
*** 53,58 ****
--- 53,59 ----
  #include "utils/lsyscache.h"
  #include "utils/memutils.h"
  
+ bool cluster_inserts = true; /* GUC */
  
  typedef struct evalPlanQual
  {
***************
*** 869,876 ****
--- 870,879 ----
  	resultRelInfo->ri_RangeTableIndex = resultRelationIndex;
  	resultRelInfo->ri_RelationDesc = resultRelationDesc;
  	resultRelInfo->ri_NumIndices = 0;
+ 	resultRelInfo->ri_ClusterIndex = -1;
  	resultRelInfo->ri_IndexRelationDescs = NULL;
  	resultRelInfo->ri_IndexRelationInfo = NULL;
+ 	resultRelInfo->ri_PreparedInsertOpaque = NULL;
  	/* make a copy so as not to depend on relcache info not changing... */
  	resultRelInfo->ri_TrigDesc = CopyTriggerDesc(resultRelationDesc->trigdesc);
  	if (resultRelInfo->ri_TrigDesc)
***************
*** 1353,1358 ****
--- 1356,1362 ----
  	ResultRelInfo *resultRelInfo;
  	Relation	resultRelationDesc;
  	Oid			newId;
+ 	BlockNumber suggestedBlock;
  
  	/*
  	 * get the heap tuple out of the tuple table slot, making sure we have a
***************
*** 1401,1406 ****
--- 1405,1417 ----
  	if (resultRelationDesc->rd_att->constr)
  		ExecConstraints(resultRelInfo, slot, estate);
  
+ 	/* Ask the index AM of the clustered index for the
+ 	 * best heap block to put the new tuple in */
+ 	if(cluster_inserts)
+ 		suggestedBlock = ExecPrepareIndexInsert(slot, estate);
+ 	else
+ 		suggestedBlock = InvalidBlockNumber;
+ 
  	/*
  	 * insert the tuple
  	 *
***************
*** 1409,1415 ****
  	 */
  	newId = heap_insert(resultRelationDesc, tuple,
  						estate->es_snapshot->curcid,
! 						true, true);
  
  	IncrAppended();
  	(estate->es_processed)++;
--- 1420,1426 ----
  	 */
  	newId = heap_insert(resultRelationDesc, tuple,
  						estate->es_snapshot->curcid,
! 						true, true, suggestedBlock);
  
  	IncrAppended();
  	(estate->es_processed)++;
***************
*** 2600,2606 ****
  				tuple,
  				estate->es_snapshot->curcid,
  				estate->es_into_relation_use_wal,
! 				false);			/* never any point in using FSM */
  
  	/* We know this is a newly created relation, so there are no indexes */
  
--- 2611,2618 ----
  				tuple,
  				estate->es_snapshot->curcid,
  				estate->es_into_relation_use_wal,
! 				false, 			/* never any point in using FSM */
! 				InvalidBlockNumber);
  
  	/* We know this is a newly created relation, so there are no indexes */
  
Index: src/backend/executor/execUtils.c
===================================================================
RCS file: /home/hlinnaka/pgcvsrepository/pgsql/src/backend/executor/execUtils.c,v
retrieving revision 1.147
diff -c -r1.147 execUtils.c
*** src/backend/executor/execUtils.c	27 Feb 2007 01:11:25 -0000	1.147
--- src/backend/executor/execUtils.c	19 May 2007 18:22:33 -0000
***************
*** 31,36 ****
--- 31,37 ----
   *		ExecOpenIndices			\
   *		ExecCloseIndices		 | referenced by InitPlan, EndPlan,
   *		ExecInsertIndexTuples	/  ExecInsert, ExecUpdate
+  *		ExecPrepareIndexInsert	Referenced by ExecInsert
   *
   *		RegisterExprContextCallback    Register function shutdown callback
   *		UnregisterExprContextCallback  Deregister function shutdown callback
***************
*** 902,907 ****
--- 903,909 ----
  	IndexInfo **indexInfoArray;
  
  	resultRelInfo->ri_NumIndices = 0;
+ 	resultRelInfo->ri_ClusterIndex = -1;
  
  	/* fast path if no indexes */
  	if (!RelationGetForm(resultRelation)->relhasindex)
***************
*** 941,946 ****
--- 943,953 ----
  		/* extract index key information from the index's pg_index info */
  		ii = BuildIndexInfo(indexDesc);
  
+ 		/* Remember which index is the clustered one.
+ 		 * ExecPrepareIndexInsert uses it to call amprepareinsert on inserts */
+ 		if(indexDesc->rd_index->indisclustered)
+ 			resultRelInfo->ri_ClusterIndex = i;
+ 
  		relationDescs[i] = indexDesc;
  		indexInfoArray[i] = ii;
  		i++;
***************
*** 1007,1012 ****
--- 1014,1021 ----
  	ExprContext *econtext;
  	Datum		values[INDEX_MAX_KEYS];
  	bool		isnull[INDEX_MAX_KEYS];
+ 	int			clusterIndex;
+ 	bool		preparedInsert;
  
  	/*
  	 * Get information from the result relation info structure.
***************
*** 1016,1021 ****
--- 1025,1049 ----
  	relationDescs = resultRelInfo->ri_IndexRelationDescs;
  	indexInfoArray = resultRelInfo->ri_IndexRelationInfo;
  	heapRelation = resultRelInfo->ri_RelationDesc;
+ 	clusterIndex = resultRelInfo->ri_ClusterIndex;
+ 	preparedInsert = resultRelInfo->ri_PreparedInsertOpaque != NULL;
+ 
+ 	/* 
+ 	 * If the insert to the clustering index was already prepared,
+ 	 * finish it.
+ 	 */
+ 	if (preparedInsert)
+ 	{
+ 		index_finishinsert(relationDescs[clusterIndex],
+ 						   tupleid,
+ 						   resultRelInfo->ri_PreparedInsertOpaque);
+ 		resultRelInfo->ri_PreparedInsertOpaque = NULL;
+ 
+ 		/*
+ 		 * keep track of index inserts for debugging
+ 		 */
+ 		IncrIndexInserted();
+ 	}
  
  	/*
  	 * We will use the EState's per-tuple context for evaluating predicates
***************
*** 1036,1041 ****
--- 1064,1072 ----
  		if (relationDescs[i] == NULL)
  			continue;
  
+ 		if (preparedInsert && i == clusterIndex)
+ 			continue; /* insert to clustered index was already handled above */
+ 
  		indexInfo = indexInfoArray[i];
  
  		/* Check for partial index */
***************
*** 1090,1095 ****
--- 1121,1196 ----
  	}
  }
  
+ /* ----------------------------------------------------------------
+  *		ExecPrepareIndexInsert
+  *
+  *		This routine asks the index am where a new heap tuple
+  *		should be placed.
+  * ----------------------------------------------------------------
+  */
+ BlockNumber
+ ExecPrepareIndexInsert(TupleTableSlot *slot,
+ 					   EState *estate)
+ {
+ 	ResultRelInfo *resultRelInfo;
+ 	int			clusterIndex;
+ 	Relation	relationDesc;
+ 	Relation	heapRelation;
+ 	ExprContext *econtext;
+ 	Datum		values[INDEX_MAX_KEYS];
+ 	bool		isnull[INDEX_MAX_KEYS];
+ 	IndexInfo  *indexInfo;
+ 
+ 	/*
+ 	 * Get information from the result relation info structure.
+ 	 */
+ 	resultRelInfo = estate->es_result_relation_info;
+ 	clusterIndex = resultRelInfo->ri_ClusterIndex;
+ 
+ 	if (clusterIndex == -1)
+ 		return InvalidBlockNumber; /* there was no clustered index */
+ 
+ 	heapRelation = resultRelInfo->ri_RelationDesc;
+ 	relationDesc = resultRelInfo->ri_IndexRelationDescs[clusterIndex];
+ 	indexInfo = resultRelInfo->ri_IndexRelationInfo[clusterIndex];
+ 
+ 	if (!OidIsValid(relationDesc->rd_am->amprepareinsert))
+ 		return InvalidBlockNumber; /* the indexam doesn't support the
+ 									* two-phase insert API */
+ 
+ 	/* You can't cluster on a partial index */
+ 	Assert(indexInfo->ii_Predicate == NIL);
+ 
+ 	/*
+ 	 * We will use the EState's per-tuple context for evaluating 
+ 	 * index expressions (creating it if it's not already there).
+ 	 */
+ 	econtext = GetPerTupleExprContext(estate);
+ 
+ 	/* Arrange for econtext's scan tuple to be the tuple under test */
+ 	econtext->ecxt_scantuple = slot;
+ 
+ 	/*
+ 	 * FormIndexDatum fills in its values and isnull parameters with the
+ 	 * appropriate values for the column(s) of the index.
+ 	 */
+ 	FormIndexDatum(indexInfo,
+ 				   slot,
+ 				   estate,
+ 				   values,
+ 				   isnull);
+ 
+ 	/*
+ 	 * The index AM does the rest.
+ 	 */
+ 	return index_prepareinsert(relationDesc,	/* index relation */
+ 				 values,	/* array of index Datums */
+ 				 isnull,	/* null flags */
+ 				 heapRelation,
+ 				 relationDesc->rd_index->indisunique,
+ 				 &resultRelInfo->ri_PreparedInsertOpaque);
+ }
+ 
  /*
   * UpdateChangedParamSet
   *		Add changed parameters to a plan node's chgParam set
Index: src/backend/utils/misc/guc.c
===================================================================
RCS file: /home/hlinnaka/pgcvsrepository/pgsql/src/backend/utils/misc/guc.c,v
retrieving revision 1.391
diff -c -r1.391 guc.c
*** src/backend/utils/misc/guc.c	8 May 2007 16:33:51 -0000	1.391
--- src/backend/utils/misc/guc.c	19 May 2007 16:24:17 -0000
***************
*** 99,104 ****
--- 99,105 ----
  #define MS_PER_D (1000 * 60 * 60 * 24)
  
  /* XXX these should appear in other modules' header files */
+ extern bool cluster_inserts;
  extern bool Log_disconnections;
  extern int	CommitDelay;
  extern int	CommitSiblings;
***************
*** 427,432 ****
--- 428,441 ----
  static struct config_bool ConfigureNamesBool[] =
  {
  	{
+ 		{"cluster_inserts", PGC_USERSET, DEVELOPER_OPTIONS,
+ 			gettext_noop("Tries to maintain cluster order on inserts."),
+ 			NULL
+ 		},
+ 		&cluster_inserts,
+ 		true, NULL, NULL
+ 	},
+ 	{
  		{"enable_seqscan", PGC_USERSET, QUERY_TUNING_METHOD,
  			gettext_noop("Enables the planner's use of sequential-scan plans."),
  			NULL
Index: src/include/access/genam.h
===================================================================
RCS file: /home/hlinnaka/pgcvsrepository/pgsql/src/include/access/genam.h,v
retrieving revision 1.66
diff -c -r1.66 genam.h
*** src/include/access/genam.h	5 Jan 2007 22:19:50 -0000	1.66
--- src/include/access/genam.h	19 May 2007 16:24:26 -0000
***************
*** 93,98 ****
--- 93,106 ----
  			 ItemPointer heap_t_ctid,
  			 Relation heapRelation,
  			 bool check_uniqueness);
+ extern BlockNumber index_prepareinsert(Relation indexRelation,
+ 			 Datum *values, bool *isnull,
+ 			 Relation heapRelation,
+ 			 bool check_uniqueness,
+ 			 void **opauqe);
+ extern bool index_finishinsert(Relation indexRelation,
+ 			 ItemPointer heap_t_ctid,
+ 			 void *opaque);
  
  extern IndexScanDesc index_beginscan(Relation heapRelation,
  				Relation indexRelation,
Index: src/include/access/heapam.h
===================================================================
RCS file: /home/hlinnaka/pgcvsrepository/pgsql/src/include/access/heapam.h,v
retrieving revision 1.123
diff -c -r1.123 heapam.h
*** src/include/access/heapam.h	8 Apr 2007 01:26:33 -0000	1.123
--- src/include/access/heapam.h	19 May 2007 16:24:26 -0000
***************
*** 157,163 ****
  extern void setLastTid(const ItemPointer tid);
  
  extern Oid heap_insert(Relation relation, HeapTuple tup, CommandId cid,
! 			bool use_wal, bool use_fsm);
  extern HTSU_Result heap_delete(Relation relation, ItemPointer tid,
  			ItemPointer ctid, TransactionId *update_xmax,
  			CommandId cid, Snapshot crosscheck, bool wait);
--- 157,163 ----
  extern void setLastTid(const ItemPointer tid);
  
  extern Oid heap_insert(Relation relation, HeapTuple tup, CommandId cid,
! 			bool use_wal, bool use_fsm, BlockNumber suggestedblk);
  extern HTSU_Result heap_delete(Relation relation, ItemPointer tid,
  			ItemPointer ctid, TransactionId *update_xmax,
  			CommandId cid, Snapshot crosscheck, bool wait);
Index: src/include/access/nbtree.h
===================================================================
RCS file: /home/hlinnaka/pgcvsrepository/pgsql/src/include/access/nbtree.h,v
retrieving revision 1.113
diff -c -r1.113 nbtree.h
*** src/include/access/nbtree.h	11 Apr 2007 20:47:38 -0000	1.113
--- src/include/access/nbtree.h	19 May 2007 16:24:26 -0000
***************
*** 508,517 ****
--- 508,540 ----
  extern Datum btbulkdelete(PG_FUNCTION_ARGS);
  extern Datum btvacuumcleanup(PG_FUNCTION_ARGS);
  extern Datum btoptions(PG_FUNCTION_ARGS);
+ extern Datum btprepareinsert(PG_FUNCTION_ARGS);
+ extern Datum btfinishinsert(PG_FUNCTION_ARGS);
+ 
+ /* Filled in by _bt_prepareinsert */
+ typedef struct BTInsertInfoData
+ {
+ 	Relation rel;
+ 	Relation heapRel;
+ 	bool index_is_unique;
+ 	IndexTuple itup;
+ 	ScanKey itup_scankey;
+ 	Buffer buf; /* pinned, not locked */
+ 	BTStack stack;
+ } BTInsertInfoData;
+ 
+ typedef BTInsertInfoData *BTInsertInfo;
  
  /*
   * prototypes for functions in nbtinsert.c
   */
+ extern BlockNumber _bt_prepareinsert(Relation rel, IndexTuple itup,
+ 									 bool index_is_unique, Relation heapRel,
+ 									 BTInsertInfo *opaquePtr);
+ extern void _bt_finishinsert(Relation rel, Relation heapRel, 
+ 							 bool check_uniqueness,
+ 							 IndexTuple itup, ScanKey itup_scankey,
+ 							 BTStack stack, Buffer buf);
  extern void _bt_doinsert(Relation rel, IndexTuple itup,
  			 bool index_is_unique, Relation heapRel);
  extern Buffer _bt_getstackbuf(Relation rel, BTStack stack, int access);
Index: src/include/catalog/pg_am.h
===================================================================
RCS file: /home/hlinnaka/pgcvsrepository/pgsql/src/include/catalog/pg_am.h,v
retrieving revision 1.51
diff -c -r1.51 pg_am.h
*** src/include/catalog/pg_am.h	6 Apr 2007 22:33:43 -0000	1.51
--- src/include/catalog/pg_am.h	19 May 2007 16:42:48 -0000
***************
*** 66,71 ****
--- 66,73 ----
  	regproc		amvacuumcleanup;	/* post-VACUUM cleanup function */
  	regproc		amcostestimate; /* estimate cost of an indexscan */
  	regproc		amoptions;		/* parse AM-specific parameters */
+ 	regproc		amprepareinsert;	/* get desired insert location on heap */
+ 	regproc		amfinishinsert;	/* finish a prepared insert operation */
  } FormData_pg_am;
  
  /* ----------------
***************
*** 79,85 ****
   *		compiler constants for pg_am
   * ----------------
   */
! #define Natts_pg_am						24
  #define Anum_pg_am_amname				1
  #define Anum_pg_am_amstrategies			2
  #define Anum_pg_am_amsupport			3
--- 81,87 ----
   *		compiler constants for pg_am
   * ----------------
   */
! #define Natts_pg_am						26
  #define Anum_pg_am_amname				1
  #define Anum_pg_am_amstrategies			2
  #define Anum_pg_am_amsupport			3
***************
*** 104,125 ****
  #define Anum_pg_am_amvacuumcleanup		22
  #define Anum_pg_am_amcostestimate		23
  #define Anum_pg_am_amoptions			24
  
  /* ----------------
   *		initial contents of pg_am
   * ----------------
   */
  
! DATA(insert OID = 403 (  btree	5 1 t t t t t t f t btinsert btbeginscan btgettuple btgetmulti btrescan btendscan btmarkpos btrestrpos btbuild btbulkdelete btvacuumcleanup btcostestimate btoptions ));
  DESCR("b-tree index access method");
  #define BTREE_AM_OID 403
! DATA(insert OID = 405 (  hash	1 1 f f f f f f f f hashinsert hashbeginscan hashgettuple hashgetmulti hashrescan hashendscan hashmarkpos hashrestrpos hashbuild hashbulkdelete hashvacuumcleanup hashcostestimate hashoptions ));
  DESCR("hash index access method");
  #define HASH_AM_OID 405
! DATA(insert OID = 783 (  gist	0 7 f f t t t t t t gistinsert gistbeginscan gistgettuple gistgetmulti gistrescan gistendscan gistmarkpos gistrestrpos gistbuild gistbulkdelete gistvacuumcleanup gistcostestimate gistoptions ));
  DESCR("GiST index access method");
  #define GIST_AM_OID 783
! DATA(insert OID = 2742 (  gin	0 4 f f f f f f t f gininsert ginbeginscan gingettuple gingetmulti ginrescan ginendscan ginmarkpos ginrestrpos ginbuild ginbulkdelete ginvacuumcleanup gincostestimate ginoptions ));
  DESCR("GIN index access method");
  #define GIN_AM_OID 2742
  
--- 106,129 ----
  #define Anum_pg_am_amvacuumcleanup		22
  #define Anum_pg_am_amcostestimate		23
  #define Anum_pg_am_amoptions			24
+ #define Anum_pg_am_amprepareinsert		25
+ #define Anum_pg_am_amfinishinsert		26
  
  /* ----------------
   *		initial contents of pg_am
   * ----------------
   */
  
! DATA(insert OID = 403 (  btree	5 1 t t t t t t f t btinsert btbeginscan btgettuple btgetmulti btrescan btendscan btmarkpos btrestrpos btbuild btbulkdelete btvacuumcleanup btcostestimate btoptions btprepareinsert btfinishinsert));
  DESCR("b-tree index access method");
  #define BTREE_AM_OID 403
! DATA(insert OID = 405 (  hash	1 1 f f f f f f f f hashinsert hashbeginscan hashgettuple hashgetmulti hashrescan hashendscan hashmarkpos hashrestrpos hashbuild hashbulkdelete hashvacuumcleanup hashcostestimate hashoptions - -));
  DESCR("hash index access method");
  #define HASH_AM_OID 405
! DATA(insert OID = 783 (  gist	0 7 f f t t t t t t gistinsert gistbeginscan gistgettuple gistgetmulti gistrescan gistendscan gistmarkpos gistrestrpos gistbuild gistbulkdelete gistvacuumcleanup gistcostestimate gistoptions - -));
  DESCR("GiST index access method");
  #define GIST_AM_OID 783
! DATA(insert OID = 2742 (  gin	0 4 f f f f f f t f gininsert ginbeginscan gingettuple gingetmulti ginrescan ginendscan ginmarkpos ginrestrpos ginbuild ginbulkdelete ginvacuumcleanup gincostestimate ginoptions - -));
  DESCR("GIN index access method");
  #define GIN_AM_OID 2742
  
Index: src/include/catalog/pg_proc.h
===================================================================
RCS file: /home/hlinnaka/pgcvsrepository/pgsql/src/include/catalog/pg_proc.h,v
retrieving revision 1.455
diff -c -r1.455 pg_proc.h
*** src/include/catalog/pg_proc.h	8 May 2007 18:56:48 -0000	1.455
--- src/include/catalog/pg_proc.h	19 May 2007 17:20:23 -0000
***************
*** 688,693 ****
--- 688,697 ----
  DESCR("btree(internal)");
  DATA(insert OID = 2785 (  btoptions		   PGNSP PGUID 12 1 0 f f t f s 2 17 "1009 16" _null_ _null_ _null_  btoptions - _null_ ));
  DESCR("btree(internal)");
+ DATA(insert OID = 5433 (  btprepareinsert   PGNSP PGUID 12 1 0 f f t f v 6 23 "2281 2281 2281 2281 2281 2281" _null_ _null_ _null_	btprepareinsert - _null_ ));
+ DESCR("btree(internal)");
+ DATA(insert OID = 5430 (  btfinishinsert   PGNSP PGUID 12 1 0 f f t f v 2 16 "2281 2281" _null_ _null_ _null_	btfinishinsert - _null_ ));
+ DESCR("btree(internal)");
  
  DATA(insert OID = 339 (  poly_same		   PGNSP PGUID 12 1 0 f f t f i 2 16 "604 604" _null_ _null_ _null_ poly_same - _null_ ));
  DESCR("same as?");
Index: src/include/executor/executor.h
===================================================================
RCS file: /home/hlinnaka/pgcvsrepository/pgsql/src/include/executor/executor.h,v
retrieving revision 1.139
diff -c -r1.139 executor.h
*** src/include/executor/executor.h	27 Feb 2007 01:11:25 -0000	1.139
--- src/include/executor/executor.h	19 May 2007 16:24:27 -0000
***************
*** 276,281 ****
--- 276,282 ----
  extern void ExecCloseIndices(ResultRelInfo *resultRelInfo);
  extern void ExecInsertIndexTuples(TupleTableSlot *slot, ItemPointer tupleid,
  					  EState *estate, bool is_vacuum);
+ extern BlockNumber ExecPrepareIndexInsert(TupleTableSlot *slot, EState *estate);
  
  extern void RegisterExprContextCallback(ExprContext *econtext,
  							ExprContextCallbackFunction function,
Index: src/include/nodes/execnodes.h
===================================================================
RCS file: /home/hlinnaka/pgcvsrepository/pgsql/src/include/nodes/execnodes.h,v
retrieving revision 1.174
diff -c -r1.174 execnodes.h
*** src/include/nodes/execnodes.h	17 May 2007 19:35:08 -0000	1.174
--- src/include/nodes/execnodes.h	19 May 2007 16:24:27 -0000
***************
*** 264,269 ****
--- 264,271 ----
   *		NumIndices				# of indices existing on result relation
   *		IndexRelationDescs		array of relation descriptors for indices
   *		IndexRelationInfo		array of key/attr info for indices
+  *		ClusterIndex			index into the IndexRelationInfo array of the
+  *								clustered index, or -1 if there is none
   *		TrigDesc				triggers to be fired, if any
   *		TrigFunctions			cached lookup info for trigger functions
   *		TrigInstrument			optional runtime measurements for triggers
***************
*** 280,291 ****
--- 282,296 ----
  	int			ri_NumIndices;
  	RelationPtr ri_IndexRelationDescs;
  	IndexInfo **ri_IndexRelationInfo;
+ 	int         ri_ClusterIndex;
  	TriggerDesc *ri_TrigDesc;
  	FmgrInfo   *ri_TrigFunctions;
  	struct Instrumentation *ri_TrigInstrument;
  	List	  **ri_ConstraintExprs;
  	JunkFilter *ri_junkFilter;
  	ProjectionInfo *ri_projectReturning;
+ 
+ 	void		*ri_PreparedInsertOpaque;
  } ResultRelInfo;
  
  /* ----------------
Index: src/include/utils/rel.h
===================================================================
RCS file: /home/hlinnaka/pgcvsrepository/pgsql/src/include/utils/rel.h,v
retrieving revision 1.100
diff -c -r1.100 rel.h
*** src/include/utils/rel.h	29 Mar 2007 00:15:39 -0000	1.100
--- src/include/utils/rel.h	19 May 2007 16:24:29 -0000
***************
*** 117,122 ****
--- 117,124 ----
  	FmgrInfo	amvacuumcleanup;
  	FmgrInfo	amcostestimate;
  	FmgrInfo	amoptions;
+ 	FmgrInfo	amprepareinsert;
+ 	FmgrInfo	amfinishinsert;
  } RelationAmInfo;
  
  
---------------------------(end of broadcast)---------------------------
TIP 1: if posting/reading through Usenet, please send an appropriate
       subscribe-nomail command to [EMAIL PROTECTED] so that your
       message can get through to the mailing list cleanly

Reply via email to