Jaime Casanova wrote:
On 5/18/07, Heikki Linnakangas <[EMAIL PROTECTED]> wrote:
Jaime Casanova wrote:
> the patch doesn't apply in cvs... you'll need to update it...

Oh, here you are.

The implementation has changed a bit since August. I thought I had
submitted an updated version in the winter but couldn't find it. Anyway,
I updated and dusted off the source tree, tidied up the comments a
little bit, and fixed some inconsistencies in pg_proc entries that made
opr_sanity fail.

this one doesn't apply either... there are problems with nbtinsert.c and pg_am.h

Ah, sorry about that. For some reason my source tree was checked out from the 8.2 branch, instead of CVS HEAD.

Here you are. Thanks for looking at this!

  Heikki Linnakangas
  EnterpriseDB   http://www.enterprisedb.com
Index: doc/src/sgml/catalogs.sgml
RCS file: /home/hlinnaka/pgcvsrepository/pgsql/doc/src/sgml/catalogs.sgml,v
retrieving revision 2.152
diff -c -r2.152 catalogs.sgml
*** doc/src/sgml/catalogs.sgml	15 May 2007 19:13:54 -0000	2.152
--- doc/src/sgml/catalogs.sgml	19 May 2007 16:23:49 -0000
*** 517,522 ****
--- 517,536 ----
        <entry>Function to parse and validate <structfield>reloptions</> for an index</entry>
+      <row>
+       <entry><structfield>amprepareinsert</structfield></entry>
+       <entry><type>regproc</type></entry>
+       <entry><literal><link linkend="catalog-pg-proc"><structname>pg_proc</structname></link>.oid</literal></entry>
+       <entry>Performs the first phase of a two-phase index insert, returning a suggestion of where in the heap to put a new tuple</entry>
+      </row>
+      <row>
+       <entry><structfield>amfinishinsert</structfield></entry>
+       <entry><type>regproc</type></entry>
+       <entry><literal><link linkend="catalog-pg-proc"><structname>pg_proc</structname></link>.oid</literal></entry>
+       <entry>Finishes an index insert started with amprepareinsert</entry>
+      </row>
Index: src/backend/access/heap/heapam.c
RCS file: /home/hlinnaka/pgcvsrepository/pgsql/src/backend/access/heap/heapam.c,v
retrieving revision 1.232
diff -c -r1.232 heapam.c
*** src/backend/access/heap/heapam.c	8 Apr 2007 01:26:27 -0000	1.232
--- src/backend/access/heap/heapam.c	19 May 2007 16:45:14 -0000
*** 1368,1373 ****
--- 1368,1377 ----
   * Note that use_wal and use_fsm will be applied when inserting into the
   * heap's TOAST table, too, if the tuple requires any out-of-line data.
+  * If suggested_blk is a valid block number, the tuple will be inserted to
+  * that block if there's enough room. If it's full, a block will be chosen
+  * as if suggested_blk was not set.
+  *
   * The return value is the OID assigned to the tuple (either here or by the
   * caller), or InvalidOid if no OID.  The header fields of *tup are updated
   * to match the stored tuple; in particular tup->t_self receives the actual
*** 1376,1382 ****
  heap_insert(Relation relation, HeapTuple tup, CommandId cid,
! 			bool use_wal, bool use_fsm)
  	TransactionId xid = GetCurrentTransactionId();
  	HeapTuple	heaptup;
--- 1380,1386 ----
  heap_insert(Relation relation, HeapTuple tup, CommandId cid,
! 			bool use_wal, bool use_fsm, BlockNumber suggested_blk)
  	TransactionId xid = GetCurrentTransactionId();
  	HeapTuple	heaptup;
*** 1432,1440 ****
  		heaptup = tup;
! 	/* Find buffer to insert this tuple into */
! 	buffer = RelationGetBufferForTuple(relation, heaptup->t_len,
! 									   InvalidBuffer, use_fsm);
  	/* NO EREPORT(ERROR) from here till changes are logged */
--- 1436,1478 ----
  		heaptup = tup;
! 	/* Find buffer to insert this tuple into. Try the suggested block first
! 	 * if caller gave one.
! 	 */
! 	if (suggested_blk != InvalidBlockNumber)
! 	{
! 		Buffer suggested_buf;
! 		Page pageHeader;
! 		Size pageFreeSpace;
! 		suggested_buf = ReadBuffer(relation, suggested_blk);
! 		pageHeader = (Page) BufferGetPage(suggested_buf);
! 		LockBuffer(suggested_buf, BUFFER_LOCK_EXCLUSIVE);
! 		/* Don't subtract fillfactor from the free space. That space is
! 		 * reserved exactly for situations like this; keeping updated and
! 		 * inserted tuples close to other tuples with similar values.
! 		 *
! 		 * Compare the MAXALIGN'd tuple length, not the raw length: the
! 		 * tuple occupies its aligned size on the page, so a tuple whose
! 		 * raw length just fits could otherwise be accepted here and then
! 		 * fail to be added to the page. RelationGetBufferForTuple is
! 		 * conservative in the same way.
! 		 */
! 		pageFreeSpace = PageGetFreeSpace(pageHeader);
! 		if (MAXALIGN(heaptup->t_len) <= pageFreeSpace)
! 			buffer = suggested_buf;
! 		else
! 		{
! 			/* Page was full. Release lock and pin and get another block
! 			 * as if suggested_blk was not given.
! 			 */
! 			LockBuffer(suggested_buf, BUFFER_LOCK_UNLOCK);
! 			ReleaseBuffer(suggested_buf);
! 			buffer = RelationGetBufferForTuple(relation, heaptup->t_len,
! 											   InvalidBuffer, use_fsm);
! 		}
! 	}
! 	else
! 		buffer = RelationGetBufferForTuple(relation, heaptup->t_len,
! 										   InvalidBuffer, use_fsm);
  	/* NO EREPORT(ERROR) from here till changes are logged */
*** 1544,1550 ****
  simple_heap_insert(Relation relation, HeapTuple tup)
! 	return heap_insert(relation, tup, GetCurrentCommandId(), true, true);
--- 1582,1589 ----
  simple_heap_insert(Relation relation, HeapTuple tup)
