Here's an update, fixing a conflict with Tom's recent commit of Simon's patch to skip WAL inserts when archiving is not enabled.

Heikki Linnakangas wrote:
This patch makes CLUSTER MVCC-safe. Visibility information and update chains are preserved, as in VACUUM FULL.

I created a new generic rewriteheap facility to handle rewriting tables in a visibility-preserving manner. All the update chain tracking is done in rewriteheap.c; the caller is responsible for supplying the stream of tuples.
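
To illustrate, the calling pattern looks roughly like this (a condensed sketch of the cluster.c changes in the patch below; the caller supplies the tuple stream, here the index scan, and decides deadness with HeapTupleSatisfiesVacuum):

	TransactionId OldestXmin = GetOldestXmin(OldHeap->rd_rel->relisshared, false);
	RewriteState rwstate = begin_heap_rewrite(NewHeap, OldestXmin, use_wal);
	HeapTuple tuple;

	while ((tuple = index_getnext(scan, ForwardScanDirection)) != NULL)
	{
		if (isdead)	/* HeapTupleSatisfiesVacuum() said HEAPTUPLE_DEAD */
			rewrite_heap_dead_tuple(rwstate, tuple);
		else
		{
			/* ... reconstruct copiedTuple from component Datums ... */
			rewrite_heap_tuple(rwstate, tuple, copiedTuple);
		}
	}
	end_heap_rewrite(rwstate);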

CLUSTER is currently the only user of the facility, but I'm envisioning we might have other users in the future, for example a version of VACUUM FULL that rewrites the whole table. We could also use it to make ALTER TABLE MVCC-safe, but there are some issues with that, such as what to do if RECENTLY_DEAD tuples don't satisfy a newly added constraint.

One complication in the implementation was the fact that heap_insert overwrites the visibility information, and it doesn't write the full tuple header to WAL. I ended up implementing a special-purpose raw_heap_insert function instead, which is optimized for bulk-inserting a lot of tuples, knowing that we have exclusive access to the heap. raw_heap_insert keeps the current buffer locked across calls until it gets full, and writes the whole page to WAL as a single record using the existing XLOG_HEAP_NEWPAGE record type.
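
With log_newpage() exported from heapam.c, call sites that used to open-code the NEWPAGE record shrink to a single call, as in the nbtsort.c and tablecmds.c hunks below:

	/* We use the heap NEWPAGE record type for this */
	if (wstate->btws_use_wal)
		log_newpage(&wstate->index->rd_node, blkno, page);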

This makes CLUSTER a more viable alternative to VACUUM FULL. One motivation for making CLUSTER MVCC-safe is that currently, if some poor soul runs pg_dump to make a backup concurrently with CLUSTER, the clustered tables appear to be empty in the dump file.

The documentation doesn't say anything about CLUSTER not being MVCC-safe, so I suppose there's no need to touch the docs. I sent a doc patch earlier to add a note about it; that doc patch should still be applied to older release branches, IMO.

--
  Heikki Linnakangas
  EnterpriseDB   http://www.enterprisedb.com
Index: src/backend/access/heap/heapam.c
===================================================================
RCS file: /home/hlinnaka/pgcvsrepository/pgsql/src/backend/access/heap/heapam.c,v
retrieving revision 1.230
diff -c -r1.230 heapam.c
*** src/backend/access/heap/heapam.c	29 Mar 2007 00:15:37 -0000	1.230
--- src/backend/access/heap/heapam.c	29 Mar 2007 08:27:21 -0000
***************
*** 3280,3285 ****
--- 3280,3322 ----
  	return log_heap_update(reln, oldbuf, from, newbuf, newtup, true);
  }
  
+ /*
+  * Perform XLogInsert of a full page image to WAL. Caller must make sure
+  * the page contents on disk are consistent with the page inserted to WAL.
+  */
+ XLogRecPtr
+ log_newpage(RelFileNode *rnode, BlockNumber blkno, Page page)
+ {
+ 	xl_heap_newpage xlrec;
+ 	XLogRecPtr	recptr;
+ 	XLogRecData rdata[2];
+ 
+ 	/* NO ELOG(ERROR) from here till newpage op is logged */
+ 	START_CRIT_SECTION();
+ 
+ 	xlrec.node = *rnode;
+ 	xlrec.blkno = blkno;
+ 
+ 	rdata[0].data = (char *) &xlrec;
+ 	rdata[0].len = SizeOfHeapNewpage;
+ 	rdata[0].buffer = InvalidBuffer;
+ 	rdata[0].next = &(rdata[1]);
+ 
+ 	rdata[1].data = (char *) page;
+ 	rdata[1].len = BLCKSZ;
+ 	rdata[1].buffer = InvalidBuffer;
+ 	rdata[1].next = NULL;
+ 
+ 	recptr = XLogInsert(RM_HEAP_ID, XLOG_HEAP_NEWPAGE, rdata);
+ 
+ 	PageSetLSN(page, recptr);
+ 	PageSetTLI(page, ThisTimeLineID);
+ 
+ 	END_CRIT_SECTION();
+ 
+ 	return recptr;
+ }
+ 
  static void
  heap_xlog_clean(XLogRecPtr lsn, XLogRecord *record)
  {
Index: src/backend/access/nbtree/nbtsort.c
===================================================================
RCS file: /home/hlinnaka/pgcvsrepository/pgsql/src/backend/access/nbtree/nbtsort.c,v
retrieving revision 1.110
diff -c -r1.110 nbtsort.c
*** src/backend/access/nbtree/nbtsort.c	9 Jan 2007 02:14:10 -0000	1.110
--- src/backend/access/nbtree/nbtsort.c	20 Mar 2007 17:58:20 -0000
***************
*** 64,69 ****
--- 64,70 ----
  
  #include "postgres.h"
  
+ #include "access/heapam.h"
  #include "access/nbtree.h"
  #include "miscadmin.h"
  #include "storage/smgr.h"
***************
*** 265,296 ****
  	if (wstate->btws_use_wal)
  	{
  		/* We use the heap NEWPAGE record type for this */
! 		xl_heap_newpage xlrec;
! 		XLogRecPtr	recptr;
! 		XLogRecData rdata[2];
! 
! 		/* NO ELOG(ERROR) from here till newpage op is logged */
! 		START_CRIT_SECTION();
! 
! 		xlrec.node = wstate->index->rd_node;
! 		xlrec.blkno = blkno;
! 
! 		rdata[0].data = (char *) &xlrec;
! 		rdata[0].len = SizeOfHeapNewpage;
! 		rdata[0].buffer = InvalidBuffer;
! 		rdata[0].next = &(rdata[1]);
! 
! 		rdata[1].data = (char *) page;
! 		rdata[1].len = BLCKSZ;
! 		rdata[1].buffer = InvalidBuffer;
! 		rdata[1].next = NULL;
! 
! 		recptr = XLogInsert(RM_HEAP_ID, XLOG_HEAP_NEWPAGE, rdata);
! 
! 		PageSetLSN(page, recptr);
! 		PageSetTLI(page, ThisTimeLineID);
! 
! 		END_CRIT_SECTION();
  	}
  	else
  	{
--- 266,272 ----
  	if (wstate->btws_use_wal)
  	{
  		/* We use the heap NEWPAGE record type for this */
! 		log_newpage(&wstate->index->rd_node, blkno, page);
  	}
  	else
  	{
Index: src/backend/commands/Makefile
===================================================================
RCS file: /home/hlinnaka/pgcvsrepository/pgsql/src/backend/commands/Makefile,v
retrieving revision 1.35
diff -c -r1.35 Makefile
*** src/backend/commands/Makefile	20 Jan 2007 17:16:11 -0000	1.35
--- src/backend/commands/Makefile	20 Mar 2007 17:04:58 -0000
***************
*** 16,22 ****
  	conversioncmds.o copy.o \
  	dbcommands.o define.o explain.o functioncmds.o \
  	indexcmds.o lockcmds.o operatorcmds.o opclasscmds.o \
! 	portalcmds.o prepare.o proclang.o \
  	schemacmds.o sequence.o tablecmds.o tablespace.o trigger.o \
  	typecmds.o user.o vacuum.o vacuumlazy.o variable.o view.o
  
--- 16,22 ----
  	conversioncmds.o copy.o \
  	dbcommands.o define.o explain.o functioncmds.o \
  	indexcmds.o lockcmds.o operatorcmds.o opclasscmds.o \
! 	rewriteheap.o portalcmds.o prepare.o proclang.o \
  	schemacmds.o sequence.o tablecmds.o tablespace.o trigger.o \
  	typecmds.o user.o vacuum.o vacuumlazy.o variable.o view.o
  
Index: src/backend/commands/cluster.c
===================================================================
RCS file: /home/hlinnaka/pgcvsrepository/pgsql/src/backend/commands/cluster.c,v
retrieving revision 1.158
diff -c -r1.158 cluster.c
*** src/backend/commands/cluster.c	29 Mar 2007 00:15:37 -0000	1.158
--- src/backend/commands/cluster.c	29 Mar 2007 08:28:57 -0000
***************
*** 20,25 ****
--- 20,26 ----
  #include "access/genam.h"
  #include "access/heapam.h"
  #include "access/xact.h"
+ #include "access/transam.h"
  #include "catalog/catalog.h"
  #include "catalog/dependency.h"
  #include "catalog/heap.h"
***************
*** 36,41 ****
--- 37,44 ----
  #include "utils/memutils.h"
  #include "utils/syscache.h"
  #include "utils/relcache.h"
+ #include "storage/procarray.h"
+ #include "commands/rewriteheap.h"
  
  
  /*
***************
*** 653,659 ****
  	char	   *nulls;
  	IndexScanDesc scan;
  	HeapTuple	tuple;
! 	CommandId	mycid = GetCurrentCommandId();
  	bool		use_wal;
  
  	/*
--- 656,663 ----
  	char	   *nulls;
  	IndexScanDesc scan;
  	HeapTuple	tuple;
! 	TransactionId OldestXmin;
! 	RewriteState rwstate;
  	bool		use_wal;
  
  	/*
***************
*** 677,682 ****
--- 681,689 ----
  	nulls = (char *) palloc(natts * sizeof(char));
  	memset(nulls, 'n', natts * sizeof(char));
  
+ 	/* Get an OldestXmin to use to weed out dead tuples */
+ 	OldestXmin = GetOldestXmin(OldHeap->rd_rel->relisshared, false);
+ 
  	/*
  	 * We need to log the copied data in WAL iff WAL archiving is enabled AND
  	 * it's not a temp rel.  (Since we know the target relation is new and
***************
*** 688,724 ****
  	/* use_wal off requires rd_targblock be initially invalid */
  	Assert(NewHeap->rd_targblock == InvalidBlockNumber);
  
  	/*
  	 * Scan through the OldHeap on the OldIndex and copy each tuple into the
  	 * NewHeap.
  	 */
  	scan = index_beginscan(OldHeap, OldIndex,
! 						   SnapshotNow, 0, (ScanKey) NULL);
  
  	while ((tuple = index_getnext(scan, ForwardScanDirection)) != NULL)
  	{
  		/*
  		 * We cannot simply pass the tuple to heap_insert(), for several
  		 * reasons:
  		 *
! 		 * 1. heap_insert() will overwrite the commit-status fields of the
! 		 * tuple it's handed.  This would trash the source relation, which is
! 		 * bad news if we abort later on.  (This was a bug in releases thru
! 		 * 7.0)
! 		 *
! 		 * 2. We'd like to squeeze out the values of any dropped columns, both
  		 * to save space and to ensure we have no corner-case failures. (It's
  		 * possible for example that the new table hasn't got a TOAST table
  		 * and so is unable to store any large values of dropped cols.)
  		 *
! 		 * 3. The tuple might not even be legal for the new table; this is
  		 * currently only known to happen as an after-effect of ALTER TABLE
  		 * SET WITHOUT OIDS.
  		 *
  		 * So, we must reconstruct the tuple from component Datums.
  		 */
- 		HeapTuple	copiedTuple;
- 		int			i;
  
  		heap_deformtuple(tuple, oldTupDesc, values, nulls);
  
--- 695,743 ----
  	/* use_wal off requires rd_targblock be initially invalid */
  	Assert(NewHeap->rd_targblock == InvalidBlockNumber);
  
+ 	/* Initialize the rewrite operation */
+ 	rwstate = begin_heap_rewrite(NewHeap, OldestXmin, use_wal);
+ 
  	/*
  	 * Scan through the OldHeap on the OldIndex and copy each tuple into the
  	 * NewHeap.
  	 */
  	scan = index_beginscan(OldHeap, OldIndex,
! 						   SnapshotAny, 0, (ScanKey) NULL);
  
  	while ((tuple = index_getnext(scan, ForwardScanDirection)) != NULL)
  	{
+ 		HeapTuple	copiedTuple;
+ 		int			i;
+ 		bool		isdead;
+ 
+ 		LockBuffer(scan->xs_cbuf, BUFFER_LOCK_SHARE);
+ 		isdead = HeapTupleSatisfiesVacuum(tuple->t_data, 
+ 										  OldestXmin, 
+ 										  scan->xs_cbuf) == HEAPTUPLE_DEAD;
+ 		LockBuffer(scan->xs_cbuf, BUFFER_LOCK_UNLOCK);
+ 
+ 		if (isdead)
+ 		{
+ 			rewrite_heap_dead_tuple(rwstate, tuple);
+ 			continue;
+ 		}
+ 
  		/*
  		 * We cannot simply pass the tuple to heap_insert(), for several
  		 * reasons:
  		 *
! 		 * 1. We'd like to squeeze out the values of any dropped columns, both
  		 * to save space and to ensure we have no corner-case failures. (It's
  		 * possible for example that the new table hasn't got a TOAST table
  		 * and so is unable to store any large values of dropped cols.)
  		 *
! 		 * 2. The tuple might not even be legal for the new table; this is
  		 * currently only known to happen as an after-effect of ALTER TABLE
  		 * SET WITHOUT OIDS.
  		 *
  		 * So, we must reconstruct the tuple from component Datums.
  		 */
  
  		heap_deformtuple(tuple, oldTupDesc, values, nulls);
  
***************
*** 735,747 ****
  		if (NewHeap->rd_rel->relhasoids)
  			HeapTupleSetOid(copiedTuple, HeapTupleGetOid(tuple));
  
! 		heap_insert(NewHeap, copiedTuple, mycid, use_wal, false);
  
  		heap_freetuple(copiedTuple);
  
  		CHECK_FOR_INTERRUPTS();
  	}
  
  	index_endscan(scan);
  
  	pfree(values);
--- 754,768 ----
  		if (NewHeap->rd_rel->relhasoids)
  			HeapTupleSetOid(copiedTuple, HeapTupleGetOid(tuple));
  
! 		rewrite_heap_tuple(rwstate, tuple, copiedTuple);
  
  		heap_freetuple(copiedTuple);
  
  		CHECK_FOR_INTERRUPTS();
  	}
  
+ 	end_heap_rewrite(rwstate);
+ 
  	index_endscan(scan);
  
  	pfree(values);
Index: src/backend/commands/rewriteheap.c
===================================================================
RCS file: src/backend/commands/rewriteheap.c
diff -N src/backend/commands/rewriteheap.c
*** /dev/null	1 Jan 1970 00:00:00 -0000
--- src/backend/commands/rewriteheap.c	29 Mar 2007 08:31:00 -0000
***************
*** 0 ****
--- 1,543 ----
+ /*-------------------------------------------------------------------------
+  *
+  * rewriteheap.c
+  *	  support functions to rewrite tables.
+  *
+  * These functions provide a facility to completely rewrite a heap, while
+  * preserving visibility information and update chains. 
+  *
+  *
+  * INTERFACE
+  *
+  * The caller is responsible for creating the new heap, all catalog 
+  * changes, supplying the tuples to be written to the new heap, and 
+  * rebuilding indexes. The caller must hold an AccessExclusiveLock on the 
+  * table.
+  *
+  * To use the facility:
+  *
+  * begin_heap_rewrite
+  * while(fetch next tuple)
+  * {
+  *     if(tuple is dead)
+  *         rewrite_heap_dead_tuple
+  *     else
+  *     {
+  *         do any transformations here if required
+  *         rewrite_heap_tuple
+  *     }
+  * }
+  * end_heap_rewrite
+  *
+  * The rewrite facility holds the current page it's inserting into pinned
+  * and locked over rewrite_heap_tuple calls. To avoid deadlock, the caller
+  * must not access the new relation while a rewrite is in progress.
+  *
+  * The contents of the new relation shouldn't be relied on until 
+  * end_heap_rewrite is called.
+  *
+  *
+  * IMPLEMENTATION
+  *
+  * While we copy the tuples, we need to keep track of ctid-references to
+  * handle update chains correctly. Since the new versions of tuples will 
+  * have different ctids than the original ones, if we just copied 
+  * everything to the new table the ctid pointers would be wrong. For each 
+  * ctid reference from A -> B, we can encounter either A first or B first:
+  *
+  * If we encounter A first, we'll store the tuple in the unresolved_ctids
+  * hash table. When we later encounter B, we remove A from the hash table,
+  * fix the ctid to point to the new location of B, and insert both A and B
+  * to the new heap.
+  *
+  * If we encounter B first, we'll put an entry in the old_new_tid_map hash
+  * table and insert B to the new heap right away. When we later encounter 
+  * A, we get the new location of B from the table.
+  *
+  * Note that a tuple in the middle of a chain is both A and B at the same
+  * time. Entries in the hash tables can be removed as soon as the later
+  * tuple is encountered. That helps to keep the memory usage down. At the 
+  * end, both tables are usually empty; we should have encountered both A 
+  * and B for each ctid reference. However, if there's a chain like this:
+  * RECENTLY_DEAD -> DEAD -> LIVE as determined by HeapTupleSatisfiesVacuum,
+  * we can encounter the dead tuple first, and skip it, and bump into the
+  * RECENTLY_DEAD tuple later. The RECENTLY_DEAD tuple would be added
+  * to unresolved_ctids, and stay there until end of the rewrite.
+  *
+  * Using in-memory hash tables means that we use some memory for each live
+  * update chain in the table, from the time we find one end of the
+  * reference until we find the other end. That shouldn't be a problem in
+  * practice, but if you do something like an UPDATE without a where-clause
+  * on a large table, and then run CLUSTER in the same transaction, you
+  * might run out of memory. It doesn't seem worthwhile to add support for
+  * spilling to disk; there shouldn't be that many RECENTLY_DEAD tuples in a
+  * table under normal circumstances, and even if we do fail halfway through
+  * a CLUSTER, the old table is still valid.
+  *
+  * We can't use the normal heap_insert function to insert into the new
+  * heap, because heap_insert overwrites the visibility information.
+  * We use a special-purpose raw_heap_insert function instead, which
+  * is optimized for bulk inserting a lot of tuples, knowing that we have
+  * exclusive access to the heap. raw_heap_insert keeps the current buffer
+  * we're inserting into locked over calls, until it gets full. The page 
+  * is inserted to WAL as a single record.
+  *
+  * Portions Copyright (c) 1996-2007, PostgreSQL Global Development Group
+  * Portions Copyright (c) 1994-5, Regents of the University of California
+  *
+  * IDENTIFICATION
+  *	  $PostgreSQL$
+  *
+  *-------------------------------------------------------------------------
+  */
+ #include "postgres.h"
+ 
+ #include "access/heapam.h"
+ #include "access/transam.h"
+ #include "access/tuptoaster.h"
+ #include "commands/rewriteheap.h"
+ #include "utils/memutils.h"
+ 
+ /*
+  * State associated with a rewrite operation. This is opaque to the user
+  * of the rewrite facility.
+  */
+ typedef struct RewriteStateData
+ {
+ 	Relation		rs_new_rel;			/* new heap */
+ 	bool			rs_use_wal;			/* WAL-log inserts to new heap? */
+ 	MemoryContext	rs_cxt;				/* for hash tables and entries and 
+ 										 * tuples in them */
+ 	TransactionId	rs_oldest_xmin;		/* oldest xmin used by caller to
+ 										 * determine tuple visibility */
+ 	Buffer			rs_buf;				/* current buffer we're inserting
+ 										 * to, exclusively locked */
+ 	HTAB		   *rs_unresolved_ctids;
+ 	HTAB		   *rs_old_new_tid_map;
+ } RewriteStateData;
+ 
+ /* entry structures for the hash tables */
+ 
+ typedef struct UnresolvedCtidData {
+ 	ItemPointerData old_ctid;	/* t_ctid-pointer of the tuple in old heap */
+ 	ItemPointerData old_tid;	/* location in the old heap */
+ 	HeapTuple tuple;			/* tuple contents */
+ } UnresolvedCtidData;
+ 
+ typedef UnresolvedCtidData *UnresolvedCtid;
+ 
+ typedef struct OldToNewMappingData {
+ 	ItemPointerData old_tid;	/* location in the old heap */
+ 	ItemPointerData new_tid;	/* location in the new heap */
+ } OldToNewMappingData;
+ 
+ typedef OldToNewMappingData *OldToNewMapping;
+ 
+ 
+ /* prototypes for internal functions */
+ static void raw_heap_insert(RewriteState state, HeapTuple tup);
+ 
+ 
+ /*
+  * Begin a rewrite of a table
+  *
+  * new_heap		new, empty heap relation to insert tuples to
+  * oldest_xmin	xid used by the caller to determine which tuples
+  *				are dead.
+  * use_wal		should the inserts to the new heap be WAL-logged
+  *
+  * Returns an opaque RewriteState, allocated in current memory context,
+  * to be used in subsequent calls to the other functions.
+  */
+ RewriteState
+ begin_heap_rewrite(Relation new_heap, TransactionId oldest_xmin,
+ 				   bool use_wal)
+ {
+ 	RewriteState state;
+ 	HASHCTL		hash_ctl;
+ 
+ 	state = palloc(sizeof(RewriteStateData));
+ 	state->rs_cxt = AllocSetContextCreate(CurrentMemoryContext,
+ 										  "Table rewrite",
+ 										  ALLOCSET_DEFAULT_MINSIZE,
+ 										  ALLOCSET_DEFAULT_INITSIZE,
+ 										  ALLOCSET_DEFAULT_MAXSIZE);
+ 
+ 	state->rs_oldest_xmin = oldest_xmin;
+ 	state->rs_new_rel = new_heap;
+ 	state->rs_use_wal = use_wal;
+ 	state->rs_buf = InvalidBuffer;
+ 
+ 	/* Initialize hash tables used to track update chains */
+ 	memset(&hash_ctl, 0, sizeof(hash_ctl));
+ 	hash_ctl.keysize = sizeof(ItemPointerData);
+ 	hash_ctl.entrysize = sizeof(UnresolvedCtidData);
+ 	hash_ctl.hcxt = state->rs_cxt;
+ 	hash_ctl.hash = tag_hash;
+ 
+ 	state->rs_unresolved_ctids =
+ 		hash_create("Rewrite / Unresolved ctids",
+ 					128, /* arbitrary initial size */
+ 					&hash_ctl,
+ 					HASH_ELEM | HASH_FUNCTION | HASH_CONTEXT);
+ 
+ 	hash_ctl.entrysize = sizeof(OldToNewMappingData);
+ 	
+ 	state->rs_old_new_tid_map =
+ 		hash_create("Rewrite / Old to new tid map",
+ 					128, /* arbitrary initial size */
+ 					&hash_ctl,
+ 					HASH_ELEM | HASH_FUNCTION | HASH_CONTEXT);
+ 
+ 	return state;
+ }
+ 
+ /*
+  * End a rewrite.
+  *
+  * state and any other resources are freed.
+  */
+ void
+ end_heap_rewrite(RewriteState state)
+ {
+ 	HASH_SEQ_STATUS seq_status;
+ 	UnresolvedCtid unresolved;
+ 
+ 	/* Write any remaining tuples in the unresolved_ctids table. If we have
+ 	 * any left, they should in fact be dead, but let's err on the safe 
+ 	 * side.
+ 	 */
+ 	hash_seq_init(&seq_status, state->rs_unresolved_ctids);
+ 
+ 	while ((unresolved = hash_seq_search(&seq_status)) != NULL)
+ 	{
+ 		ItemPointerSetInvalid(&unresolved->tuple->t_data->t_ctid);
+ 		raw_heap_insert(state, unresolved->tuple);
+ 
+ 		/* don't bother removing and pfreeing the entry, we're going
+ 		 * to free the whole memory context after the scan anyway
+ 		 */
+ 	}
+ 	
+ 	/* WAL-log and unlock the last page */
+ 	if (BufferIsValid(state->rs_buf))
+ 	{
+ 		if (state->rs_use_wal)
+ 			log_newpage(&state->rs_new_rel->rd_node,
+ 						BufferGetBlockNumber(state->rs_buf),
+ 						BufferGetPage(state->rs_buf));
+ 
+ 		MarkBufferDirty(state->rs_buf);
+ 
+ 		UnlockReleaseBuffer(state->rs_buf);
+ 		state->rs_buf = InvalidBuffer;
+ 	}
+ 
+ 	MemoryContextDelete(state->rs_cxt);
+ 	pfree(state);
+ }
+ 
+ /*
+  * Register a dead tuple with an ongoing rewrite. Dead tuples are not
+  * copied to the new table, but we still make note of them so that we
+  * can release some resources earlier.
+  */
+ void
+ rewrite_heap_dead_tuple(RewriteState state, HeapTuple old_tuple)
+ {
+ 	/* If we have already seen an earlier tuple in the update chain that
+ 	 * points to this tuple, let's forget about that earlier tuple. It's
+ 	 * in fact dead as well, our simple xmax < OldestXmin test in 
+ 	 * HeapTupleSatisfiesVacuum just wasn't enough to detect it. It
+ 	 * happens when xmin of a tuple is greater than xmax, which sounds
+ 	 * counter-intuitive but is perfectly valid.
+ 	 *
+ 	 * We don't bother to try to detect the situation the other way
+ 	 * round, when we encounter the dead tuple first and then the
+ 	 * recently dead one that points to it. If that happens, we'll
+ 	 * have some unmatched entries in the unresolved_ctids hash table
+ 	 * at the end. That can happen anyway, because a vacuum might
+ 	 * have removed the dead tuple in the chain before us.
+ 	 */
+ 	hash_search(state->rs_unresolved_ctids,
+ 				(void *) &(old_tuple->t_self),
+ 				HASH_REMOVE, NULL);
+ }
+ 
+ /*
+  * Add a tuple to the new heap.
+  *
+  * Visibility information is copied from the original tuple.
+  *
+  * state		opaque state as returned by begin_heap_rewrite
+  * old_tuple	original tuple in the old heap
+  * new_tuple	new, rewritten tuple to be inserted to new heap
+  */
+ void
+ rewrite_heap_tuple(RewriteState state,
+ 				   HeapTuple old_tuple, HeapTuple new_tuple)
+ {
+ 	MemoryContext saved_cxt;
+ 	ItemPointerData old_tid;
+ 
+ 	saved_cxt = MemoryContextSwitchTo(state->rs_cxt);
+ 
+ 	/* Preserve the visibility information. Also preserve the HEAP_UPDATED
+ 	 * flag; if the original tuple was an updated row version, so is the 
+ 	 * new one.
+ 	 */
+ 	memcpy(&new_tuple->t_data->t_choice.t_heap,
+ 		   &old_tuple->t_data->t_choice.t_heap,
+ 		   sizeof(HeapTupleFields));
+ 
+ 	new_tuple->t_data->t_infomask &= ~(HEAP_XACT_MASK | HEAP_UPDATED);
+ 	new_tuple->t_data->t_infomask |=
+ 		old_tuple->t_data->t_infomask & (HEAP_XACT_MASK | HEAP_UPDATED);
+ 
+ 	/* Invalid ctid means that ctid should point to the tuple itself.
+ 	 * We'll override it below if the tuple is part of an update chain.
+ 	 */
+ 	ItemPointerSetInvalid(&new_tuple->t_data->t_ctid);
+ 
+ 	/* When we encounter a tuple that has been updated, check
+ 	 * the old-to-new mapping hash table. If a match is found there,
+ 	 * we've already copied the tuple that t_ctid points to, so we
+ 	 * set the ctid of this tuple to point to the new location, and insert
+ 	 * the new tuple. Otherwise we store the tuple in the unresolved_ctids
+ 	 * hash table and deal with it later, after inserting the tuple t_ctid
+ 	 * points to.
+ 	 */
+ 	if (!(old_tuple->t_data->t_infomask & (HEAP_XMAX_INVALID |
+ 									   HEAP_IS_LOCKED)) &&
+ 		!(ItemPointerEquals(&(old_tuple->t_self),
+ 							&(old_tuple->t_data->t_ctid))))
+ 	{
+ 		OldToNewMapping mapping;
+ 
+ 		mapping = (OldToNewMapping)
+ 			hash_search(state->rs_old_new_tid_map,
+ 						(void *) &(old_tuple->t_data->t_ctid),
+ 						HASH_FIND, NULL);
+ 
+ 		if (mapping != NULL)
+ 		{
+ 			/* We've already copied the tuple t_ctid points to.
+ 			 * Use the new tid of that tuple */
+ 			new_tuple->t_data->t_ctid = mapping->new_tid;
+ 
+ 			hash_search(state->rs_old_new_tid_map,
+ 						(void *) &(old_tuple->t_data->t_ctid),
+ 						HASH_REMOVE, NULL);
+ 		}
+ 		else
+ 		{
+ 			/* We haven't seen the tuple t_ctid points to yet. Stash
+ 			 * it into unresolved_ctids table and deal with it later.
+ 			 */
+ 			UnresolvedCtid unresolved;
+ 			bool found;
+ 
+ 			unresolved = hash_search(state->rs_unresolved_ctids,
+ 									 (void *) &(old_tuple->t_data->t_ctid),
+ 									 HASH_ENTER, &found);
+ 			Assert(!found);
+ 
+ 			unresolved->old_tid = old_tuple->t_self;
+ 			unresolved->tuple = heap_copytuple(new_tuple);
+ 
+ 			MemoryContextSwitchTo(saved_cxt);
+ 			return;
+ 		}
+ 	}
+ 
+ 	old_tid = old_tuple->t_self;
+ 
+ 	for (;;)
+ 	{
+ 		ItemPointerData new_tid;
+ 
+ 		/* Insert the tuple */
+ 		raw_heap_insert(state, new_tuple);
+ 		new_tid = new_tuple->t_self;
+ 
+ 		/*
+ 		 * When we encounter a tuple that's an updated version
+ 		 * of a row, check if the previous tuple in the update chain
+ 		 * is still visible to someone. The previous tuple's xmax
+ 		 * is equal to this tuple's xmin, so we can do the same check
+ 		 * HeapTupleSatisfiesVacuum would do for the previous tuple,
+ 		 * using xmin of this tuple. We know that xmin of this tuple
+ 		 * committed, otherwise this tuple would be dead and the caller
+ 		 * wouldn't have passed it to us in the first place.
+ 		 */
+ 		if ((new_tuple->t_data->t_infomask & HEAP_UPDATED) &&
+ 			!TransactionIdPrecedes(HeapTupleHeaderGetXmin(new_tuple->t_data),
+ 								   state->rs_oldest_xmin))
+ 		{
+ 			/* The previous tuple in the update chain is RECENTLY_DEAD.
+ 			 * Let's see if we've seen it already. */
+ 			UnresolvedCtid unresolved;
+ 
+ 			unresolved = hash_search(state->rs_unresolved_ctids,
+ 									 (void *) &old_tid,
+ 									 HASH_FIND, NULL);
+ 
+ 			if (unresolved != NULL)
+ 			{
+ 				/* We have seen and memorized the previous tuple already. 
+ 				 * Now that we know where we inserted the tuple its t_ctid 
+ 				 * points to, fix its t_ctid and insert it to the new heap.
+ 				 */
+ 				HeapTuple		prev_tup	 = unresolved->tuple;
+ 				ItemPointerData prev_old_tid = unresolved->old_tid;
+ 
+ 				prev_tup->t_data->t_ctid = new_tid;
+ 
+ 				hash_search(state->rs_unresolved_ctids,
+ 							(void *) &old_tid,
+ 							HASH_REMOVE, NULL);
+ 
+ 				/* loop back to insert the previous tuple in the chain */
+ 				old_tid = prev_old_tid;
+ 				new_tuple = prev_tup;
+ 				continue;
+ 			}
+ 			else
+ 			{
+ 				/* Remember the new tid of this tuple. We'll use it to set
+ 				 * the ctid when we find the previous tuple in the chain
+ 				 */
+ 				OldToNewMapping mapping;
+ 				bool found;
+ 
+ 				mapping = hash_search(state->rs_old_new_tid_map,
+ 									  (void *) &old_tid,
+ 									  HASH_ENTER, &found);
+ 				Assert(!found);
+ 
+ 				mapping->new_tid = new_tid;
+ 			}
+ 		}
+ 		break;
+ 	}
+ 
+ 	MemoryContextSwitchTo(saved_cxt);
+ }
+ 
+ /*
+  * Insert a tuple to the new relation.
+  *
+  * t_self of the tuple is set to the new TID of the tuple. If t_ctid of the
+  * tuple is invalid on entry, it's replaced with the new TID as well.
+  */
+ static void
+ raw_heap_insert(RewriteState state, HeapTuple new_tup)
+ {
+ 	BlockNumber		blkno	= InvalidBlockNumber;
+ 	Page			page	= NULL;
+ 	Size			pageFreeSpace, saveFreeSpace;
+ 	Size			len;
+ 	OffsetNumber	newoff;
+ 	HeapTuple		tup;
+ 
+ 	/*
+ 	 * If the new tuple is too big for storage or contains already toasted
+ 	 * out-of-line attributes from some other relation, invoke the toaster.
+ 	 *
+ 	 * Note: below this point, tup is the data we actually intend to store
+ 	 * into the relation; new_tup is the caller's original untoasted data.
+ 	 */
+ 	if (HeapTupleHasExternal(new_tup) || new_tup->t_len > TOAST_TUPLE_THRESHOLD)
+ 		tup = toast_insert_or_update(state->rs_new_rel, new_tup, NULL,
+ 									 state->rs_use_wal, false);
+ 	else
+ 		tup = new_tup;
+ 
+ 	len = MAXALIGN(tup->t_len);		/* be conservative */
+ 
+ 	/*
+ 	 * If we're gonna fail for oversize tuple, do it right away
+ 	 */
+ 	if (len > MaxHeapTupleSize)
+ 		ereport(ERROR,
+ 				(errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
+ 				 errmsg("row is too big: size %lu, maximum size %lu",
+ 						(unsigned long) len,
+ 						(unsigned long) MaxHeapTupleSize)));
+ 
+ 	/* Compute desired extra freespace due to fillfactor option */
+ 	saveFreeSpace = RelationGetTargetPageFreeSpace(state->rs_new_rel,
+ 												   HEAP_DEFAULT_FILLFACTOR);
+ 
+ 	/* Now we can check to see if there's enough free space here. */
+ 	if (BufferIsValid(state->rs_buf))
+ 	{
+ 		blkno = BufferGetBlockNumber(state->rs_buf);
+ 		page = (Page) BufferGetPage(state->rs_buf);
+ 		pageFreeSpace = PageGetFreeSpace(page);
+ 	
+ 		if (len + saveFreeSpace > pageFreeSpace)
+ 		{
+ 			if (state->rs_use_wal)
+ 				log_newpage(&state->rs_new_rel->rd_node, blkno, page);
+ 			
+ 			MarkBufferDirty(state->rs_buf);
+ 
+ 			UnlockReleaseBuffer(state->rs_buf);
+ 			state->rs_buf = InvalidBuffer;
+ 		}
+ 	}
+ 
+ 	if (!BufferIsValid(state->rs_buf))
+ 	{
+ 		/* There's no current buffer to insert to. Extend relation to get
+ 		 * a new page.
+ 		 */
+ 		state->rs_buf = ReadBuffer(state->rs_new_rel, P_NEW);
+ 		blkno = BufferGetBlockNumber(state->rs_buf);
+ 		page = (Page) BufferGetPage(state->rs_buf);
+ 
+ 		LockBuffer(state->rs_buf, BUFFER_LOCK_EXCLUSIVE);
+ 
+ 		/*
+ 		 * We need to initialize the empty new page.  Double-check that it 
+ 		 * really is empty (this should never happen, but if it does we
+ 		 * don't want to risk wiping out valid data).
+ 		 */
+ 
+ 		if (!PageIsNew((PageHeader) page))
+ 			elog(ERROR, "page %u of relation \"%s\" should be empty but is not",
+ 				 BufferGetBlockNumber(state->rs_buf),
+ 				 RelationGetRelationName(state->rs_new_rel));
+ 
+ 		PageInit(page, BufferGetPageSize(state->rs_buf), 0);
+ 	}
+ 
+ 	/* We now have a page pinned and locked to insert the new tuple to */
+ 
+ 	newoff = PageAddItem(page, (Item) tup->t_data, len, 
+ 						 InvalidOffsetNumber, LP_USED);
+ 	if (newoff == InvalidOffsetNumber)
+ 		elog(ERROR, "failed to add tuple");
+ 
+ 	/* Update t_self to the actual position where it was stored */
+ 	ItemPointerSet(&(new_tup->t_self), blkno, newoff);
+ 
+ 	/* Insert the correct position into CTID of the stored tuple, too,
+ 	 * if the caller didn't supply a valid CTID.
+ 	 */
+ 	if (!ItemPointerIsValid(&new_tup->t_data->t_ctid))
+ 	{
+ 		ItemId			newitemid;
+ 		HeapTupleHeader onpage_tup;
+ 
+ 		newitemid = PageGetItemId(page, newoff);
+ 		onpage_tup = (HeapTupleHeader) PageGetItem(page, newitemid);
+ 
+ 		onpage_tup->t_ctid = new_tup->t_self;
+ 	}
+ 
+ 	/* If tup is a private copy, release it. */
+ 	if (tup != new_tup)
+ 		heap_freetuple(tup);
+ }
Index: src/backend/commands/tablecmds.c
===================================================================
RCS file: /home/hlinnaka/pgcvsrepository/pgsql/src/backend/commands/tablecmds.c,v
retrieving revision 1.218
diff -c -r1.218 tablecmds.c
*** src/backend/commands/tablecmds.c	19 Mar 2007 23:38:29 -0000	1.218
--- src/backend/commands/tablecmds.c	20 Mar 2007 18:59:08 -0000
***************
*** 5855,5890 ****
  	{
  		smgrread(src, blkno, buf);
  
- 		/* XLOG stuff */
  		if (use_wal)
! 		{
! 			xl_heap_newpage xlrec;
! 			XLogRecPtr	recptr;
! 			XLogRecData rdata[2];
! 
! 			/* NO ELOG(ERROR) from here till newpage op is logged */
! 			START_CRIT_SECTION();
! 
! 			xlrec.node = dst->smgr_rnode;
! 			xlrec.blkno = blkno;
! 
! 			rdata[0].data = (char *) &xlrec;
! 			rdata[0].len = SizeOfHeapNewpage;
! 			rdata[0].buffer = InvalidBuffer;
! 			rdata[0].next = &(rdata[1]);
! 
! 			rdata[1].data = (char *) page;
! 			rdata[1].len = BLCKSZ;
! 			rdata[1].buffer = InvalidBuffer;
! 			rdata[1].next = NULL;
! 
! 			recptr = XLogInsert(RM_HEAP_ID, XLOG_HEAP_NEWPAGE, rdata);
! 
! 			PageSetLSN(page, recptr);
! 			PageSetTLI(page, ThisTimeLineID);
! 
! 			END_CRIT_SECTION();
! 		}
  
  		/*
  		 * Now write the page.	We say isTemp = true even if it's not a temp
--- 5855,5862 ----
  	{
  		smgrread(src, blkno, buf);
  
  		if (use_wal)
! 			log_newpage(&dst->smgr_rnode, blkno, page);
  
  		/*
  		 * Now write the page.	We say isTemp = true even if it's not a temp
Index: src/include/access/heapam.h
===================================================================
RCS file: /home/hlinnaka/pgcvsrepository/pgsql/src/include/access/heapam.h,v
retrieving revision 1.121
diff -c -r1.121 heapam.h
*** src/include/access/heapam.h	29 Mar 2007 00:15:39 -0000	1.121
--- src/include/access/heapam.h	29 Mar 2007 08:27:34 -0000
***************
*** 194,199 ****
--- 194,200 ----
  extern XLogRecPtr log_heap_freeze(Relation reln, Buffer buffer,
  								  TransactionId cutoff_xid,
  								  OffsetNumber *offsets, int offcnt);
+ extern XLogRecPtr log_newpage(RelFileNode *rnode, BlockNumber blk, Page page);
  
  /* in common/heaptuple.c */
  extern Size heap_compute_data_size(TupleDesc tupleDesc,
Index: src/include/commands/rewriteheap.h
===================================================================
RCS file: src/include/commands/rewriteheap.h
diff -N src/include/commands/rewriteheap.h
*** /dev/null	1 Jan 1970 00:00:00 -0000
--- src/include/commands/rewriteheap.h	20 Mar 2007 17:52:15 -0000
***************
*** 0 ****
--- 1,29 ----
+ /*-------------------------------------------------------------------------
+  *
+  * rewriteheap.h
+  *	  header file for heap rewrite support functions
+  *
+  * Portions Copyright (c) 1996-2007, PostgreSQL Global Development Group
+  * Portions Copyright (c) 1994-5, Regents of the University of California
+  *
+  * $PostgreSQL$
+  *
+  *-------------------------------------------------------------------------
+  */
+ #ifndef REWRITE_HEAP_H
+ #define REWRITE_HEAP_H
+ 
+ #include "access/htup.h"
+ #include "utils/rel.h"
+ 
+ /* definition is private to rewriteheap.c */
+ typedef struct RewriteStateData *RewriteState;
+ 
+ extern RewriteState begin_heap_rewrite(Relation NewHeap, 
+ 									   TransactionId OldestXmin, bool use_wal);
+ extern void end_heap_rewrite(RewriteState state);
+ extern void rewrite_heap_dead_tuple(RewriteState state, HeapTuple oldTuple);
+ extern void rewrite_heap_tuple(RewriteState state, HeapTuple oldTuple, 
+ 							   HeapTuple newTuple);
+ 
+ #endif   /* REWRITE_HEAP_H */
Index: src/test/regress/expected/cluster.out
===================================================================
RCS file: /home/hlinnaka/pgcvsrepository/pgsql/src/test/regress/expected/cluster.out,v
retrieving revision 1.17
diff -c -r1.17 cluster.out
*** src/test/regress/expected/cluster.out	7 Jul 2005 20:40:01 -0000	1.17
--- src/test/regress/expected/cluster.out	20 Mar 2007 16:50:04 -0000
***************
*** 382,389 ****
--- 382,428 ----
   2
  (2 rows)
  
+ -- Test MVCC-safety of cluster. There isn't much we can do to verify the
+ -- results with a single backend...
+ CREATE TABLE clustertest (key int PRIMARY KEY);
+ NOTICE:  CREATE TABLE / PRIMARY KEY will create implicit index "clustertest_pkey" for table "clustertest"
+ INSERT INTO clustertest VALUES (10);
+ INSERT INTO clustertest VALUES (20);
+ INSERT INTO clustertest VALUES (30);
+ INSERT INTO clustertest VALUES (40);
+ INSERT INTO clustertest VALUES (50);
+ -- Test update where the old row version is found first in the scan
+ UPDATE clustertest SET key = 100 WHERE key = 10;
+ -- Test update where the new row version is found first in the scan
+ UPDATE clustertest SET key = 35 WHERE key = 40;
+ -- Test longer update chain 
+ UPDATE clustertest SET key = 60 WHERE key = 50;
+ UPDATE clustertest SET key = 70 WHERE key = 60;
+ UPDATE clustertest SET key = 80 WHERE key = 90;
+ SELECT * FROM clustertest;
+  key 
+ -----
+   20
+   30
+  100
+   35
+   70
+ (5 rows)
+ 
+ CLUSTER clustertest_pkey ON clustertest;
+ SELECT * FROM clustertest;
+  key 
+ -----
+   20
+   30
+   35
+   70
+  100
+ (5 rows)
+ 
  -- clean up
  \c -
+ DROP TABLE clustertest;
  DROP TABLE clstr_1;
  DROP TABLE clstr_2;
  DROP TABLE clstr_3;
Index: src/test/regress/sql/cluster.sql
===================================================================
RCS file: /home/hlinnaka/pgcvsrepository/pgsql/src/test/regress/sql/cluster.sql,v
retrieving revision 1.9
diff -c -r1.9 cluster.sql
*** src/test/regress/sql/cluster.sql	7 Jul 2005 20:40:02 -0000	1.9
--- src/test/regress/sql/cluster.sql	20 Mar 2007 17:52:48 -0000
***************
*** 153,160 ****
--- 153,187 ----
  CLUSTER clstr_1;
  SELECT * FROM clstr_1;
  
+ -- Test MVCC-safety of cluster. There isn't much we can do to verify the
+ -- results with a single backend...
+ 
+ CREATE TABLE clustertest (key int PRIMARY KEY);
+ 
+ INSERT INTO clustertest VALUES (10);
+ INSERT INTO clustertest VALUES (20);
+ INSERT INTO clustertest VALUES (30);
+ INSERT INTO clustertest VALUES (40);
+ INSERT INTO clustertest VALUES (50);
+ 
+ -- Test update where the old row version is found first in the scan
+ UPDATE clustertest SET key = 100 WHERE key = 10;
+ 
+ -- Test update where the new row version is found first in the scan
+ UPDATE clustertest SET key = 35 WHERE key = 40;
+ 
+ -- Test longer update chain 
+ UPDATE clustertest SET key = 60 WHERE key = 50;
+ UPDATE clustertest SET key = 70 WHERE key = 60;
+ UPDATE clustertest SET key = 80 WHERE key = 90;
+ 
+ SELECT * FROM clustertest;
+ CLUSTER clustertest_pkey ON clustertest;
+ SELECT * FROM clustertest;
+ 
  -- clean up
  \c -
+ DROP TABLE clustertest;
  DROP TABLE clstr_1;
  DROP TABLE clstr_2;
  DROP TABLE clstr_3;