Hi All,

Here is a revised patch based on our earlier discussion. I implemented
Robert's idea of tracking the vacuum generation number in the line
pointer itself. For LP_DEAD line pointers, the lp_off/lp_len is unused
(and always set to 0 for heap tuples). We use those 30 bits to store
the generation number of the vacuum which would have potentially
removed the corresponding index pointers, if the vacuum finished
successfully. The pg_class information is used to know the status of
the vacuum, whether it failed or succeeded. 30-bit numbers are large
enough that we can ignore any wrap-around related issues. With this
change, we don't need any additional header or special space in the
page which was one of the main objections to the previous version.

Other than this major change, I have added code commentary at relevant
places and also fixed the itemid.h comments to reflect the change. I
think the patch is ready for a serious review now.

Thanks,
Pavan

-- 
Pavan Deolasee
EnterpriseDB     http://www.enterprisedb.com
diff --git a/contrib/pageinspect/heapfuncs.c b/contrib/pageinspect/heapfuncs.c
index 20bca0d..6213631 100644
--- a/contrib/pageinspect/heapfuncs.c
+++ b/contrib/pageinspect/heapfuncs.c
@@ -155,6 +155,7 @@ heap_page_items(PG_FUNCTION_ARGS)
 		 * many other ways, but at least we won't crash.
 		 */
 		if (ItemIdHasStorage(id) &&
+			!ItemIdIsDead(id) &&
 			lp_len >= sizeof(HeapTupleHeader) &&
 			lp_offset == MAXALIGN(lp_offset) &&
 			lp_offset + lp_len <= raw_page_size)
diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index 06db65d..cf65c05 100644
--- a/src/backend/access/heap/heapam.c
+++ b/src/backend/access/heap/heapam.c
@@ -3984,7 +3984,8 @@ log_heap_clean(Relation reln, Buffer buffer,
 			   OffsetNumber *redirected, int nredirected,
 			   OffsetNumber *nowdead, int ndead,
 			   OffsetNumber *nowunused, int nunused,
-			   TransactionId latestRemovedXid)
+			   TransactionId latestRemovedXid,
+			   uint32 vacgen)
 {
 	xl_heap_clean xlrec;
 	uint8		info;
@@ -3999,6 +4000,7 @@ log_heap_clean(Relation reln, Buffer buffer,
 	xlrec.latestRemovedXid = latestRemovedXid;
 	xlrec.nredirected = nredirected;
 	xlrec.ndead = ndead;
+	xlrec.vacgen = vacgen;
 
 	rdata[0].data = (char *) &xlrec;
 	rdata[0].len = SizeOfHeapClean;
@@ -4300,6 +4302,7 @@ heap_xlog_clean(XLogRecPtr lsn, XLogRecord *record)
 	int			ndead;
 	int			nunused;
 	Size		freespace;
+	uint32		vacgen;
 
 	/*
 	 * We're about to remove tuples. In Hot Standby mode, ensure that there's
@@ -4332,6 +4335,7 @@ heap_xlog_clean(XLogRecPtr lsn, XLogRecord *record)
 
 	nredirected = xlrec->nredirected;
 	ndead = xlrec->ndead;
+	vacgen = xlrec->vacgen;
 	end = (OffsetNumber *) ((char *) xlrec + record->xl_len);
 	redirected = (OffsetNumber *) ((char *) xlrec + SizeOfHeapClean);
 	nowdead = redirected + (nredirected * 2);
@@ -4343,7 +4347,8 @@ heap_xlog_clean(XLogRecPtr lsn, XLogRecord *record)
 	heap_page_prune_execute(buffer,
 							redirected, nredirected,
 							nowdead, ndead,
-							nowunused, nunused);
+							nowunused, nunused,
+							vacgen);
 
 	freespace = PageGetHeapFreeSpace(page);		/* needed to update FSM below */
 
diff --git a/src/backend/access/heap/pruneheap.c b/src/backend/access/heap/pruneheap.c
index 0cfa866..00ac676 100644
--- a/src/backend/access/heap/pruneheap.c
+++ b/src/backend/access/heap/pruneheap.c
@@ -31,9 +31,12 @@ typedef struct
 	TransactionId new_prune_xid;	/* new prune hint value for page */
 	TransactionId latestRemovedXid;		/* latest xid to be removed by this
 										 * prune */
+	int			already_dead;		/* number of already dead line pointers */
+
 	int			nredirected;	/* numbers of entries in arrays below */
 	int			ndead;
 	int			nunused;
+
 	/* arrays that accumulate indexes of items to be changed */
 	OffsetNumber redirected[MaxHeapTuplesPerPage * 2];
 	OffsetNumber nowdead[MaxHeapTuplesPerPage];
@@ -125,8 +128,8 @@ heap_page_prune_opt(Relation relation, Buffer buffer, TransactionId OldestXmin)
 			TransactionId ignore = InvalidTransactionId;		/* return value not
 																 * needed */
 
-			/* OK to prune */
-			(void) heap_page_prune(relation, buffer, OldestXmin, true, &ignore);
+			/* OK to prune - pass invalid vacuum generation number */
+			(void) heap_page_prune(relation, buffer, OldestXmin, true, &ignore, 0);
 		}
 
 		/* And release buffer lock */
@@ -153,13 +156,15 @@ heap_page_prune_opt(Relation relation, Buffer buffer, TransactionId OldestXmin)
  */
 int
 heap_page_prune(Relation relation, Buffer buffer, TransactionId OldestXmin,
-				bool report_stats, TransactionId *latestRemovedXid)
+				bool report_stats, TransactionId *latestRemovedXid,
+				uint32 current_vacgen)
 {
 	int			ndeleted = 0;
 	Page		page = BufferGetPage(buffer);
 	OffsetNumber offnum,
 				maxoff;
 	PruneState	prstate;
+	uint32		last_finished_vacgen = RelationGetLastVacGen(relation);
 
 	/*
 	 * Our strategy is to scan the page and make lists of items to change,
@@ -175,6 +180,7 @@ heap_page_prune(Relation relation, Buffer buffer, TransactionId OldestXmin,
 	prstate.new_prune_xid = InvalidTransactionId;
 	prstate.latestRemovedXid = InvalidTransactionId;
 	prstate.nredirected = prstate.ndead = prstate.nunused = 0;
+	prstate.already_dead = 0;
 	memset(prstate.marked, 0, sizeof(prstate.marked));
 
 	/* Scan the page */
@@ -191,8 +197,26 @@ heap_page_prune(Relation relation, Buffer buffer, TransactionId OldestXmin,
 
 		/* Nothing to do if slot is empty or already dead */
 		itemid = PageGetItemId(page, offnum);
-		if (!ItemIdIsUsed(itemid) || ItemIdIsDead(itemid))
+		if (!ItemIdIsUsed(itemid))
 			continue;
+		
+		/* 
+		 * If the slot is dead-vacuumed and we know that the index pointers
+		 * have already been vacuumed by the last index vacuum, just mark them
+		 * unused so that they are removed when we defrag the page
+		 */
+		if (ItemIdIsDeadVacuumed(itemid))
+		{
+			if (ItemIdGetVacGen(itemid) == last_finished_vacgen)
+				heap_prune_record_unused(&prstate, offnum);
+			continue;
+		}
+		else if (ItemIdIsDead(itemid))
+		{
+			heap_prune_record_dead(&prstate, offnum);
+			prstate.already_dead++;
+			continue;
+		}
 
 		/* Process this item or chain of items */
 		ndeleted += heap_prune_chain(relation, buffer, offnum,
@@ -213,7 +237,8 @@ heap_page_prune(Relation relation, Buffer buffer, TransactionId OldestXmin,
 		heap_page_prune_execute(buffer,
 								prstate.redirected, prstate.nredirected,
 								prstate.nowdead, prstate.ndead,
-								prstate.nowunused, prstate.nunused);
+								prstate.nowunused, prstate.nunused,
+								current_vacgen);
 
 		/*
 		 * Update the page's pd_prune_xid field to either zero, or the lowest
@@ -241,7 +266,8 @@ heap_page_prune(Relation relation, Buffer buffer, TransactionId OldestXmin,
 									prstate.redirected, prstate.nredirected,
 									prstate.nowdead, prstate.ndead,
 									prstate.nowunused, prstate.nunused,
-									prstate.latestRemovedXid);
+									prstate.latestRemovedXid,
+									current_vacgen);
 
 			PageSetLSN(BufferGetPage(buffer), recptr);
 			PageSetTLI(BufferGetPage(buffer), ThisTimeLineID);
@@ -273,9 +299,12 @@ heap_page_prune(Relation relation, Buffer buffer, TransactionId OldestXmin,
 	 * If requested, report the number of tuples reclaimed to pgstats. This is
 	 * ndeleted minus ndead, because we don't want to count a now-DEAD root
 	 * item as a deletion for this purpose.
+	 *
+	 * Adjust already_dead since they are counted as ndead and we really don't
+	 * want to include them here
 	 */
-	if (report_stats && ndeleted > prstate.ndead)
-		pgstat_update_heap_dead_tuples(relation, ndeleted - prstate.ndead);
+	if (report_stats && ndeleted > (prstate.ndead - prstate.already_dead)) 
+		pgstat_update_heap_dead_tuples(relation, ndeleted - (prstate.ndead - prstate.already_dead));
 
 	*latestRemovedXid = prstate.latestRemovedXid;
 
@@ -645,7 +674,8 @@ void
 heap_page_prune_execute(Buffer buffer,
 						OffsetNumber *redirected, int nredirected,
 						OffsetNumber *nowdead, int ndead,
-						OffsetNumber *nowunused, int nunused)
+						OffsetNumber *nowunused, int nunused,
+						uint32 vacgen)
 {
 	Page		page = (Page) BufferGetPage(buffer);
 	OffsetNumber *offnum;
@@ -669,7 +699,17 @@ heap_page_prune_execute(Buffer buffer,
 		OffsetNumber off = *offnum++;
 		ItemId		lp = PageGetItemId(page, off);
 
-		ItemIdSetDead(lp);
+		/*
+		 * If we are called from a vacuum (vacgen > 0), mark the line pointers
+		 * as dead-vacuumed and also store the current vacuum generation number
+		 * in the line pointer. OTOH if we are called from a normal HOT-prune
+		 * routine, mark the line pointers as DEAD since the index pointers to
+		 * them will not be removed just yet.
+		 */
+		if (vacgen)
+			ItemIdSetDeadVacuumed(lp, vacgen);
+		else
+			ItemIdSetDead(lp);
 	}
 
 	/* Update all now-unused line pointers */
diff --git a/src/backend/catalog/heap.c b/src/backend/catalog/heap.c
index 7ec6581..3794060 100644
--- a/src/backend/catalog/heap.c
+++ b/src/backend/catalog/heap.c
@@ -792,6 +792,8 @@ InsertPgClassTuple(Relation pg_class_desc,
 	values[Anum_pg_class_relhastriggers - 1] = BoolGetDatum(rd_rel->relhastriggers);
 	values[Anum_pg_class_relhassubclass - 1] = BoolGetDatum(rd_rel->relhassubclass);
 	values[Anum_pg_class_relfrozenxid - 1] = TransactionIdGetDatum(rd_rel->relfrozenxid);
+	values[Anum_pg_class_relnextvacgen - 1] = Int32GetDatum(rd_rel->relnextvacgen);
+	values[Anum_pg_class_rellastvacgen - 1] = Int32GetDatum(rd_rel->rellastvacgen);
 	if (relacl != (Datum) 0)
 		values[Anum_pg_class_relacl - 1] = relacl;
 	else
@@ -886,6 +888,9 @@ AddNewRelationTuple(Relation pg_class_desc,
 		new_rel_reltup->relfrozenxid = InvalidTransactionId;
 	}
 
+	new_rel_reltup->relnextvacgen = 1;
+	new_rel_reltup->rellastvacgen = 0;
+
 	new_rel_reltup->relowner = relowner;
 	new_rel_reltup->reltype = new_type_oid;
 	new_rel_reltup->reloftype = reloftype;
diff --git a/src/backend/commands/analyze.c b/src/backend/commands/analyze.c
index a68da59..072c52f 100644
--- a/src/backend/commands/analyze.c
+++ b/src/backend/commands/analyze.c
@@ -1075,11 +1075,20 @@ acquire_sample_rows(Relation onerel, HeapTuple *rows, int targrows,
 			 * pointers should be counted as dead, because we need vacuum to
 			 * run to get rid of them.	Note that this rule agrees with the
 			 * way that heap_page_prune() counts things.
+			 *
+			 * XXX We don't count dead line pointers if we know that they can be
+			 * removed by a HOT cleanup.
 			 */
 			if (!ItemIdIsNormal(itemid))
 			{
-				if (ItemIdIsDead(itemid))
-					deadrows += 1;
+				if (ItemIdIsDeadVacuumed(itemid))
+				{
+					if (ItemIdGetVacGen(itemid) != RelationGetLastVacGen(onerel))
+						deadrows += 1;
+				}
+				else if (ItemIdIsDead(itemid))
+					deadrows++;
+
 				continue;
 			}
 
diff --git a/src/backend/commands/vacuum.c b/src/backend/commands/vacuum.c
index 889737e..1a87e35 100644
--- a/src/backend/commands/vacuum.c
+++ b/src/backend/commands/vacuum.c
@@ -643,6 +643,88 @@ vac_update_relstats(Relation relation,
 	heap_close(rd, RowExclusiveLock);
 }
 
+/*
+ * Grab the next vacuum generation number to be used to stamp the dead-vacuumed
+ * line pointers and also increment the generation number.
+ */
+uint32
+vac_update_nextvacgen(Relation relation)
+{
+	Oid			relid = RelationGetRelid(relation);
+	Relation	rd;
+	HeapTuple	ctup;
+	Form_pg_class pgcform;
+	uint32		nextvacgen;
+
+	rd = heap_open(RelationRelationId, RowExclusiveLock);
+
+	/* Fetch a copy of the tuple to scribble on */
+	ctup = SearchSysCacheCopy1(RELOID, ObjectIdGetDatum(relid));
+	if (!HeapTupleIsValid(ctup))
+		elog(ERROR, "pg_class entry for relid %u vanished during vacuuming",
+			 relid);
+	pgcform = (Form_pg_class) GETSTRUCT(ctup);
+
+	/* Remember the next vacuum generation number before incrementing it */
+	nextvacgen = pgcform->relnextvacgen;
+
+	/* 
+	 * Increment while taking care of wrap-around (without using zero)
+	 *
+	 * Note: We don't worry about the wrap-around issues here since it would
+	 * take a 1 Billion vacuums on the same relation for the vacuum generation
+	 * to wrap-around. That would take ages to happen and even if it happens,
+	 * the chances that we might have dead-vacuumed line pointers still
+	 * stamped with the old (failed) vacuum are infinitely small since some
+	 * other vacuum cycle would have taken care of them.
+	 */
+	pgcform->relnextvacgen = pgcform->relnextvacgen + 1;
+	if (pgcform->relnextvacgen == 0x80000000)
+		pgcform->relnextvacgen = 1;
+
+	heap_inplace_update(rd, ctup);
+
+	heap_close(rd, RowExclusiveLock);
+
+	/* 
+	 * Increase command counter since we want to see the updated row when we
+	 * again come back to set the rellastvacgen when the vacuum completes and
+	 * we don't want to forget what we just did above
+	 */
+	CommandCounterIncrement();
+
+	return nextvacgen;
+}
+
+/*
+ * Update the generation number of the last successful index vacuum.
+ */
+void
+vac_update_lastvacgen(Relation relation, uint32 vacgen)
+{
+	Oid			relid = RelationGetRelid(relation);
+	Relation	rd;
+	HeapTuple	ctup;
+	Form_pg_class pgcform;
+
+	rd = heap_open(RelationRelationId, RowExclusiveLock);
+
+	/* Fetch a copy of the tuple to scribble on */
+	ctup = SearchSysCacheCopy1(RELOID, ObjectIdGetDatum(relid));
+	if (!HeapTupleIsValid(ctup))
+		elog(ERROR, "pg_class entry for relid %u vanished during vacuuming",
+			 relid);
+	pgcform = (Form_pg_class) GETSTRUCT(ctup);
+
+	/* Store the 30 LSB to match with what we store in the line pointers */
+	pgcform->rellastvacgen = (vacgen & 0x3fffffff);
+
+	heap_inplace_update(rd, ctup);
+
+	heap_close(rd, RowExclusiveLock);
+
+	CommandCounterIncrement();
+}
 
 /*
  *	vac_update_datfrozenxid() -- update pg_database.datfrozenxid for our DB
diff --git a/src/backend/commands/vacuumlazy.c b/src/backend/commands/vacuumlazy.c
index c5bf32e..64f8920 100644
--- a/src/backend/commands/vacuumlazy.c
+++ b/src/backend/commands/vacuumlazy.c
@@ -99,6 +99,7 @@ typedef struct LVRelStats
 	ItemPointer dead_tuples;	/* array of ItemPointerData */
 	int			num_index_scans;
 	TransactionId latestRemovedXid;
+	uint32		lastvacgen;
 } LVRelStats;
 
 
@@ -114,15 +115,12 @@ static BufferAccessStrategy vac_strategy;
 /* non-export function prototypes */
 static void lazy_scan_heap(Relation onerel, LVRelStats *vacrelstats,
 			   Relation *Irel, int nindexes, bool scan_all);
-static void lazy_vacuum_heap(Relation onerel, LVRelStats *vacrelstats);
 static void lazy_vacuum_index(Relation indrel,
 				  IndexBulkDeleteResult **stats,
 				  LVRelStats *vacrelstats);
 static void lazy_cleanup_index(Relation indrel,
 				   IndexBulkDeleteResult *stats,
 				   LVRelStats *vacrelstats);
-static int lazy_vacuum_page(Relation onerel, BlockNumber blkno, Buffer buffer,
-				 int tupindex, LVRelStats *vacrelstats);
 static void lazy_truncate_heap(Relation onerel, LVRelStats *vacrelstats);
 static BlockNumber count_nondeletable_pages(Relation onerel,
 						 LVRelStats *vacrelstats);
@@ -204,6 +202,10 @@ lazy_vacuum_rel(Relation onerel, VacuumStmt *vacstmt,
 	/* Vacuum the Free Space Map */
 	FreeSpaceMapVacuum(onerel);
 
+	/* Since vacuum ran to completion, remember the vacuum generation number */
+	if (vacrelstats->lastvacgen != 0)
+		vac_update_lastvacgen(onerel, vacrelstats->lastvacgen);
+
 	/*
 	 * Update statistics in pg_class.  But don't change relfrozenxid if we
 	 * skipped any pages.
@@ -286,6 +288,34 @@ vacuum_log_cleanup_info(Relation rel, LVRelStats *vacrelstats)
  *
  *		If there are no indexes then we just vacuum each dirty page as we
  *		process it, since there's no point in gathering many tuples.
+ *
+ *		Starting 9.2, we removed the second heap pass of vacuum and instead
+ *		leave the dead line pointers in the heap to be removed by the next
+ *		vacuum cycle or a HOT-prune operation. We can do this without much
+ *		performance penalty because almost all the dead space is reclaimed in
+ *		the first pass itself (except that which is taken by the dead line
+ *		pointers and there is no guarantee that will be freed by the second
+		 * pass anyways). But this gives us two large benefits:
+ *
+ *		1. We don't have to scan the heap again. Even though visibility map
+ *		lets us scan only the necessary pages, in many cases this would still
+ *		be a large part of the relation
+ *
+ *		2. We don't have to write the heap pages (and associated WAL) twice.
+		 * Since vacuum uses ring-buffers for heap scan, this would actually mean
+ *		disk IO unless the relation is very small.
+ *
+ *		The way we do this is by tracking the last successful vacuum by its
+ *		generation number in the pg_class row. When a dead line pointer is
+ *		collected by a vacuum, we store the generation number of the vacuum in
+ *		the line pointer itself (lp_off/lp_len is not used for DEAD heap line
+ *		pointer and that gives us 30-bits of unused space to store the
+ *		information). Later on, either as part of the HOT-prune or the next
+ *		vacuum on the table, we check if the vacuum generation number stored in
+		 * a dead-vacuumed line pointer is the same as the last successful vacuum on
+ *		the table and remove those dead-vacuumed line pointers. We are sure at
+ *		that point that the index pointers to those dead-vacuumed line pointers
+ *		must have been already removed.
  */
 static void
 lazy_scan_heap(Relation onerel, LVRelStats *vacrelstats,
@@ -307,6 +337,7 @@ lazy_scan_heap(Relation onerel, LVRelStats *vacrelstats,
 	Buffer		vmbuffer = InvalidBuffer;
 	BlockNumber next_not_all_visible_block;
 	bool		skipping_all_visible_blocks;
+	int			current_vacgen;
 
 	pg_rusage_init(&ru0);
 
@@ -319,6 +350,23 @@ lazy_scan_heap(Relation onerel, LVRelStats *vacrelstats,
 	empty_pages = vacuumed_pages = 0;
 	num_tuples = tups_vacuumed = nkeep = nunused = 0;
 
+	/*
+	 * Before starting the vacuum, grab the next vacuum generation number for
+	 * this relation. Whenever a block is scanned and dead line pointers are
+	 * collected, we store the vacuum generation number in the line pointer
+	 * offset (since lp_off is not useful for dead heap line pointers).
+	 *
+	 * We also update the relnextvacgen to guard against the case when this
+	 * vacuum aborts after scanning few pages. If we don't increment the
+	 * relnextvacgen now, the next vacuum may use the same generation number
+	 * and if it skips the pages scanned by this vacuum (though not possible
+	 * currently because the way visibility map is handled), we might get into
+	 * a situation where the index pointers of some dead-vacuumed line pointers
+	 * are not yet removed, but the vacuum generation number stored in those
+	 * line pointers is same as the last successful vacuum on the table.
+	 */
+	current_vacgen = vac_update_nextvacgen(onerel);
+
 	indstats = (IndexBulkDeleteResult **)
 		palloc0(nindexes * sizeof(IndexBulkDeleteResult *));
 
@@ -432,8 +480,6 @@ lazy_scan_heap(Relation onerel, LVRelStats *vacrelstats,
 				lazy_vacuum_index(Irel[i],
 								  &indstats[i],
 								  vacrelstats);
-			/* Remove tuples from heap */
-			lazy_vacuum_heap(onerel, vacrelstats);
 
 			/*
 			 * Forget the now-vacuumed tuples, and press on, but be careful
@@ -529,7 +575,8 @@ lazy_scan_heap(Relation onerel, LVRelStats *vacrelstats,
 		 * We count tuples removed by the pruning step as removed by VACUUM.
 		 */
 		tups_vacuumed += heap_page_prune(onerel, buf, OldestXmin, false,
-										 &vacrelstats->latestRemovedXid);
+										 &vacrelstats->latestRemovedXid,
+										 current_vacgen);
 
 		/*
 		 * Now scan the page to collect vacuumable items and check for tuples
@@ -713,24 +760,13 @@ lazy_scan_heap(Relation onerel, LVRelStats *vacrelstats,
 			}
 		}
 
+		vacuumed_pages++;
+
 		/*
-		 * If there are no indexes then we can vacuum the page right now
-		 * instead of doing a second scan.
+		 * If there are no indexes, we don't need to remember the dead tuples
 		 */
-		if (nindexes == 0 &&
-			vacrelstats->num_dead_tuples > 0)
-		{
-			/* Remove tuples from heap */
-			lazy_vacuum_page(onerel, blkno, buf, 0, vacrelstats);
-
-			/*
-			 * Forget the now-vacuumed tuples, and press on, but be careful
-			 * not to reset latestRemovedXid since we want that value to be
-			 * valid.
-			 */
+		if (nindexes == 0)
 			vacrelstats->num_dead_tuples = 0;
-			vacuumed_pages++;
-		}
 
 		freespace = PageGetHeapFreeSpace(page);
 
@@ -789,14 +825,9 @@ lazy_scan_heap(Relation onerel, LVRelStats *vacrelstats,
 			vacrelstats->nonempty_pages = blkno + 1;
 
 		/*
-		 * If we remembered any tuples for deletion, then the page will be
-		 * visited again by lazy_vacuum_heap, which will compute and record
-		 * its post-compaction free space.	If not, then we're done with this
-		 * page, so remember its free space as-is.	(This path will always be
-		 * taken if there are no indexes.)
+		 * Record the free space on the page.
 		 */
-		if (vacrelstats->num_dead_tuples == prev_dead_count)
-			RecordPageWithFreeSpace(onerel, blkno, freespace);
+		RecordPageWithFreeSpace(onerel, blkno, freespace);
 	}
 
 	/* save stats for use later */
@@ -821,8 +852,6 @@ lazy_scan_heap(Relation onerel, LVRelStats *vacrelstats,
 			lazy_vacuum_index(Irel[i],
 							  &indstats[i],
 							  vacrelstats);
-		/* Remove tuples from heap */
-		lazy_vacuum_heap(onerel, vacrelstats);
 		vacrelstats->num_index_scans++;
 	}
 
@@ -833,11 +862,14 @@ lazy_scan_heap(Relation onerel, LVRelStats *vacrelstats,
 		vmbuffer = InvalidBuffer;
 	}
 
+	/* Remember the current vacuum generation */
+	vacrelstats->lastvacgen = current_vacgen;
+
 	/* Do post-vacuum cleanup and statistics update for each index */
 	for (i = 0; i < nindexes; i++)
 		lazy_cleanup_index(Irel[i], indstats[i], vacrelstats);
 
-	/* If no indexes, make log report that lazy_vacuum_heap would've made */
+	/* Report vacuum stats */
 	if (vacuumed_pages)
 		ereport(elevel,
 				(errmsg("\"%s\": removed %.0f row versions in %u pages",
@@ -859,118 +891,6 @@ lazy_scan_heap(Relation onerel, LVRelStats *vacrelstats,
 					   pg_rusage_show(&ru0))));
 }
 
-
-/*
- *	lazy_vacuum_heap() -- second pass over the heap
- *
- *		This routine marks dead tuples as unused and compacts out free
- *		space on their pages.  Pages not having dead tuples recorded from
- *		lazy_scan_heap are not visited at all.
- *
- * Note: the reason for doing this as a second pass is we cannot remove
- * the tuples until we've removed their index entries, and we want to
- * process index entry removal in batches as large as possible.
- */
-static void
-lazy_vacuum_heap(Relation onerel, LVRelStats *vacrelstats)
-{
-	int			tupindex;
-	int			npages;
-	PGRUsage	ru0;
-
-	pg_rusage_init(&ru0);
-	npages = 0;
-
-	tupindex = 0;
-	while (tupindex < vacrelstats->num_dead_tuples)
-	{
-		BlockNumber tblk;
-		Buffer		buf;
-		Page		page;
-		Size		freespace;
-
-		vacuum_delay_point();
-
-		tblk = ItemPointerGetBlockNumber(&vacrelstats->dead_tuples[tupindex]);
-		buf = ReadBufferExtended(onerel, MAIN_FORKNUM, tblk, RBM_NORMAL,
-								 vac_strategy);
-		LockBufferForCleanup(buf);
-		tupindex = lazy_vacuum_page(onerel, tblk, buf, tupindex, vacrelstats);
-
-		/* Now that we've compacted the page, record its available space */
-		page = BufferGetPage(buf);
-		freespace = PageGetHeapFreeSpace(page);
-
-		UnlockReleaseBuffer(buf);
-		RecordPageWithFreeSpace(onerel, tblk, freespace);
-		npages++;
-	}
-
-	ereport(elevel,
-			(errmsg("\"%s\": removed %d row versions in %d pages",
-					RelationGetRelationName(onerel),
-					tupindex, npages),
-			 errdetail("%s.",
-					   pg_rusage_show(&ru0))));
-}
-
-/*
- *	lazy_vacuum_page() -- free dead tuples on a page
- *					 and repair its fragmentation.
- *
- * Caller must hold pin and buffer cleanup lock on the buffer.
- *
- * tupindex is the index in vacrelstats->dead_tuples of the first dead
- * tuple for this page.  We assume the rest follow sequentially.
- * The return value is the first tupindex after the tuples of this page.
- */
-static int
-lazy_vacuum_page(Relation onerel, BlockNumber blkno, Buffer buffer,
-				 int tupindex, LVRelStats *vacrelstats)
-{
-	Page		page = BufferGetPage(buffer);
-	OffsetNumber unused[MaxOffsetNumber];
-	int			uncnt = 0;
-
-	START_CRIT_SECTION();
-
-	for (; tupindex < vacrelstats->num_dead_tuples; tupindex++)
-	{
-		BlockNumber tblk;
-		OffsetNumber toff;
-		ItemId		itemid;
-
-		tblk = ItemPointerGetBlockNumber(&vacrelstats->dead_tuples[tupindex]);
-		if (tblk != blkno)
-			break;				/* past end of tuples for this block */
-		toff = ItemPointerGetOffsetNumber(&vacrelstats->dead_tuples[tupindex]);
-		itemid = PageGetItemId(page, toff);
-		ItemIdSetUnused(itemid);
-		unused[uncnt++] = toff;
-	}
-
-	PageRepairFragmentation(page);
-
-	MarkBufferDirty(buffer);
-
-	/* XLOG stuff */
-	if (RelationNeedsWAL(onerel))
-	{
-		XLogRecPtr	recptr;
-
-		recptr = log_heap_clean(onerel, buffer,
-								NULL, 0, NULL, 0,
-								unused, uncnt,
-								vacrelstats->latestRemovedXid);
-		PageSetLSN(page, recptr);
-		PageSetTLI(page, ThisTimeLineID);
-	}
-
-	END_CRIT_SECTION();
-
-	return tupindex;
-}
-
 /*
  *	lazy_vacuum_index() -- vacuum one index relation.
  *
@@ -1197,9 +1117,13 @@ count_nondeletable_pages(Relation onerel, LVRelStats *vacrelstats)
 			 * Note: any non-unused item should be taken as a reason to keep
 			 * this page.  We formerly thought that DEAD tuples could be
 			 * thrown away, but that's not so, because we'd not have cleaned
-			 * out their index entries.
+			 * out their index entries. But we can throw away the dead-vacuumed
+			 * tuples created by this vacuum since those index pointers must
+			 * have been removed before we come here
 			 */
-			if (ItemIdIsUsed(itemid))
+			if (ItemIdIsUsed(itemid) &&
+			   	!(ItemIdIsDeadVacuumed(itemid) &&
+				  ItemIdGetVacGen(itemid) == vacrelstats->lastvacgen))
 			{
 				hastup = true;
 				break;			/* can stop scanning */
diff --git a/src/backend/storage/page/bufpage.c b/src/backend/storage/page/bufpage.c
index 6bd3812..f72fe9c 100644
--- a/src/backend/storage/page/bufpage.c
+++ b/src/backend/storage/page/bufpage.c
@@ -397,7 +397,7 @@ PageRepairFragmentation(Page page)
 		lp = PageGetItemId(page, i);
 		if (ItemIdIsUsed(lp))
 		{
-			if (ItemIdHasStorage(lp))
+			if (!ItemIdIsDead(lp) && ItemIdHasStorage(lp))
 				nstorage++;
 		}
 		else
@@ -410,7 +410,13 @@ PageRepairFragmentation(Page page)
 
 	if (nstorage == 0)
 	{
-		/* Page is completely empty, so just reset it quickly */
+		/* 
+		 * Page is completely empty, so just reset it quickly
+		 *
+		 * Note: We don't reset the pd_lower because the page may still have
+		 * DEAD line pointers with index pointers pointing to them and it's not
+		 * safe to remove them before the index pointers are first removed
+		 */
 		((PageHeader) page)->pd_upper = pd_special;
 	}
 	else
@@ -422,7 +428,7 @@ PageRepairFragmentation(Page page)
 		for (i = 0; i < nline; i++)
 		{
 			lp = PageGetItemId(page, i + 1);
-			if (ItemIdHasStorage(lp))
+			if (!ItemIdIsDead(lp) && ItemIdHasStorage(lp))
 			{
 				itemidptr->offsetindex = i;
 				itemidptr->itemoff = ItemIdGetOffset(lp);
diff --git a/src/include/access/heapam.h b/src/include/access/heapam.h
index 56036a8..aa35227 100644
--- a/src/include/access/heapam.h
+++ b/src/include/access/heapam.h
@@ -135,7 +135,8 @@ extern XLogRecPtr log_heap_clean(Relation reln, Buffer buffer,
 			   OffsetNumber *redirected, int nredirected,
 			   OffsetNumber *nowdead, int ndead,
 			   OffsetNumber *nowunused, int nunused,
-			   TransactionId latestRemovedXid);
+			   TransactionId latestRemovedXid,
+			   uint32 vacgen);
 extern XLogRecPtr log_heap_freeze(Relation reln, Buffer buffer,
 				TransactionId cutoff_xid,
 				OffsetNumber *offsets, int offcnt);
@@ -149,11 +150,13 @@ extern void heap_page_prune_opt(Relation relation, Buffer buffer,
 					TransactionId OldestXmin);
 extern int heap_page_prune(Relation relation, Buffer buffer,
 				TransactionId OldestXmin,
-				bool report_stats, TransactionId *latestRemovedXid);
+				bool report_stats, TransactionId *latestRemovedXid,
+				uint32 vacgen);
 extern void heap_page_prune_execute(Buffer buffer,
 						OffsetNumber *redirected, int nredirected,
 						OffsetNumber *nowdead, int ndead,
-						OffsetNumber *nowunused, int nunused);
+						OffsetNumber *nowunused, int nunused,
+						uint32 vacgen);
 extern void heap_get_root_tuples(Page page, OffsetNumber *root_offsets);
 
 /* in heap/syncscan.c */
diff --git a/src/include/access/htup.h b/src/include/access/htup.h
index ba5d9b2..ac8707c 100644
--- a/src/include/access/htup.h
+++ b/src/include/access/htup.h
@@ -16,6 +16,7 @@
 
 #include "access/tupdesc.h"
 #include "access/tupmacs.h"
+#include "access/xlogdefs.h"
 #include "storage/itemptr.h"
 #include "storage/relfilenode.h"
 
@@ -690,10 +691,11 @@ typedef struct xl_heap_clean
 	TransactionId latestRemovedXid;
 	uint16		nredirected;
 	uint16		ndead;
+	uint32		vacgen;
 	/* OFFSET NUMBERS FOLLOW */
 } xl_heap_clean;
 
-#define SizeOfHeapClean (offsetof(xl_heap_clean, ndead) + sizeof(uint16))
+#define SizeOfHeapClean (offsetof(xl_heap_clean, vacgen) + sizeof(uint32))
 
 /*
  * Cleanup_info is required in some cases during a lazy VACUUM.
diff --git a/src/include/catalog/pg_class.h b/src/include/catalog/pg_class.h
index e006180..8035cda 100644
--- a/src/include/catalog/pg_class.h
+++ b/src/include/catalog/pg_class.h
@@ -65,6 +65,8 @@ CATALOG(pg_class,1259) BKI_BOOTSTRAP BKI_ROWTYPE_OID(83) BKI_SCHEMA_MACRO
 	bool		relhastriggers; /* has (or has had) any TRIGGERs */
 	bool		relhassubclass; /* has (or has had) derived classes */
 	TransactionId relfrozenxid; /* all Xids < this are frozen in this rel */
+	int4	    relnextvacgen; 	/* generation number of the next vacuum */
+	int4	    rellastvacgen; 	/* generation number of last successful vacuum */
 
 	/*
 	 * VARIABLE LENGTH FIELDS start here.  These fields may be NULL, too.
@@ -78,7 +80,7 @@ CATALOG(pg_class,1259) BKI_BOOTSTRAP BKI_ROWTYPE_OID(83) BKI_SCHEMA_MACRO
 
 /* Size of fixed part of pg_class tuples, not counting var-length fields */
 #define CLASS_TUPLE_SIZE \
-	 (offsetof(FormData_pg_class,relfrozenxid) + sizeof(TransactionId))
+	 (offsetof(FormData_pg_class, rellastvacgen) + sizeof(int4))
 
 /* ----------------
  *		Form_pg_class corresponds to a pointer to a tuple with
@@ -92,7 +94,7 @@ typedef FormData_pg_class *Form_pg_class;
  * ----------------
  */
 
-#define Natts_pg_class					26
+#define Natts_pg_class					28
 #define Anum_pg_class_relname			1
 #define Anum_pg_class_relnamespace		2
 #define Anum_pg_class_reltype			3
@@ -117,8 +119,10 @@ typedef FormData_pg_class *Form_pg_class;
 #define Anum_pg_class_relhastriggers	22
 #define Anum_pg_class_relhassubclass	23
 #define Anum_pg_class_relfrozenxid		24
-#define Anum_pg_class_relacl			25
-#define Anum_pg_class_reloptions		26
+#define Anum_pg_class_relnextvacgen		25
+#define Anum_pg_class_rellastvacgen		26
+#define Anum_pg_class_relacl			27
+#define Anum_pg_class_reloptions		28
 
 /* ----------------
  *		initial contents of pg_class
@@ -130,13 +134,13 @@ typedef FormData_pg_class *Form_pg_class;
  */
 
 /* Note: "3" in the relfrozenxid column stands for FirstNormalTransactionId */
-DATA(insert OID = 1247 (  pg_type		PGNSP 71 0 PGUID 0 0 0 0 0 0 0 f f p r 29 0 t f f f f 3 _null_ _null_ ));
+DATA(insert OID = 1247 (  pg_type		PGNSP 71 0 PGUID 0 0 0 0 0 0 0 f f p r 29 0 t f f f f 3 1 0 _null_ _null_ ));
 DESCR("");
-DATA(insert OID = 1249 (  pg_attribute	PGNSP 75 0 PGUID 0 0 0 0 0 0 0 f f p r 21 0 f f f f f 3 _null_ _null_ ));
+DATA(insert OID = 1249 (  pg_attribute	PGNSP 75 0 PGUID 0 0 0 0 0 0 0 f f p r 21 0 f f f f f 3 1 0 _null_ _null_ ));
 DESCR("");
-DATA(insert OID = 1255 (  pg_proc		PGNSP 81 0 PGUID 0 0 0 0 0 0 0 f f p r 26 0 t f f f f 3 _null_ _null_ ));
+DATA(insert OID = 1255 (  pg_proc		PGNSP 81 0 PGUID 0 0 0 0 0 0 0 f f p r 26 0 t f f f f 3 1 0 _null_ _null_ ));
 DESCR("");
-DATA(insert OID = 1259 (  pg_class		PGNSP 83 0 PGUID 0 0 0 0 0 0 0 f f p r 26 0 t f f f f 3 _null_ _null_ ));
+DATA(insert OID = 1259 (  pg_class		PGNSP 83 0 PGUID 0 0 0 0 0 0 0 f f p r 28 0 t f f f f 3 1 0 _null_ _null_ ));
 DESCR("");
 
 
diff --git a/src/include/commands/vacuum.h b/src/include/commands/vacuum.h
index cfbe0c4..4c7480d 100644
--- a/src/include/commands/vacuum.h
+++ b/src/include/commands/vacuum.h
@@ -151,6 +151,8 @@ extern void vac_update_relstats(Relation relation,
 					double num_tuples,
 					bool hasindex,
 					TransactionId frozenxid);
+extern void vac_update_lastvacgen(Relation relation, uint32 vacgen);
+extern uint32 vac_update_nextvacgen(Relation relation);
 extern void vacuum_set_xid_limits(int freeze_min_age, int freeze_table_age,
 					  bool sharedRel,
 					  TransactionId *oldestXmin,
diff --git a/src/include/storage/itemid.h b/src/include/storage/itemid.h
index 961d2c2..c0fbd69 100644
--- a/src/include/storage/itemid.h
+++ b/src/include/storage/itemid.h
@@ -19,7 +19,11 @@
  *
  * In some cases an item pointer is "in use" but does not have any associated
  * storage on the page.  By convention, lp_len == 0 in every item pointer
- * that does not have storage, independently of its lp_flags state.
+ * that does not have storage, independently of its lp_flags state.  But
+ * lp_len != 0 does not imply that the line pointer has storage, at least not
+ * for heap tuples, where we use lp_len (and lp_off) to store the vacuum
+ * generation number for dead-vacuumed tuples.  In such cases, lp_flags must
+ * be set to LP_DEAD though.
  */
 typedef struct ItemIdData
 {
@@ -33,11 +37,16 @@ typedef ItemIdData *ItemId;
 /*
  * lp_flags has these possible states.	An UNUSED line pointer is available
  * for immediate re-use, the other states are not.
+ *
+ * A DEAD line pointer in heap does not have any storage associated with it.
+ * But a similar pointer in an index page may still have storage associated
+ * with it since we don't defrag index pages online.
  */
 #define LP_UNUSED		0		/* unused (should always have lp_len=0) */
 #define LP_NORMAL		1		/* used (should always have lp_len>0) */
 #define LP_REDIRECT		2		/* HOT redirect (should have lp_len=0) */
-#define LP_DEAD			3		/* dead, may or may not have storage */
+#define LP_DEAD			3		/* dead or dead-vacuumed. Heap tuples don't have
+								   storage, but index tuples may have */
 
 /*
  * Item offsets and lengths are represented by these types when
@@ -107,14 +116,26 @@ typedef uint16 ItemLength;
 
 /*
  * ItemIdIsDead
- *		True iff item identifier is in state DEAD.
+ *		True iff item identifier is in state DEAD or DEAD VACUUMED
  */
 #define ItemIdIsDead(itemId) \
 	((itemId)->lp_flags == LP_DEAD)
 
 /*
+ * ItemIdIsDeadVacuumed
+ *		True iff item identifier is in state DEAD VACUUMED, i.e. LP_DEAD
+ *		with a nonzero vacuum generation number stored in lp_off/lp_len.
+ */
+#define ItemIdIsDeadVacuumed(itemId) \
+	(((itemId)->lp_flags == LP_DEAD) && \
+	 (((itemId)->lp_off != 0) || \
+	  ((itemId)->lp_len != 0)))
+
+/*
  * ItemIdHasStorage
- *		True iff item identifier has associated storage.
+ *		True iff item identifier has associated storage.  For DEAD line
+ *		pointers, this applies only to index tuples, since a DEAD heap tuple
+ *		never has storage associated with it.  In fact, the lp_off/lp_len of
+ *		a DEAD heap line pointer is used to store the vacuum generation number.
  */
 #define ItemIdHasStorage(itemId) \
 	((itemId)->lp_len != 0)
@@ -168,6 +189,37 @@ typedef uint16 ItemLength;
 )
 
 /*
+ * ItemIdSetDeadVacuumed
+ *		Set the item identifier to be DEAD VACUUMED, with no storage.
+ *		Beware of multiple evaluations of itemId!
+ *
+ *	Note: we save the generation number of the vacuum creating this
+ *	dead-vacuumed line pointer.  We reuse lp_off/lp_len for this purpose,
+ *	since dead-vacuumed line pointers only exist in the heap and lp_off/lp_len
+ *	is not used for dead line pointers in the heap.
+ *
+ *	Store the 30 low-order bits of the vacuum generation number, splitting it
+ *	across the two 15-bit fields: the high 15 bits go in lp_off and the low
+ *	15 bits in lp_len.  The high half must be shifted down into field range;
+ *	merely masking it would lose those bits when assigned to a 15-bit
+ *	bitfield.
+ */
+#define ItemIdSetDeadVacuumed(itemId, vacgen) \
+( \
+	(itemId)->lp_flags = LP_DEAD, \
+	(itemId)->lp_off = (((vacgen) >> 15) & 0x7fff), \
+	(itemId)->lp_len = ((vacgen) & 0x7fff) \
+)
+
+/*
+ * ItemIdGetVacGen
+ *		Get the generation number of the vacuum that created this
+ *		dead-vacuumed line pointer, reassembling the high 15 bits from
+ *		lp_off and the low 15 bits from lp_len.
+ *
+ * Note: must be called only for dead-vacuumed line pointers.
+ */
+#define ItemIdGetVacGen(itemId) \
+( \
+	AssertMacro(ItemIdIsDeadVacuumed(itemId)), \
+	(((uint32) ((itemId)->lp_off) << 15) | (itemId)->lp_len) \
+)
+
+/*
  * ItemIdMarkDead
  *		Set the item identifier to be DEAD, keeping its existing storage.
  *
diff --git a/src/include/utils/rel.h b/src/include/utils/rel.h
index 173dc16..d602b24 100644
--- a/src/include/utils/rel.h
+++ b/src/include/utils/rel.h
@@ -359,6 +359,13 @@ typedef struct StdRdOptions
 	((relation)->rd_rel->relpersistence == RELPERSISTENCE_TEMP)
 
 /*
+ * RelationGetLastVacGen
+ * 		Get the generation number of the last successful vacuum of the relation
+ */
+#define RelationGetLastVacGen(relation) \
+	((relation)->rd_rel->rellastvacgen)
+
+/*
  * RELATION_IS_LOCAL
  *		If a rel is either temp or newly created in the current transaction,
  *		it can be assumed to be visible only to the current backend.
-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to