! 	return heap_insert(relation, tup, GetCurrentCommandId(), true, 
! 					   true, InvalidBlockNumber);
Index: src/backend/access/heap/tuptoaster.c
RCS file: /home/hlinnaka/pgcvsrepository/pgsql/src/backend/access/heap/tuptoaster.c,v
retrieving revision 1.74
diff -c -r1.74 tuptoaster.c
*** src/backend/access/heap/tuptoaster.c	6 Apr 2007 04:21:41 -0000	1.74
--- src/backend/access/heap/tuptoaster.c	19 May 2007 16:45:39 -0000
*** 1146,1152 ****
  		if (!HeapTupleIsValid(toasttup))
  			elog(ERROR, "failed to build TOAST tuple");
! 		heap_insert(toastrel, toasttup, mycid, use_wal, use_fsm);
  		 * Create the index entry.	We cheat a little here by not using
--- 1146,1153 ----
  		if (!HeapTupleIsValid(toasttup))
  			elog(ERROR, "failed to build TOAST tuple");
! 		heap_insert(toastrel, toasttup, mycid, use_wal, use_fsm, 
! 					InvalidBlockNumber);
  		 * Create the index entry.	We cheat a little here by not using
Index: src/backend/access/index/indexam.c
RCS file: /home/hlinnaka/pgcvsrepository/pgsql/src/backend/access/index/indexam.c,v
retrieving revision 1.97
diff -c -r1.97 indexam.c
*** src/backend/access/index/indexam.c	5 Jan 2007 22:19:23 -0000	1.97
--- src/backend/access/index/indexam.c	19 May 2007 16:23:57 -0000
*** 18,23 ****
--- 18,25 ----
   *		index_rescan	- restart a scan of an index
   *		index_endscan	- end a scan
   *		index_insert	- insert an index tuple into a relation
+  *		index_prepareinsert	- get desired insert location for a heap tuple
+  *		index_finishinsert	- insert a previously prepared index tuple
   *		index_markpos	- mark a scan position
   *		index_restrpos	- restore a scan position
   *		index_getnext	- get the next tuple from a scan
*** 202,207 ****
--- 204,269 ----
+ /* ----------------
+  *		index_prepareinsert - get desired insert location for a heap tuple
+  *
+  * The returned BlockNumber is the *heap* page that is the best place
+  * to insert the given tuple to, according to the index am. The best
+  * place is one that maintains the cluster order. The am may return
+  * InvalidBlockNumber if it has no suggestion.
+  *
+  * values and isnull are the index column Datums and null flags for the
+  * new tuple. They, heapRelation and check_uniqueness are passed
+  * unchanged to the am's amprepareinsert procedure.
+  *
+  * opaque should be passed to a later index_finishinsert to finish the
+  * insert. The am stores its own state through this pointer; it may
+  * leave *opaque NULL, in which case no insert has been prepared and
+  * the caller should fall back to a regular index insert.
+  *
+  * NOTE(review): callers are expected to verify that the am actually
+  * provides amprepareinsert before calling this; confirm how
+  * GET_REL_PROCEDURE behaves when it does not.
+  * ----------------
+  */
+ BlockNumber
+ index_prepareinsert(Relation indexRelation,
+ 					Datum *values,
+ 					bool *isnull,
+ 					Relation heapRelation,
+ 					bool check_uniqueness,
+ 					void **opaque)
+ {
+ 	FmgrInfo   *procedure;
+ 	GET_REL_PROCEDURE(amprepareinsert);
+ 	/*
+ 	 * have the am's prepareinsert proc do all the work. The argument
+ 	 * order here must match what the am's procedure reads.
+ 	 */
+ 	return DatumGetUInt32(FunctionCall6(procedure,
+ 										PointerGetDatum(indexRelation),
+ 										PointerGetDatum(values),
+ 										PointerGetDatum(isnull),
+ 										PointerGetDatum(heapRelation),
+ 										BoolGetDatum(check_uniqueness),
+ 										PointerGetDatum(opaque)));
+ }
+ /* ----------------
+  *		index_finishinsert - insert a previously prepared index tuple
+  *
+  * Finishes an insert operation initiated by an earlier call to
+  * index_prepareinsert.
+  *
+  * heap_t_ctid is the location of the heap tuple the new index entry
+  * should point to. opaque is the state that index_prepareinsert
+  * returned through its opaque argument; it must not be NULL.
+  *
+  * Only heap_t_ctid and opaque are handed to the am's amfinishinsert
+  * procedure; indexRelation itself is not passed on, so the am must
+  * find everything else it needs in opaque.
+  *
+  * Returns the boolean result of the am's procedure.
+  * ----------------
+  */
+ bool
+ index_finishinsert(Relation indexRelation,
+ 				   ItemPointer heap_t_ctid, void *opaque)
+ {
+ 	FmgrInfo   *procedure;
+ 	GET_REL_PROCEDURE(amfinishinsert);
+ 	/*
+ 	 * have the am's finishinsert proc do all the work.
+ 	 */
+ 	return DatumGetBool(FunctionCall2(procedure,
+ 									  PointerGetDatum(heap_t_ctid),
+ 									  PointerGetDatum(opaque)));
+ }
   * index_beginscan - start a scan of an index with amgettuple
Index: src/backend/access/nbtree/nbtinsert.c
RCS file: /home/hlinnaka/pgcvsrepository/pgsql/src/backend/access/nbtree/nbtinsert.c,v
retrieving revision 1.156
diff -c -r1.156 nbtinsert.c
*** src/backend/access/nbtree/nbtinsert.c	11 Apr 2007 20:47:37 -0000	1.156
--- src/backend/access/nbtree/nbtinsert.c	19 May 2007 18:12:25 -0000
*** 96,114 ****
  	/* we need an insertion scan key to do our search, so build one */
  	itup_scankey = _bt_mkscankey(rel, itup);
- top:
  	/* find the first page containing this key */
  	stack = _bt_search(rel, natts, itup_scankey, false, &buf, BT_WRITE);
  	offset = InvalidOffsetNumber;
! 	/* trade in our read lock for a write lock */
  	LockBuffer(buf, BUFFER_LOCK_UNLOCK);
  	LockBuffer(buf, BT_WRITE);
  	 * If the page was split between the time that we surrendered our read
! 	 * lock and acquired our write lock, then this page may no longer be the
  	 * right place for the key we want to insert.  In this case, we need to
  	 * move right in the tree.	See Lehman and Yao for an excruciatingly
  	 * precise description.
--- 96,224 ----
  	/* we need an insertion scan key to do our search, so build one */
  	itup_scankey = _bt_mkscankey(rel, itup);
  	/* find the first page containing this key */
  	stack = _bt_search(rel, natts, itup_scankey, false, &buf, BT_WRITE);
  	offset = InvalidOffsetNumber;
! 	/* release our read lock. _bt_finishinsert will relock the page in
! 	 * exclusive mode. 
! 	 */
  	LockBuffer(buf, BUFFER_LOCK_UNLOCK);
+ 	_bt_finishinsert(rel, heapRel, index_is_unique, itup, 
+ 					 itup_scankey, stack, buf);
+ }
+ /*
+  *	_bt_prepareinsert() -- Find the insert location for a new tuple
+  *
+  * Descends the tree and finds the location for a new index tuple.
+  * As a hint to the executor, returns the heap block number the previous
+  * index tuple at that location points to. By inserting the heap tuple
+  * to that block, the heap will stay better clustered than by inserting
+  * to a random block.
+  *
+  * The leaf page is pinned and a reference to it, among other information
+  * needed to finish the insert, is stored in opaquePtr.
+  *
+  * Returns InvalidBlockNumber when there is no suggestion. That happens
+  * in two cases: the index is completely empty, in which case *opaquePtr
+  * is set to NULL and nothing has been prepared; or the search landed on
+  * a leaf page without data items, in which case the insert is still
+  * prepared and *opaquePtr is filled in as usual.
+  *
+  * itup is stored by reference in the returned struct, so it must stay
+  * valid until the insert is finished.
+  */
+ BlockNumber
+ _bt_prepareinsert(Relation rel, IndexTuple itup, bool index_is_unique, 
+ 				  Relation heapRel, BTInsertInfo *opaquePtr)
+ {
+ 	int			natts = rel->rd_rel->relnatts;
+ 	OffsetNumber offset;
+ 	Page		page;
+ 	BTPageOpaque opaque;
+ 	ScanKey		itup_scankey;
+ 	BTStack		stack;
+ 	Buffer		buf;
+ 	BlockNumber suggestion = InvalidBlockNumber;
+ 	BTInsertInfo insert_opaque;
+ 	/* we need an insertion scan key to do our search, so build one */
+ 	itup_scankey = _bt_mkscankey(rel, itup);
+ 	/* find the first page containing this key */
+ 	stack = _bt_search(rel, natts, itup_scankey, false, &buf, BT_READ);
+ 	if(!BufferIsValid(buf))
+ 	{
+ 		/* The index was completely empty. No suggestion then, and no
+ 		 * prepared insert either: the caller sees a NULL opaque.
+ 		 */
+ 		*opaquePtr = NULL;
+ 		return InvalidBlockNumber;
+ 	}
+ 	page = BufferGetPage(buf);
+ 	opaque = (BTPageOpaque) PageGetSpecialPointer(page);
+ 	/* Find the location in the page where the new index tuple would go to. */
+ 	offset = _bt_binsrch(rel, buf, natts, itup_scankey, false);
+ 	if (offset > PageGetMaxOffsetNumber(page))
+ 	{
+ 		/* _bt_binsrch returned a pointer to end-of-page. It means that
+ 		 * there were no equal items on the page, and the new item should
+ 		 * be inserted as the last tuple of the page. There could be equal
+ 		 * items on the next page, however.
+ 		 *
+ 		 * At the moment, we just ignore the potential equal items on the
+ 		 * right, and pretend there aren't any. We could instead walk right
+ 		 * to the next page to check that, but let's keep it simple for now.
+ 		 * So step back to the last item on this page and use that.
+ 		 */
+ 		offset = OffsetNumberPrev(offset);
+ 	}
+ 	if(offset < P_FIRSTDATAKEY(opaque))
+ 	{
+ 		/* We landed on an empty page. We could step left or right until
+ 		 * we find some items, but let's keep it simple for now and leave
+ 		 * suggestion at InvalidBlockNumber.
+ 		 */
+ 	} else {
+ 		/* We're now positioned at the index tuple that we're interested in. */
+ 		ItemId iid = PageGetItemId(page, offset);
+ 		IndexTuple curitup = (IndexTuple) PageGetItem(page, iid);
+ 		suggestion = ItemPointerGetBlockNumber(&curitup->t_tid);
+ 	}
+ 	/* Release the read lock. _bt_finishinsert will later reacquire it in 
+ 	 * exclusive mode. Keeping the buffer locked would be deadlock-prone
+ 	 * as well; who knows what the caller is going to do, and what pages to
+ 	 * lock, before calling finishinsert. The pin on the buffer is kept.
+ 	 */
+ 	LockBuffer(buf, BUFFER_LOCK_UNLOCK);
+ 	/* Return a struct with all information needed to finish this insert. */
+ 	insert_opaque = *opaquePtr = palloc(sizeof(struct BTInsertInfoData));
+ 	insert_opaque->rel = rel;
+ 	insert_opaque->heapRel = heapRel;
+ 	insert_opaque->index_is_unique = index_is_unique;
+ 	insert_opaque->itup = itup;
+ 	insert_opaque->itup_scankey = itup_scankey;
+ 	insert_opaque->stack = stack;
+ 	insert_opaque->buf = buf;
+ 	return suggestion;
+ }
+ /*
+  *	_bt_finishinsert() -- Finish an insert prepared with prepareinsert
+  */
+ void
+ _bt_finishinsert(Relation rel, Relation heapRel, bool index_is_unique,
+ 				 IndexTuple itup, ScanKey itup_scankey,
+ 				 BTStack stack, Buffer buf)
+ {
+ 	int			natts = rel->rd_rel->relnatts;
+ 	OffsetNumber offset = InvalidOffsetNumber;
  	LockBuffer(buf, BT_WRITE);
+ top:
  	 * If the page was split between the time that we surrendered our read
! 	 * lock in _bt_prepareinsert or _bt_doinsert, and acquired our write lock, then this page may no longer be the
  	 * right place for the key we want to insert.  In this case, we need to
  	 * move right in the tree.	See Lehman and Yao for an excruciatingly
  	 * precise description.
*** 146,151 ****
--- 256,269 ----
  			/* start over... */
+ 			/* find the first page containing this key */
+ 			stack = _bt_search(rel, natts, itup_scankey, false, &buf, BT_WRITE);
+ 			/* trade in our read lock for a write lock */
+ 			LockBuffer(buf, BUFFER_LOCK_UNLOCK);
+ 			LockBuffer(buf, BT_WRITE);
  			goto top;
*** 157,162 ****
--- 275,281 ----
  	/* be tidy */
+ 	pfree(itup);
Index: src/backend/access/nbtree/nbtree.c
RCS file: /home/hlinnaka/pgcvsrepository/pgsql/src/backend/access/nbtree/nbtree.c,v
retrieving revision 1.154
diff -c -r1.154 nbtree.c
*** src/backend/access/nbtree/nbtree.c	5 Jan 2007 22:19:23 -0000	1.154
--- src/backend/access/nbtree/nbtree.c	19 May 2007 16:23:58 -0000
*** 223,229 ****
  	_bt_doinsert(rel, itup, checkUnique, heapRel);
! 	pfree(itup);
--- 223,278 ----
  	_bt_doinsert(rel, itup, checkUnique, heapRel);
! 	PG_RETURN_BOOL(true);
! }
! /*
!  *	btprepareinsert() -- find the best place in the heap to put a new tuple.
!  *
!  *		This uses the same logic as btinsert to find the place where the index
!  *		tuple would go if this was a btinsert call.
!  *
!  *		Arguments, in fmgr order: the index relation, the array of index
!  *		column values, the array of null flags, the heap relation, whether
!  *		to check uniqueness, and a pointer through which the state needed
!  *		by btfinishinsert is returned.
!  *
!  *		The index tuple is formed here before the heap location is known;
!  *		btfinishinsert fills in its heap TID afterwards.
!  *
!  *		Returns the heap block number suggested by _bt_prepareinsert, which
!  *		may be InvalidBlockNumber.
!  */
! Datum
! btprepareinsert(PG_FUNCTION_ARGS)
! {
! 	Relation	rel = (Relation) PG_GETARG_POINTER(0);
! 	Datum	   *values = (Datum *) PG_GETARG_POINTER(1);
! 	bool	   *isnull = (bool *) PG_GETARG_POINTER(2);
! 	Relation	heapRel = (Relation) PG_GETARG_POINTER(3);
! 	bool		checkUnique = PG_GETARG_BOOL(4);
! 	void	  **opaquePtr = (void **) PG_GETARG_POINTER(5);
! 	IndexTuple	itup;
! 	BlockNumber suggestion;
! 	/* generate an index tuple; ownership passes on to _bt_prepareinsert */
! 	itup = index_form_tuple(RelationGetDescr(rel), values, isnull);
! 	suggestion =_bt_prepareinsert(rel, itup, checkUnique, heapRel,  
! 								  (BTInsertInfo *) opaquePtr);
! 	PG_RETURN_UINT32(suggestion);
! }
! /*
!  *	btfinishinsert() -- finish insert
!  */
! Datum
! btfinishinsert(PG_FUNCTION_ARGS)
! {
! 	ItemPointer ht_ctid = (ItemPointer) PG_GETARG_POINTER(0);
! 	BTInsertInfo opaque = (void *) PG_GETARG_POINTER(1);
! 	opaque->itup->t_tid = *ht_ctid;
! 	_bt_finishinsert(opaque->rel,
! 					 opaque->heapRel,
! 					 opaque->index_is_unique,
! 					 opaque->itup,
! 					 opaque->itup_scankey,
! 					 opaque->stack,
! 					 opaque->buf);
! 	pfree(opaque);
Index: src/backend/commands/copy.c
RCS file: /home/hlinnaka/pgcvsrepository/pgsql/src/backend/commands/copy.c,v
retrieving revision 1.283
diff -c -r1.283 copy.c
*** src/backend/commands/copy.c	27 Apr 2007 22:05:46 -0000	1.283
--- src/backend/commands/copy.c	19 May 2007 17:14:53 -0000
*** 2109,2115 ****
  				ExecConstraints(resultRelInfo, slot, estate);
  			/* OK, store the tuple and create index entries for it */
! 			heap_insert(cstate->rel, tuple, mycid, use_wal, use_fsm);
  			if (resultRelInfo->ri_NumIndices > 0)
  				ExecInsertIndexTuples(slot, &(tuple->t_self), estate, false);
--- 2109,2116 ----
  				ExecConstraints(resultRelInfo, slot, estate);
  			/* OK, store the tuple and create index entries for it */
! 			heap_insert(cstate->rel, tuple, mycid, use_wal, use_fsm, 
! 						InvalidBlockNumber);
  			if (resultRelInfo->ri_NumIndices > 0)
  				ExecInsertIndexTuples(slot, &(tuple->t_self), estate, false);
Index: src/backend/executor/execMain.c
RCS file: /home/hlinnaka/pgcvsrepository/pgsql/src/backend/executor/execMain.c,v
retrieving revision 1.293
diff -c -r1.293 execMain.c
*** src/backend/executor/execMain.c	27 Apr 2007 22:05:47 -0000	1.293
--- src/backend/executor/execMain.c	19 May 2007 16:24:01 -0000
*** 53,58 ****
--- 53,59 ----
  #include "utils/lsyscache.h"
  #include "utils/memutils.h"
+ bool cluster_inserts = true; /* GUC */
  typedef struct evalPlanQual
*** 869,876 ****
--- 870,879 ----
  	resultRelInfo->ri_RangeTableIndex = resultRelationIndex;
  	resultRelInfo->ri_RelationDesc = resultRelationDesc;
  	resultRelInfo->ri_NumIndices = 0;
+ 	resultRelInfo->ri_ClusterIndex = -1;
  	resultRelInfo->ri_IndexRelationDescs = NULL;
  	resultRelInfo->ri_IndexRelationInfo = NULL;
+ 	resultRelInfo->ri_PreparedInsertOpaque = NULL;
  	/* make a copy so as not to depend on relcache info not changing... */
  	resultRelInfo->ri_TrigDesc = CopyTriggerDesc(resultRelationDesc->trigdesc);
  	if (resultRelInfo->ri_TrigDesc)
*** 1353,1358 ****
--- 1356,1362 ----
  	ResultRelInfo *resultRelInfo;
  	Relation	resultRelationDesc;
  	Oid			newId;
+ 	BlockNumber suggestedBlock;
  	 * get the heap tuple out of the tuple table slot, making sure we have a
*** 1401,1406 ****
--- 1405,1417 ----
  	if (resultRelationDesc->rd_att->constr)
  		ExecConstraints(resultRelInfo, slot, estate);
+ 	/* Ask the index am of the clustered index for the 
+ 	 * best place to put it */
+ 	if(cluster_inserts)
+ 		suggestedBlock = ExecPrepareIndexInsert(slot, estate);
+ 	else
+ 		suggestedBlock = InvalidBlockNumber;
  	 * insert the tuple
*** 1409,1415 ****
  	newId = heap_insert(resultRelationDesc, tuple,
! 						true, true);
--- 1420,1426 ----
  	newId = heap_insert(resultRelationDesc, tuple,
! 						true, true, suggestedBlock);
*** 2600,2606 ****
! 				false);			/* never any point in using FSM */
  	/* We know this is a newly created relation, so there are no indexes */
--- 2611,2618 ----
! 				false, 			/* never any point in using FSM */
! 				InvalidBlockNumber);
  	/* We know this is a newly created relation, so there are no indexes */
Index: src/backend/executor/execUtils.c
RCS file: /home/hlinnaka/pgcvsrepository/pgsql/src/backend/executor/execUtils.c,v
retrieving revision 1.147
diff -c -r1.147 execUtils.c
*** src/backend/executor/execUtils.c	27 Feb 2007 01:11:25 -0000	1.147
--- src/backend/executor/execUtils.c	19 May 2007 18:22:33 -0000
*** 31,36 ****
--- 31,37 ----
   *		ExecOpenIndices			\
   *		ExecCloseIndices		 | referenced by InitPlan, EndPlan,
   *		ExecInsertIndexTuples	/  ExecInsert, ExecUpdate
+  *		ExecPrepareIndexInsert	Referenced by ExecInsert
   *		RegisterExprContextCallback    Register function shutdown callback
   *		UnregisterExprContextCallback  Deregister function shutdown callback
*** 902,907 ****
--- 903,909 ----
  	IndexInfo **indexInfoArray;
  	resultRelInfo->ri_NumIndices = 0;
+ 	resultRelInfo->ri_ClusterIndex = -1;
  	/* fast path if no indexes */
  	if (!RelationGetForm(resultRelation)->relhasindex)
*** 941,946 ****
--- 943,953 ----
  		/* extract index key information from the index's pg_index info */
  		ii = BuildIndexInfo(indexDesc);
+ 		/* Remember which index is the clustered one.
+ 		 * It's used to call the index am's prepareinsert method on inserts */
+ 		if(indexDesc->rd_index->indisclustered)
+ 			resultRelInfo->ri_ClusterIndex = i;
  		relationDescs[i] = indexDesc;
  		indexInfoArray[i] = ii;
*** 1007,1012 ****
--- 1014,1021 ----
  	ExprContext *econtext;
  	Datum		values[INDEX_MAX_KEYS];
  	bool		isnull[INDEX_MAX_KEYS];
+ 	int			clusterIndex;
+ 	bool		preparedInsert;
  	 * Get information from the result relation info structure.
*** 1016,1021 ****
--- 1025,1049 ----
  	relationDescs = resultRelInfo->ri_IndexRelationDescs;
  	indexInfoArray = resultRelInfo->ri_IndexRelationInfo;
  	heapRelation = resultRelInfo->ri_RelationDesc;
+ 	clusterIndex = resultRelInfo->ri_ClusterIndex;
+ 	preparedInsert = resultRelInfo->ri_PreparedInsertOpaque != NULL;
+ 	/* 
+ 	 * If the insert to the clustering index was already prepared,
+ 	 * finish it.
+ 	 */
+ 	if (preparedInsert)
+ 	{
+ 		index_finishinsert(relationDescs[clusterIndex],
+ 						   tupleid,
+ 						   resultRelInfo->ri_PreparedInsertOpaque);
+ 		resultRelInfo->ri_PreparedInsertOpaque = NULL;
+ 		/*
+ 		 * keep track of index inserts for debugging
+ 		 */
+ 		IncrIndexInserted();
+ 	}
  	 * We will use the EState's per-tuple context for evaluating predicates
*** 1036,1041 ****
--- 1064,1072 ----
  		if (relationDescs[i] == NULL)
+ 		if (preparedInsert && i == clusterIndex)
+ 			continue; /* insert to clustered index was already handled above */
  		indexInfo = indexInfoArray[i];
  		/* Check for partial index */
*** 1090,1095 ****
--- 1121,1196 ----
+ /* ----------------------------------------------------------------
+  *		ExecPrepareIndexInsert
+  *
+  *		This routine asks the index am of the clustered index where
+  *		a new heap tuple should be placed.
+  *
+  *		slot holds the tuple that is about to be inserted; the result
+  *		relation is taken from estate->es_result_relation_info.
+  *
+  *		Returns the heap block suggested by the index am, or
+  *		InvalidBlockNumber if the result relation has no clustered
+  *		index or the index am doesn't provide amprepareinsert.
+  *
+  *		The am's state for the pending insert is stored in the result
+  *		relation's ri_PreparedInsertOpaque, so that the insert can be
+  *		completed with index_finishinsert once the heap tuple has
+  *		been stored.
+  * ----------------------------------------------------------------
+  */
+ BlockNumber
+ ExecPrepareIndexInsert(TupleTableSlot *slot,
+ 					   EState *estate)
+ {
+ 	ResultRelInfo *resultRelInfo;
+ 	int			clusterIndex;
+ 	Relation	relationDesc;
+ 	Relation	heapRelation;
+ 	ExprContext *econtext;
+ 	Datum		values[INDEX_MAX_KEYS];
+ 	bool		isnull[INDEX_MAX_KEYS];
+ 	IndexInfo  *indexInfo;
+ 	/*
+ 	 * Get information from the result relation info structure.
+ 	 */
+ 	resultRelInfo = estate->es_result_relation_info;
+ 	clusterIndex = resultRelInfo->ri_ClusterIndex;
+ 	if (clusterIndex == -1)
+ 		return InvalidBlockNumber; /* there was no clustered index */
+ 	heapRelation = resultRelInfo->ri_RelationDesc;
+ 	relationDesc = resultRelInfo->ri_IndexRelationDescs[clusterIndex];
+ 	indexInfo = resultRelInfo->ri_IndexRelationInfo[clusterIndex];
+ 	if (!OidIsValid(relationDesc->rd_am->amprepareinsert))
+ 		return InvalidBlockNumber; /* the indexam doesn't support the
+ 									* two-phase insert API */
+ 	/* You can't cluster on a partial index, so there is no predicate
+ 	 * to evaluate before forming the index datums */
+ 	Assert(indexInfo->ii_Predicate == NIL);
+ 	/*
+ 	 * We will use the EState's per-tuple context for evaluating 
+ 	 * index expressions (creating it if it's not already there).
+ 	 */
+ 	econtext = GetPerTupleExprContext(estate);
+ 	/* Arrange for econtext's scan tuple to be the tuple under test */
+ 	econtext->ecxt_scantuple = slot;
+ 	/*
+ 	 * FormIndexDatum fills in its values and isnull parameters with the
+ 	 * appropriate values for the column(s) of the index.
+ 	 */
+ 	FormIndexDatum(indexInfo,
+ 				   slot,
+ 				   estate,
+ 				   values,
+ 				   isnull);
+ 	/*
+ 	 * The index AM does the rest. Whatever it stores through the last
+ 	 * argument is what a later index_finishinsert call needs.
+ 	 */
+ 	return index_prepareinsert(relationDesc,	/* index relation */
+ 				 values,	/* array of index Datums */
+ 				 isnull,	/* null flags */
+ 				 heapRelation,
+ 				 relationDesc->rd_index->indisunique,
+ 				 &resultRelInfo->ri_PreparedInsertOpaque);
+ }
   * UpdateChangedParamSet
   *		Add changed parameters to a plan node's chgParam set
Index: src/backend/utils/misc/guc.c
RCS file: /home/hlinnaka/pgcvsrepository/pgsql/src/backend/utils/misc/guc.c,v
retrieving revision 1.391
diff -c -r1.391 guc.c
*** src/backend/utils/misc/guc.c	8 May 2007 16:33:51 -0000	1.391
--- src/backend/utils/misc/guc.c	19 May 2007 16:24:17 -0000
*** 99,104 ****
--- 99,105 ----
  #define MS_PER_D (1000 * 60 * 60 * 24)
  /* XXX these should appear in other modules' header files */
+ extern bool cluster_inserts;
  extern bool Log_disconnections;
  extern int	CommitDelay;
  extern int	CommitSiblings;
*** 427,432 ****
--- 428,441 ----
  static struct config_bool ConfigureNamesBool[] =
+ 		{"cluster_inserts", PGC_USERSET, DEVELOPER_OPTIONS,
+ 			gettext_noop("Tries to maintain cluster order on inserts."),
+ 			NULL
+ 		},
+ 		&cluster_inserts,
+ 		true, NULL, NULL
+ 	},
+ 	{
  		{"enable_seqscan", PGC_USERSET, QUERY_TUNING_METHOD,
  			gettext_noop("Enables the planner's use of sequential-scan plans."),
Index: src/include/access/genam.h
RCS file: /home/hlinnaka/pgcvsrepository/pgsql/src/include/access/genam.h,v
retrieving revision 1.66
diff -c -r1.66 genam.h
*** src/include/access/genam.h	5 Jan 2007 22:19:50 -0000	1.66
--- src/include/access/genam.h	19 May 2007 16:24:26 -0000
*** 93,98 ****
--- 93,106 ----
  			 ItemPointer heap_t_ctid,
  			 Relation heapRelation,
  			 bool check_uniqueness);
+ /* two-phase insert: prepare returns a suggested heap block, finish completes it */
+ extern BlockNumber index_prepareinsert(Relation indexRelation,
+ 			 Datum *values, bool *isnull,
+ 			 Relation heapRelation,
+ 			 bool check_uniqueness,
+ 			 void **opaque);
+ extern bool index_finishinsert(Relation indexRelation,
+ 			 ItemPointer heap_t_ctid,
+ 			 void *opaque);
  extern IndexScanDesc index_beginscan(Relation heapRelation,
  				Relation indexRelation,
Index: src/include/access/heapam.h
RCS file: /home/hlinnaka/pgcvsrepository/pgsql/src/include/access/heapam.h,v
retrieving revision 1.123
diff -c -r1.123 heapam.h
*** src/include/access/heapam.h	8 Apr 2007 01:26:33 -0000	1.123
--- src/include/access/heapam.h	19 May 2007 16:24:26 -0000
*** 157,163 ****
  extern void setLastTid(const ItemPointer tid);
  extern Oid heap_insert(Relation relation, HeapTuple tup, CommandId cid,
! 			bool use_wal, bool use_fsm);
  extern HTSU_Result heap_delete(Relation relation, ItemPointer tid,
  			ItemPointer ctid, TransactionId *update_xmax,
  			CommandId cid, Snapshot crosscheck, bool wait);
--- 157,163 ----
  extern void setLastTid(const ItemPointer tid);
  extern Oid heap_insert(Relation relation, HeapTuple tup, CommandId cid,
! 			bool use_wal, bool use_fsm, BlockNumber suggestedblk);
  extern HTSU_Result heap_delete(Relation relation, ItemPointer tid,
  			ItemPointer ctid, TransactionId *update_xmax,
  			CommandId cid, Snapshot crosscheck, bool wait);
Index: src/include/access/nbtree.h
RCS file: /home/hlinnaka/pgcvsrepository/pgsql/src/include/access/nbtree.h,v
retrieving revision 1.113
diff -c -r1.113 nbtree.h
*** src/include/access/nbtree.h	11 Apr 2007 20:47:38 -0000	1.113
--- src/include/access/nbtree.h	19 May 2007 16:24:26 -0000
*** 508,517 ****
--- 508,540 ----
  extern Datum btbulkdelete(PG_FUNCTION_ARGS);
  extern Datum btvacuumcleanup(PG_FUNCTION_ARGS);
  extern Datum btoptions(PG_FUNCTION_ARGS);
+ extern Datum btprepareinsert(PG_FUNCTION_ARGS);
+ extern Datum btfinishinsert(PG_FUNCTION_ARGS);
+ /* State of a prepared but not yet finished btree insert.
+  * Filled in by _bt_prepareinsert and consumed by btfinishinsert, which
+  * passes the fields on to _bt_finishinsert and then frees the struct.
+  */
+ typedef struct BTInsertInfoData
+ {
+ 	Relation rel; /* the index relation */
+ 	Relation heapRel; /* the heap relation the index belongs to */
+ 	bool index_is_unique; /* passed through to _bt_finishinsert */
+ 	IndexTuple itup; /* tuple to insert; heap TID is set by btfinishinsert */
+ 	ScanKey itup_scankey; /* insertion scan key built from itup */
+ 	Buffer buf; /* leaf page found by the search; pinned, not locked */
+ 	BTStack stack; /* stack returned by _bt_search */
+ } BTInsertInfoData;
+ typedef BTInsertInfoData *BTInsertInfo;
   * prototypes for functions in nbtinsert.c
+ extern BlockNumber _bt_prepareinsert(Relation rel, IndexTuple itup,
+ 									 bool index_is_unique, Relation heapRel,
+ 									 BTInsertInfo *opaquePtr);
+ extern void _bt_finishinsert(Relation rel, Relation heapRel, 
+ 							 bool check_uniqueness,
+ 							 IndexTuple itup, ScanKey itup_scankey,
+ 							 BTStack stack, Buffer buf);
  extern void _bt_doinsert(Relation rel, IndexTuple itup,
  			 bool index_is_unique, Relation heapRel);
  extern Buffer _bt_getstackbuf(Relation rel, BTStack stack, int access);
Index: src/include/catalog/pg_am.h
RCS file: /home/hlinnaka/pgcvsrepository/pgsql/src/include/catalog/pg_am.h,v
retrieving revision 1.51
diff -c -r1.51 pg_am.h
*** src/include/catalog/pg_am.h	6 Apr 2007 22:33:43 -0000	1.51
--- src/include/catalog/pg_am.h	19 May 2007 16:42:48 -0000
*** 66,71 ****
--- 66,73 ----
  	regproc		amvacuumcleanup;	/* post-VACUUM cleanup function */
  	regproc		amcostestimate; /* estimate cost of an indexscan */
  	regproc		amoptions;		/* parse AM-specific parameters */
+ 	regproc		amprepareinsert;	/* get desired insert location on heap */
+ 	regproc		amfinishinsert;	/* finish a prepared insert operation */
  } FormData_pg_am;
  /* ----------------
*** 79,85 ****
   *		compiler constants for pg_am
   * ----------------
! #define Natts_pg_am						24
  #define Anum_pg_am_amname				1
  #define Anum_pg_am_amstrategies			2
  #define Anum_pg_am_amsupport			3
--- 81,87 ----
   *		compiler constants for pg_am
   * ----------------
! #define Natts_pg_am						26
  #define Anum_pg_am_amname				1
  #define Anum_pg_am_amstrategies			2
  #define Anum_pg_am_amsupport			3
*** 104,125 ****
  #define Anum_pg_am_amvacuumcleanup		22
  #define Anum_pg_am_amcostestimate		23
  #define Anum_pg_am_amoptions			24
  /* ----------------
   *		initial contents of pg_am
   * ----------------
! DATA(insert OID = 403 (  btree	5 1 t t t t t t f t btinsert btbeginscan btgettuple btgetmulti btrescan btendscan btmarkpos btrestrpos btbuild btbulkdelete btvacuumcleanup btcostestimate btoptions ));
  DESCR("b-tree index access method");
  #define BTREE_AM_OID 403
! DATA(insert OID = 405 (  hash	1 1 f f f f f f f f hashinsert hashbeginscan hashgettuple hashgetmulti hashrescan hashendscan hashmarkpos hashrestrpos hashbuild hashbulkdelete hashvacuumcleanup hashcostestimate hashoptions ));
  DESCR("hash index access method");
  #define HASH_AM_OID 405
! DATA(insert OID = 783 (  gist	0 7 f f t t t t t t gistinsert gistbeginscan gistgettuple gistgetmulti gistrescan gistendscan gistmarkpos gistrestrpos gistbuild gistbulkdelete gistvacuumcleanup gistcostestimate gistoptions ));
  DESCR("GiST index access method");
  #define GIST_AM_OID 783
! DATA(insert OID = 2742 (  gin	0 4 f f f f f f t f gininsert ginbeginscan gingettuple gingetmulti ginrescan ginendscan ginmarkpos ginrestrpos ginbuild ginbulkdelete ginvacuumcleanup gincostestimate ginoptions ));
  DESCR("GIN index access method");
  #define GIN_AM_OID 2742
--- 106,129 ----
  #define Anum_pg_am_amvacuumcleanup		22
  #define Anum_pg_am_amcostestimate		23
  #define Anum_pg_am_amoptions			24
+ #define Anum_pg_am_amprepareinsert		25
+ #define Anum_pg_am_amfinishinsert		26
  /* ----------------
   *		initial contents of pg_am
   * ----------------
! DATA(insert OID = 403 (  btree	5 1 t t t t t t f t btinsert btbeginscan btgettuple btgetmulti btrescan btendscan btmarkpos btrestrpos btbuild btbulkdelete btvacuumcleanup btcostestimate btoptions btprepareinsert btfinishinsert));
  DESCR("b-tree index access method");
  #define BTREE_AM_OID 403
! DATA(insert OID = 405 (  hash	1 1 f f f f f f f f hashinsert hashbeginscan hashgettuple hashgetmulti hashrescan hashendscan hashmarkpos hashrestrpos hashbuild hashbulkdelete hashvacuumcleanup hashcostestimate hashoptions - -));
  DESCR("hash index access method");
  #define HASH_AM_OID 405
! DATA(insert OID = 783 (  gist	0 7 f f t t t t t t gistinsert gistbeginscan gistgettuple gistgetmulti gistrescan gistendscan gistmarkpos gistrestrpos gistbuild gistbulkdelete gistvacuumcleanup gistcostestimate gistoptions - -));
  DESCR("GiST index access method");
  #define GIST_AM_OID 783
! DATA(insert OID = 2742 (  gin	0 4 f f f f f f t f gininsert ginbeginscan gingettuple gingetmulti ginrescan ginendscan ginmarkpos ginrestrpos ginbuild ginbulkdelete ginvacuumcleanup gincostestimate ginoptions - -));
  DESCR("GIN index access method");
  #define GIN_AM_OID 2742
Index: src/include/catalog/pg_proc.h
RCS file: /home/hlinnaka/pgcvsrepository/pgsql/src/include/catalog/pg_proc.h,v
retrieving revision 1.455
diff -c -r1.455 pg_proc.h
*** src/include/catalog/pg_proc.h	8 May 2007 18:56:48 -0000	1.455
--- src/include/catalog/pg_proc.h	19 May 2007 17:20:23 -0000
*** 688,693 ****
--- 688,697 ----
  DATA(insert OID = 2785 (  btoptions		   PGNSP PGUID 12 1 0 f f t f s 2 17 "1009 16" _null_ _null_ _null_  btoptions - _null_ ));
+ DATA(insert OID = 5433 (  btprepareinsert   PGNSP PGUID 12 1 0 f f t f v 6 23 "2281 2281 2281 2281 2281 2281" _null_ _null_ _null_	btprepareinsert - _null_ ));
+ DESCR("btree(internal)");
+ DATA(insert OID = 5430 (  btfinishinsert   PGNSP PGUID 12 1 0 f f t f v 2 16 "2281 2281" _null_ _null_ _null_	btfinishinsert - _null_ ));
+ DESCR("btree(internal)");
  DATA(insert OID = 339 (  poly_same		   PGNSP PGUID 12 1 0 f f t f i 2 16 "604 604" _null_ _null_ _null_ poly_same - _null_ ));
  DESCR("same as?");
Index: src/include/executor/executor.h
RCS file: /home/hlinnaka/pgcvsrepository/pgsql/src/include/executor/executor.h,v
retrieving revision 1.139
diff -c -r1.139 executor.h
*** src/include/executor/executor.h	27 Feb 2007 01:11:25 -0000	1.139
--- src/include/executor/executor.h	19 May 2007 16:24:27 -0000
*** 276,281 ****
--- 276,282 ----
  extern void ExecCloseIndices(ResultRelInfo *resultRelInfo);
  extern void ExecInsertIndexTuples(TupleTableSlot *slot, ItemPointer tupleid,
  					  EState *estate, bool is_vacuum);
+ extern BlockNumber ExecPrepareIndexInsert(TupleTableSlot *slot, EState *estate);
  extern void RegisterExprContextCallback(ExprContext *econtext,
  							ExprContextCallbackFunction function,
Index: src/include/nodes/execnodes.h
RCS file: /home/hlinnaka/pgcvsrepository/pgsql/src/include/nodes/execnodes.h,v
retrieving revision 1.174
diff -c -r1.174 execnodes.h
*** src/include/nodes/execnodes.h	17 May 2007 19:35:08 -0000	1.174
--- src/include/nodes/execnodes.h	19 May 2007 16:24:27 -0000
*** 264,269 ****
--- 264,271 ----
   *		NumIndices				# of indices existing on result relation
   *		IndexRelationDescs		array of relation descriptors for indices
   *		IndexRelationInfo		array of key/attr info for indices
+  *		ClusterIndex			index to the IndexRelationInfo array of the
+  *								clustered index, or -1 if there's none
   *		TrigDesc				triggers to be fired, if any
   *		TrigFunctions			cached lookup info for trigger functions
   *		TrigInstrument			optional runtime measurements for triggers
*** 280,291 ****
--- 282,296 ----
  	int			ri_NumIndices;
  	RelationPtr ri_IndexRelationDescs;
  	IndexInfo **ri_IndexRelationInfo;
+ 	int         ri_ClusterIndex;
  	TriggerDesc *ri_TrigDesc;
  	FmgrInfo   *ri_TrigFunctions;
  	struct Instrumentation *ri_TrigInstrument;
  	List	  **ri_ConstraintExprs;
  	JunkFilter *ri_junkFilter;
  	ProjectionInfo *ri_projectReturning;
+ 	void		*ri_PreparedInsertOpaque;
  } ResultRelInfo;
  /* ----------------
Index: src/include/utils/rel.h
RCS file: /home/hlinnaka/pgcvsrepository/pgsql/src/include/utils/rel.h,v
retrieving revision 1.100
diff -c -r1.100 rel.h
*** src/include/utils/rel.h	29 Mar 2007 00:15:39 -0000	1.100
--- src/include/utils/rel.h	19 May 2007 16:24:29 -0000
*** 117,122 ****
--- 117,124 ----
  	FmgrInfo	amvacuumcleanup;
  	FmgrInfo	amcostestimate;
  	FmgrInfo	amoptions;
+ 	FmgrInfo	amprepareinsert;
+ 	FmgrInfo	amfinishinsert;
  } RelationAmInfo;
---------------------------(end of broadcast)---------------------------
TIP 1: if posting/reading through Usenet, please send an appropriate
       subscribe-nomail command to [EMAIL PROTECTED] so that your
       message can get through to the mailing list cleanly

Reply via email to