On Wed, Sep 7, 2011 at 8:28 AM, Andy Colson <a...@squeakycode.net> wrote:
> On 08/22/2011 01:22 AM, Pavan Deolasee wrote:

>>
>
> Hi Pavan, I tried to apply your patch to git master (as of just now) and it
> failed.  I assume that's what I should be checking out, right?
>

Yeah, seems like it bit-rotted. Please try the attached patch. I also
fixed a typo and  added some more comments as per suggestion by Jim.

Thanks,
Pavan

-- 
Pavan Deolasee
EnterpriseDB     http://www.enterprisedb.com
diff --git a/contrib/pageinspect/heapfuncs.c b/contrib/pageinspect/heapfuncs.c
index fa50655..2c1ab2c 100644
--- a/contrib/pageinspect/heapfuncs.c
+++ b/contrib/pageinspect/heapfuncs.c
@@ -150,6 +150,7 @@ heap_page_items(PG_FUNCTION_ARGS)
 		 * many other ways, but at least we won't crash.
 		 */
 		if (ItemIdHasStorage(id) &&
+			!ItemIdIsDead(id) &&
 			lp_len >= sizeof(HeapTupleHeader) &&
 			lp_offset == MAXALIGN(lp_offset) &&
 			lp_offset + lp_len <= raw_page_size)
diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index 06db65d..cf65c05 100644
--- a/src/backend/access/heap/heapam.c
+++ b/src/backend/access/heap/heapam.c
@@ -3984,7 +3984,8 @@ log_heap_clean(Relation reln, Buffer buffer,
 			   OffsetNumber *redirected, int nredirected,
 			   OffsetNumber *nowdead, int ndead,
 			   OffsetNumber *nowunused, int nunused,
-			   TransactionId latestRemovedXid)
+			   TransactionId latestRemovedXid,
+			   uint32 vacgen)
 {
 	xl_heap_clean xlrec;
 	uint8		info;
@@ -3999,6 +4000,7 @@ log_heap_clean(Relation reln, Buffer buffer,
 	xlrec.latestRemovedXid = latestRemovedXid;
 	xlrec.nredirected = nredirected;
 	xlrec.ndead = ndead;
+	xlrec.vacgen = vacgen;
 
 	rdata[0].data = (char *) &xlrec;
 	rdata[0].len = SizeOfHeapClean;
@@ -4300,6 +4302,7 @@ heap_xlog_clean(XLogRecPtr lsn, XLogRecord *record)
 	int			ndead;
 	int			nunused;
 	Size		freespace;
+	uint32		vacgen;
 
 	/*
 	 * We're about to remove tuples. In Hot Standby mode, ensure that there's
@@ -4332,6 +4335,7 @@ heap_xlog_clean(XLogRecPtr lsn, XLogRecord *record)
 
 	nredirected = xlrec->nredirected;
 	ndead = xlrec->ndead;
+	vacgen = xlrec->vacgen;
 	end = (OffsetNumber *) ((char *) xlrec + record->xl_len);
 	redirected = (OffsetNumber *) ((char *) xlrec + SizeOfHeapClean);
 	nowdead = redirected + (nredirected * 2);
@@ -4343,7 +4347,8 @@ heap_xlog_clean(XLogRecPtr lsn, XLogRecord *record)
 	heap_page_prune_execute(buffer,
 							redirected, nredirected,
 							nowdead, ndead,
-							nowunused, nunused);
+							nowunused, nunused,
+							vacgen);
 
 	freespace = PageGetHeapFreeSpace(page);		/* needed to update FSM below */
 
diff --git a/src/backend/access/heap/pruneheap.c b/src/backend/access/heap/pruneheap.c
index 61f2ce4..ee64758 100644
--- a/src/backend/access/heap/pruneheap.c
+++ b/src/backend/access/heap/pruneheap.c
@@ -29,9 +29,12 @@ typedef struct
 	TransactionId new_prune_xid;	/* new prune hint value for page */
 	TransactionId latestRemovedXid;		/* latest xid to be removed by this
 										 * prune */
+	int			already_dead;		/* number of already dead line pointers */
+
 	int			nredirected;	/* numbers of entries in arrays below */
 	int			ndead;
 	int			nunused;
+
 	/* arrays that accumulate indexes of items to be changed */
 	OffsetNumber redirected[MaxHeapTuplesPerPage * 2];
 	OffsetNumber nowdead[MaxHeapTuplesPerPage];
@@ -123,8 +126,8 @@ heap_page_prune_opt(Relation relation, Buffer buffer, TransactionId OldestXmin)
 			TransactionId ignore = InvalidTransactionId;		/* return value not
 																 * needed */
 
-			/* OK to prune */
-			(void) heap_page_prune(relation, buffer, OldestXmin, true, &ignore);
+			/* OK to prune - pass invalid vacuum generation number */
+			(void) heap_page_prune(relation, buffer, OldestXmin, true, &ignore, 0);
 		}
 
 		/* And release buffer lock */
@@ -151,13 +154,15 @@ heap_page_prune_opt(Relation relation, Buffer buffer, TransactionId OldestXmin)
  */
 int
 heap_page_prune(Relation relation, Buffer buffer, TransactionId OldestXmin,
-				bool report_stats, TransactionId *latestRemovedXid)
+				bool report_stats, TransactionId *latestRemovedXid,
+				uint32 current_vacgen)
 {
 	int			ndeleted = 0;
 	Page		page = BufferGetPage(buffer);
 	OffsetNumber offnum,
 				maxoff;
 	PruneState	prstate;
+	uint32		last_finished_vacgen = RelationGetLastVacGen(relation);
 
 	/*
 	 * Our strategy is to scan the page and make lists of items to change,
@@ -173,6 +178,7 @@ heap_page_prune(Relation relation, Buffer buffer, TransactionId OldestXmin,
 	prstate.new_prune_xid = InvalidTransactionId;
 	prstate.latestRemovedXid = InvalidTransactionId;
 	prstate.nredirected = prstate.ndead = prstate.nunused = 0;
+	prstate.already_dead = 0;
 	memset(prstate.marked, 0, sizeof(prstate.marked));
 
 	/* Scan the page */
@@ -189,8 +195,26 @@ heap_page_prune(Relation relation, Buffer buffer, TransactionId OldestXmin,
 
 		/* Nothing to do if slot is empty or already dead */
 		itemid = PageGetItemId(page, offnum);
-		if (!ItemIdIsUsed(itemid) || ItemIdIsDead(itemid))
+		if (!ItemIdIsUsed(itemid))
 			continue;
+		
+		/* 
+		 * If the slot is dead-vacuumed and we know that the index pointers
+		 * have already been vacuumed by the last index vacuum, just mark them
+		 * unused so that they are removed when we defrag the page
+		 */
+		if (ItemIdIsDeadVacuumed(itemid))
+		{
+			if (ItemIdGetVacGen(itemid) == last_finished_vacgen)
+				heap_prune_record_unused(&prstate, offnum);
+			continue;
+		}
+		else if (ItemIdIsDead(itemid))
+		{
+			heap_prune_record_dead(&prstate, offnum);
+			prstate.already_dead++;
+			continue;
+		}
 
 		/* Process this item or chain of items */
 		ndeleted += heap_prune_chain(relation, buffer, offnum,
@@ -211,7 +235,8 @@ heap_page_prune(Relation relation, Buffer buffer, TransactionId OldestXmin,
 		heap_page_prune_execute(buffer,
 								prstate.redirected, prstate.nredirected,
 								prstate.nowdead, prstate.ndead,
-								prstate.nowunused, prstate.nunused);
+								prstate.nowunused, prstate.nunused,
+								current_vacgen);
 
 		/*
 		 * Update the page's pd_prune_xid field to either zero, or the lowest
@@ -239,7 +264,8 @@ heap_page_prune(Relation relation, Buffer buffer, TransactionId OldestXmin,
 									prstate.redirected, prstate.nredirected,
 									prstate.nowdead, prstate.ndead,
 									prstate.nowunused, prstate.nunused,
-									prstate.latestRemovedXid);
+									prstate.latestRemovedXid,
+									current_vacgen);
 
 			PageSetLSN(BufferGetPage(buffer), recptr);
 			PageSetTLI(BufferGetPage(buffer), ThisTimeLineID);
@@ -271,9 +297,12 @@ heap_page_prune(Relation relation, Buffer buffer, TransactionId OldestXmin,
 	 * If requested, report the number of tuples reclaimed to pgstats. This is
 	 * ndeleted minus ndead, because we don't want to count a now-DEAD root
 	 * item as a deletion for this purpose.
+	 *
+	 * Adjust already_dead since they are counted as ndead and we really don't
+	 * want to include them here
 	 */
-	if (report_stats && ndeleted > prstate.ndead)
-		pgstat_update_heap_dead_tuples(relation, ndeleted - prstate.ndead);
+	if (report_stats && ndeleted > (prstate.ndead - prstate.already_dead)) 
+		pgstat_update_heap_dead_tuples(relation, ndeleted - (prstate.ndead - prstate.already_dead));
 
 	*latestRemovedXid = prstate.latestRemovedXid;
 
@@ -643,7 +672,8 @@ void
 heap_page_prune_execute(Buffer buffer,
 						OffsetNumber *redirected, int nredirected,
 						OffsetNumber *nowdead, int ndead,
-						OffsetNumber *nowunused, int nunused)
+						OffsetNumber *nowunused, int nunused,
+						uint32 vacgen)
 {
 	Page		page = (Page) BufferGetPage(buffer);
 	OffsetNumber *offnum;
@@ -667,7 +697,17 @@ heap_page_prune_execute(Buffer buffer,
 		OffsetNumber off = *offnum++;
 		ItemId		lp = PageGetItemId(page, off);
 
-		ItemIdSetDead(lp);
+		/*
+		 * If we are called from a vacuum (vacgen > 0), mark the line pointers
+		 * as dead-vacuumed and also store the current vacuum generation number
+		 * in the line pointer. OTOH if we are called from a normal HOT-prune
+		 * routine, mark the line pointers as DEAD since the index pointers to
+		 * them will not be removed just yet.
+		 */
+		if (vacgen)
+			ItemIdSetDeadVacuumed(lp, vacgen);
+		else
+			ItemIdSetDead(lp);
 	}
 
 	/* Update all now-unused line pointers */
diff --git a/src/backend/catalog/heap.c b/src/backend/catalog/heap.c
index 2aaf775..d640680 100644
--- a/src/backend/catalog/heap.c
+++ b/src/backend/catalog/heap.c
@@ -786,6 +786,8 @@ InsertPgClassTuple(Relation pg_class_desc,
 	values[Anum_pg_class_relhastriggers - 1] = BoolGetDatum(rd_rel->relhastriggers);
 	values[Anum_pg_class_relhassubclass - 1] = BoolGetDatum(rd_rel->relhassubclass);
 	values[Anum_pg_class_relfrozenxid - 1] = TransactionIdGetDatum(rd_rel->relfrozenxid);
+	values[Anum_pg_class_relnextvacgen - 1] = Int32GetDatum(rd_rel->relnextvacgen);
+	values[Anum_pg_class_rellastvacgen - 1] = Int32GetDatum(rd_rel->rellastvacgen);
 	if (relacl != (Datum) 0)
 		values[Anum_pg_class_relacl - 1] = relacl;
 	else
@@ -880,6 +882,9 @@ AddNewRelationTuple(Relation pg_class_desc,
 		new_rel_reltup->relfrozenxid = InvalidTransactionId;
 	}
 
+	new_rel_reltup->relnextvacgen = 1;
+	new_rel_reltup->rellastvacgen = 0;
+
 	new_rel_reltup->relowner = relowner;
 	new_rel_reltup->reltype = new_type_oid;
 	new_rel_reltup->reloftype = reloftype;
diff --git a/src/backend/commands/analyze.c b/src/backend/commands/analyze.c
index 6b0a4e7..c074524 100644
--- a/src/backend/commands/analyze.c
+++ b/src/backend/commands/analyze.c
@@ -1073,11 +1073,20 @@ acquire_sample_rows(Relation onerel, HeapTuple *rows, int targrows,
 			 * pointers should be counted as dead, because we need vacuum to
 			 * run to get rid of them.	Note that this rule agrees with the
 			 * way that heap_page_prune() counts things.
+			 *
+			 * XXX We don't count dead line pointers if we know that they can be
+			 * removed by a HOT cleanup.
 			 */
 			if (!ItemIdIsNormal(itemid))
 			{
-				if (ItemIdIsDead(itemid))
-					deadrows += 1;
+				if (ItemIdIsDeadVacuumed(itemid))
+				{
+					if (ItemIdGetVacGen(itemid) != RelationGetLastVacGen(onerel))
+						deadrows += 1;
+				}
+				else if (ItemIdIsDead(itemid))
+					deadrows++;
+
 				continue;
 			}
 
diff --git a/src/backend/commands/vacuum.c b/src/backend/commands/vacuum.c
index 7fe787e..d3c92c9 100644
--- a/src/backend/commands/vacuum.c
+++ b/src/backend/commands/vacuum.c
@@ -645,6 +645,88 @@ vac_update_relstats(Relation relation,
 	heap_close(rd, RowExclusiveLock);
 }
 
+/*
+ * Grab the next vacuum generation number to be used to stamp the dead-vacuumed
+ * line pointers and also increment the generation number.
+ */
+uint32
+vac_update_nextvacgen(Relation relation)
+{
+	Oid			relid = RelationGetRelid(relation);
+	Relation	rd;
+	HeapTuple	ctup;
+	Form_pg_class pgcform;
+	uint32		nextvacgen;
+
+	rd = heap_open(RelationRelationId, RowExclusiveLock);
+
+	/* Fetch a copy of the tuple to scribble on */
+	ctup = SearchSysCacheCopy1(RELOID, ObjectIdGetDatum(relid));
+	if (!HeapTupleIsValid(ctup))
+		elog(ERROR, "pg_class entry for relid %u vanished during vacuuming",
+			 relid);
+	pgcform = (Form_pg_class) GETSTRUCT(ctup);
+
+	/* Remember the next vacuum generation number before incrementing it */
+	nextvacgen = pgcform->relnextvacgen;
+
+	/* 
+	 * Increment while taking care of wrap-around (without using zero)
+	 *
+	 * Note: We don't worry about the wrap-around issues here since it would
+	 * take a 1 Billion vacuums on the same relation for the vacuum generation
+	 * to wrap-around. That would take ages to happen and even if it happens,
+	 * the chances that we might have dead-vacuumed line pointers still
+	 * stamped with the old (failed) vacuum are infinitely small since some
+	 * other vacuum cycle would have taken care of them.
+	 */
+	pgcform->relnextvacgen = pgcform->relnextvacgen + 1;
+	if (pgcform->relnextvacgen == 0x80000000)
+		pgcform->relnextvacgen = 1;
+
+	heap_inplace_update(rd, ctup);
+
+	heap_close(rd, RowExclusiveLock);
+
+	/* 
+	 * Increase command counter since we want to see the updated row when we
+	 * again come back to set the rellastvacgen when the vacuum completes and
+	 * we don't want to forget what we just did above
+	 */
+	CommandCounterIncrement();
+
+	return nextvacgen;
+}
+
+/*
+ * Update the generation number of the last successful index vacuum.
+ */
+void
+vac_update_lastvacgen(Relation relation, uint32 vacgen)
+{
+	Oid			relid = RelationGetRelid(relation);
+	Relation	rd;
+	HeapTuple	ctup;
+	Form_pg_class pgcform;
+
+	rd = heap_open(RelationRelationId, RowExclusiveLock);
+
+	/* Fetch a copy of the tuple to scribble on */
+	ctup = SearchSysCacheCopy1(RELOID, ObjectIdGetDatum(relid));
+	if (!HeapTupleIsValid(ctup))
+		elog(ERROR, "pg_class entry for relid %u vanished during vacuuming",
+			 relid);
+	pgcform = (Form_pg_class) GETSTRUCT(ctup);
+
+	/* Store the 30 LSB to match with what we store in the line pointers */
+	pgcform->rellastvacgen = (vacgen & 0x3fffffff);
+
+	heap_inplace_update(rd, ctup);
+
+	heap_close(rd, RowExclusiveLock);
+
+	CommandCounterIncrement();
+}
 
 /*
  *	vac_update_datfrozenxid() -- update pg_database.datfrozenxid for our DB
diff --git a/src/backend/commands/vacuumlazy.c b/src/backend/commands/vacuumlazy.c
index a2420a8..74558df 100644
--- a/src/backend/commands/vacuumlazy.c
+++ b/src/backend/commands/vacuumlazy.c
@@ -100,6 +100,7 @@ typedef struct LVRelStats
 	ItemPointer dead_tuples;	/* array of ItemPointerData */
 	int			num_index_scans;
 	TransactionId latestRemovedXid;
+	uint32		lastvacgen;
 } LVRelStats;
 
 
@@ -115,15 +116,12 @@ static BufferAccessStrategy vac_strategy;
 /* non-export function prototypes */
 static void lazy_scan_heap(Relation onerel, LVRelStats *vacrelstats,
 			   Relation *Irel, int nindexes, bool scan_all);
-static void lazy_vacuum_heap(Relation onerel, LVRelStats *vacrelstats);
 static void lazy_vacuum_index(Relation indrel,
 				  IndexBulkDeleteResult **stats,
 				  LVRelStats *vacrelstats);
 static void lazy_cleanup_index(Relation indrel,
 				   IndexBulkDeleteResult *stats,
 				   LVRelStats *vacrelstats);
-static int lazy_vacuum_page(Relation onerel, BlockNumber blkno, Buffer buffer,
-				 int tupindex, LVRelStats *vacrelstats);
 static void lazy_truncate_heap(Relation onerel, LVRelStats *vacrelstats);
 static BlockNumber count_nondeletable_pages(Relation onerel,
 						 LVRelStats *vacrelstats);
@@ -211,6 +209,10 @@ lazy_vacuum_rel(Relation onerel, VacuumStmt *vacstmt,
 	/* Vacuum the Free Space Map */
 	FreeSpaceMapVacuum(onerel);
 
+	/* Since vacuum ran to completion, remember the vacuum generation number */
+	if (vacrelstats->lastvacgen != 0)
+		vac_update_lastvacgen(onerel, vacrelstats->lastvacgen);
+
 	/*
 	 * Update statistics in pg_class.
 	 *
@@ -312,6 +314,41 @@ vacuum_log_cleanup_info(Relation rel, LVRelStats *vacrelstats)
  *
  *		If there are no indexes then we just vacuum each dirty page as we
  *		process it, since there's no point in gathering many tuples.
+ *
+ *		Starting 9.2, we removed the second heap pass of vacuum and instead
+ *		leave the dead line pointers in the heap to be removed by the next
+ *		vacuum cycle or a HOT-prune operation. We can do this without much
+ *		performance penalty because almost all the dead space is reclaimed in
+ *		the first pass itself (except that which is taken by the dead line
+ *		pointers and there is no guarantee that will be freed by the second
+ *		pass anyways). But this gives us two significant benefits:
+ *
+ *		1. We don't have to scan the heap again. Even though visibility map
+ *		lets us scan only the necessary pages, in many cases this would still
+ *		be a large part of the relation
+ *
+ *		2. We don't have to write the heap pages (and associated WAL) twice.
+ *		Since vacuum uses ring buffers for the heap scan, this would actually mean
+ *		disk IO unless the relation is very small.
+ *
+ *		The way we do this is by tracking the last successful vacuum by its
+ *		generation number in the pg_class row. When a dead line pointer is
+ *		collected by a vacuum, we store the generation number of the vacuum in
+ *		the line pointer itself (lp_off/lp_len is not used for DEAD heap line
+ *		pointer and that gives us 30-bits of unused space to store the
+ *		information). Later on, either as part of the HOT-prune or the next
+ *		vacuum on the table, we check if the vacuum generation number stored in
+ *		a dead-vacuumed line pointer is the same as the last successful vacuum on
+ *		the table and remove those dead-vacuumed line pointers. We are sure at
+ *		that point that the index pointers to those dead-vacuumed line pointers
+ *		must have been already removed.
+ *
+ *		If the vacuum operation that generated the dead-vacuumed line pointer
+ *		aborts in the middle, the subsequent vacuum will again scan these line
+ *		pointers and stamp them with its generation number. Finally, when the
+ *		vacuum finishes successfully and this information is recorded in the
+ *		pg_class row, the dead-vacuumed line pointers are cleaned up from the
+ *		heap.
  */
 static void
 lazy_scan_heap(Relation onerel, LVRelStats *vacrelstats,
@@ -333,6 +370,7 @@ lazy_scan_heap(Relation onerel, LVRelStats *vacrelstats,
 	Buffer		vmbuffer = InvalidBuffer;
 	BlockNumber next_not_all_visible_block;
 	bool		skipping_all_visible_blocks;
+	int			current_vacgen;
 
 	pg_rusage_init(&ru0);
 
@@ -345,6 +383,23 @@ lazy_scan_heap(Relation onerel, LVRelStats *vacrelstats,
 	empty_pages = vacuumed_pages = 0;
 	num_tuples = tups_vacuumed = nkeep = nunused = 0;
 
+	/*
+	 * Before starting the vacuum, grab the next vacuum generation number for
+	 * this relation. Whenever a block is scanned and dead line pointers are
+	 * collected, we store the vacuum generation number in the line pointer
+	 * offset (since lp_off is not useful for dead heap line pointers).
+	 *
+	 * We also update the relnextvacgen to guard against the case when this
+	 * vacuum aborts after scanning few pages. If we don't increment the
+	 * relnextvacgen now, the next vacuum may use the same generation number
+	 * and if it skips the pages scanned by this vacuum (though not possible
+	 * currently because of the way the visibility map is handled), we might get into
+	 * a situation where the index pointers of some dead-vacuumed line pointers
+	 * are not yet removed, but the vacuum generation number stored in those
+	 * line pointers is same as the last successful vacuum on the table.
+	 */
+	current_vacgen = vac_update_nextvacgen(onerel);
+
 	indstats = (IndexBulkDeleteResult **)
 		palloc0(nindexes * sizeof(IndexBulkDeleteResult *));
 
@@ -458,8 +513,6 @@ lazy_scan_heap(Relation onerel, LVRelStats *vacrelstats,
 				lazy_vacuum_index(Irel[i],
 								  &indstats[i],
 								  vacrelstats);
-			/* Remove tuples from heap */
-			lazy_vacuum_heap(onerel, vacrelstats);
 
 			/*
 			 * Forget the now-vacuumed tuples, and press on, but be careful
@@ -555,7 +608,8 @@ lazy_scan_heap(Relation onerel, LVRelStats *vacrelstats,
 		 * We count tuples removed by the pruning step as removed by VACUUM.
 		 */
 		tups_vacuumed += heap_page_prune(onerel, buf, OldestXmin, false,
-										 &vacrelstats->latestRemovedXid);
+										 &vacrelstats->latestRemovedXid,
+										 current_vacgen);
 
 		/*
 		 * Now scan the page to collect vacuumable items and check for tuples
@@ -739,24 +793,13 @@ lazy_scan_heap(Relation onerel, LVRelStats *vacrelstats,
 			}
 		}
 
+		vacuumed_pages++;
+
 		/*
-		 * If there are no indexes then we can vacuum the page right now
-		 * instead of doing a second scan.
+		 * If there are no indexes, we don't need to remember the dead tuples
 		 */
-		if (nindexes == 0 &&
-			vacrelstats->num_dead_tuples > 0)
-		{
-			/* Remove tuples from heap */
-			lazy_vacuum_page(onerel, blkno, buf, 0, vacrelstats);
-
-			/*
-			 * Forget the now-vacuumed tuples, and press on, but be careful
-			 * not to reset latestRemovedXid since we want that value to be
-			 * valid.
-			 */
+		if (nindexes == 0)
 			vacrelstats->num_dead_tuples = 0;
-			vacuumed_pages++;
-		}
 
 		freespace = PageGetHeapFreeSpace(page);
 
@@ -815,14 +858,9 @@ lazy_scan_heap(Relation onerel, LVRelStats *vacrelstats,
 			vacrelstats->nonempty_pages = blkno + 1;
 
 		/*
-		 * If we remembered any tuples for deletion, then the page will be
-		 * visited again by lazy_vacuum_heap, which will compute and record
-		 * its post-compaction free space.	If not, then we're done with this
-		 * page, so remember its free space as-is.	(This path will always be
-		 * taken if there are no indexes.)
+		 * Record the free space on the page.
 		 */
-		if (vacrelstats->num_dead_tuples == prev_dead_count)
-			RecordPageWithFreeSpace(onerel, blkno, freespace);
+		RecordPageWithFreeSpace(onerel, blkno, freespace);
 	}
 
 	/* save stats for use later */
@@ -847,8 +885,6 @@ lazy_scan_heap(Relation onerel, LVRelStats *vacrelstats,
 			lazy_vacuum_index(Irel[i],
 							  &indstats[i],
 							  vacrelstats);
-		/* Remove tuples from heap */
-		lazy_vacuum_heap(onerel, vacrelstats);
 		vacrelstats->num_index_scans++;
 	}
 
@@ -859,11 +895,14 @@ lazy_scan_heap(Relation onerel, LVRelStats *vacrelstats,
 		vmbuffer = InvalidBuffer;
 	}
 
+	/* Remember the current vacuum generation */
+	vacrelstats->lastvacgen = current_vacgen;
+
 	/* Do post-vacuum cleanup and statistics update for each index */
 	for (i = 0; i < nindexes; i++)
 		lazy_cleanup_index(Irel[i], indstats[i], vacrelstats);
 
-	/* If no indexes, make log report that lazy_vacuum_heap would've made */
+	/* Report vacuum stats */
 	if (vacuumed_pages)
 		ereport(elevel,
 				(errmsg("\"%s\": removed %.0f row versions in %u pages",
@@ -885,118 +924,6 @@ lazy_scan_heap(Relation onerel, LVRelStats *vacrelstats,
 					   pg_rusage_show(&ru0))));
 }
 
-
-/*
- *	lazy_vacuum_heap() -- second pass over the heap
- *
- *		This routine marks dead tuples as unused and compacts out free
- *		space on their pages.  Pages not having dead tuples recorded from
- *		lazy_scan_heap are not visited at all.
- *
- * Note: the reason for doing this as a second pass is we cannot remove
- * the tuples until we've removed their index entries, and we want to
- * process index entry removal in batches as large as possible.
- */
-static void
-lazy_vacuum_heap(Relation onerel, LVRelStats *vacrelstats)
-{
-	int			tupindex;
-	int			npages;
-	PGRUsage	ru0;
-
-	pg_rusage_init(&ru0);
-	npages = 0;
-
-	tupindex = 0;
-	while (tupindex < vacrelstats->num_dead_tuples)
-	{
-		BlockNumber tblk;
-		Buffer		buf;
-		Page		page;
-		Size		freespace;
-
-		vacuum_delay_point();
-
-		tblk = ItemPointerGetBlockNumber(&vacrelstats->dead_tuples[tupindex]);
-		buf = ReadBufferExtended(onerel, MAIN_FORKNUM, tblk, RBM_NORMAL,
-								 vac_strategy);
-		LockBufferForCleanup(buf);
-		tupindex = lazy_vacuum_page(onerel, tblk, buf, tupindex, vacrelstats);
-
-		/* Now that we've compacted the page, record its available space */
-		page = BufferGetPage(buf);
-		freespace = PageGetHeapFreeSpace(page);
-
-		UnlockReleaseBuffer(buf);
-		RecordPageWithFreeSpace(onerel, tblk, freespace);
-		npages++;
-	}
-
-	ereport(elevel,
-			(errmsg("\"%s\": removed %d row versions in %d pages",
-					RelationGetRelationName(onerel),
-					tupindex, npages),
-			 errdetail("%s.",
-					   pg_rusage_show(&ru0))));
-}
-
-/*
- *	lazy_vacuum_page() -- free dead tuples on a page
- *					 and repair its fragmentation.
- *
- * Caller must hold pin and buffer cleanup lock on the buffer.
- *
- * tupindex is the index in vacrelstats->dead_tuples of the first dead
- * tuple for this page.  We assume the rest follow sequentially.
- * The return value is the first tupindex after the tuples of this page.
- */
-static int
-lazy_vacuum_page(Relation onerel, BlockNumber blkno, Buffer buffer,
-				 int tupindex, LVRelStats *vacrelstats)
-{
-	Page		page = BufferGetPage(buffer);
-	OffsetNumber unused[MaxOffsetNumber];
-	int			uncnt = 0;
-
-	START_CRIT_SECTION();
-
-	for (; tupindex < vacrelstats->num_dead_tuples; tupindex++)
-	{
-		BlockNumber tblk;
-		OffsetNumber toff;
-		ItemId		itemid;
-
-		tblk = ItemPointerGetBlockNumber(&vacrelstats->dead_tuples[tupindex]);
-		if (tblk != blkno)
-			break;				/* past end of tuples for this block */
-		toff = ItemPointerGetOffsetNumber(&vacrelstats->dead_tuples[tupindex]);
-		itemid = PageGetItemId(page, toff);
-		ItemIdSetUnused(itemid);
-		unused[uncnt++] = toff;
-	}
-
-	PageRepairFragmentation(page);
-
-	MarkBufferDirty(buffer);
-
-	/* XLOG stuff */
-	if (RelationNeedsWAL(onerel))
-	{
-		XLogRecPtr	recptr;
-
-		recptr = log_heap_clean(onerel, buffer,
-								NULL, 0, NULL, 0,
-								unused, uncnt,
-								vacrelstats->latestRemovedXid);
-		PageSetLSN(page, recptr);
-		PageSetTLI(page, ThisTimeLineID);
-	}
-
-	END_CRIT_SECTION();
-
-	return tupindex;
-}
-
 /*
  *	lazy_vacuum_index() -- vacuum one index relation.
  *
@@ -1223,9 +1150,13 @@ count_nondeletable_pages(Relation onerel, LVRelStats *vacrelstats)
 			 * Note: any non-unused item should be taken as a reason to keep
 			 * this page.  We formerly thought that DEAD tuples could be
 			 * thrown away, but that's not so, because we'd not have cleaned
-			 * out their index entries.
+			 * out their index entries. But we can throw away the dead-vacuumed
+			 * tuples created by this vacuum since those index pointers must
+			 * have been removed before we come here
 			 */
-			if (ItemIdIsUsed(itemid))
+			if (ItemIdIsUsed(itemid) &&
+			   	!(ItemIdIsDeadVacuumed(itemid) &&
+				  ItemIdGetVacGen(itemid) == vacrelstats->lastvacgen))
 			{
 				hastup = true;
 				break;			/* can stop scanning */
diff --git a/src/backend/storage/page/bufpage.c b/src/backend/storage/page/bufpage.c
index 018f9c1..07ec438 100644
--- a/src/backend/storage/page/bufpage.c
+++ b/src/backend/storage/page/bufpage.c
@@ -396,7 +396,7 @@ PageRepairFragmentation(Page page)
 		lp = PageGetItemId(page, i);
 		if (ItemIdIsUsed(lp))
 		{
-			if (ItemIdHasStorage(lp))
+			if (!ItemIdIsDead(lp) && ItemIdHasStorage(lp))
 				nstorage++;
 		}
 		else
@@ -409,7 +409,13 @@ PageRepairFragmentation(Page page)
 
 	if (nstorage == 0)
 	{
-		/* Page is completely empty, so just reset it quickly */
+		/* 
+		 * Page is completely empty, so just reset it quickly
+		 *
+		 * Note: We don't reset the pd_lower because the page may still have
+		 * DEAD line pointers with index pointers pointing to them and it's not
+		 * safe to remove them before the index pointers are first removed
+		 */
 		((PageHeader) page)->pd_upper = pd_special;
 	}
 	else
@@ -421,7 +427,7 @@ PageRepairFragmentation(Page page)
 		for (i = 0; i < nline; i++)
 		{
 			lp = PageGetItemId(page, i + 1);
-			if (ItemIdHasStorage(lp))
+			if (!ItemIdIsDead(lp) && ItemIdHasStorage(lp))
 			{
 				itemidptr->offsetindex = i;
 				itemidptr->itemoff = ItemIdGetOffset(lp);
diff --git a/src/include/access/heapam.h b/src/include/access/heapam.h
index 776ea5c..b1395ee 100644
--- a/src/include/access/heapam.h
+++ b/src/include/access/heapam.h
@@ -133,7 +133,8 @@ extern XLogRecPtr log_heap_clean(Relation reln, Buffer buffer,
 			   OffsetNumber *redirected, int nredirected,
 			   OffsetNumber *nowdead, int ndead,
 			   OffsetNumber *nowunused, int nunused,
-			   TransactionId latestRemovedXid);
+			   TransactionId latestRemovedXid,
+			   uint32 vacgen);
 extern XLogRecPtr log_heap_freeze(Relation reln, Buffer buffer,
 				TransactionId cutoff_xid,
 				OffsetNumber *offsets, int offcnt);
@@ -147,11 +148,13 @@ extern void heap_page_prune_opt(Relation relation, Buffer buffer,
 					TransactionId OldestXmin);
 extern int heap_page_prune(Relation relation, Buffer buffer,
 				TransactionId OldestXmin,
-				bool report_stats, TransactionId *latestRemovedXid);
+				bool report_stats, TransactionId *latestRemovedXid,
+				uint32 vacgen);
 extern void heap_page_prune_execute(Buffer buffer,
 						OffsetNumber *redirected, int nredirected,
 						OffsetNumber *nowdead, int ndead,
-						OffsetNumber *nowunused, int nunused);
+						OffsetNumber *nowunused, int nunused,
+						uint32 vacgen);
 extern void heap_get_root_tuples(Page page, OffsetNumber *root_offsets);
 
 /* in heap/syncscan.c */
diff --git a/src/include/access/htup.h b/src/include/access/htup.h
index c025835..4a8d842 100644
--- a/src/include/access/htup.h
+++ b/src/include/access/htup.h
@@ -691,10 +691,11 @@ typedef struct xl_heap_clean
 	TransactionId latestRemovedXid;
 	uint16		nredirected;
 	uint16		ndead;
+	uint32		vacgen;
 	/* OFFSET NUMBERS FOLLOW */
 } xl_heap_clean;
 
-#define SizeOfHeapClean (offsetof(xl_heap_clean, ndead) + sizeof(uint16))
+#define SizeOfHeapClean (offsetof(xl_heap_clean, vacgen) + sizeof(uint32))
 
 /*
  * Cleanup_info is required in some cases during a lazy VACUUM.
diff --git a/src/include/catalog/pg_class.h b/src/include/catalog/pg_class.h
index e006180..8035cda 100644
--- a/src/include/catalog/pg_class.h
+++ b/src/include/catalog/pg_class.h
@@ -65,6 +65,8 @@ CATALOG(pg_class,1259) BKI_BOOTSTRAP BKI_ROWTYPE_OID(83) BKI_SCHEMA_MACRO
 	bool		relhastriggers; /* has (or has had) any TRIGGERs */
 	bool		relhassubclass; /* has (or has had) derived classes */
 	TransactionId relfrozenxid; /* all Xids < this are frozen in this rel */
+	int4	    relnextvacgen; 	/* generation number of the next vacuum */
+	int4	    rellastvacgen; 	/* generation number of last successful vacuum */
 
 	/*
 	 * VARIABLE LENGTH FIELDS start here.  These fields may be NULL, too.
@@ -78,7 +80,7 @@ CATALOG(pg_class,1259) BKI_BOOTSTRAP BKI_ROWTYPE_OID(83) BKI_SCHEMA_MACRO
 
 /* Size of fixed part of pg_class tuples, not counting var-length fields */
 #define CLASS_TUPLE_SIZE \
-	 (offsetof(FormData_pg_class,relfrozenxid) + sizeof(TransactionId))
+	 (offsetof(FormData_pg_class, rellastvacgen) + sizeof(int4))
 
 /* ----------------
  *		Form_pg_class corresponds to a pointer to a tuple with
@@ -92,7 +94,7 @@ typedef FormData_pg_class *Form_pg_class;
  * ----------------
  */
 
-#define Natts_pg_class					26
+#define Natts_pg_class					28
 #define Anum_pg_class_relname			1
 #define Anum_pg_class_relnamespace		2
 #define Anum_pg_class_reltype			3
@@ -117,8 +119,10 @@ typedef FormData_pg_class *Form_pg_class;
 #define Anum_pg_class_relhastriggers	22
 #define Anum_pg_class_relhassubclass	23
 #define Anum_pg_class_relfrozenxid		24
-#define Anum_pg_class_relacl			25
-#define Anum_pg_class_reloptions		26
+#define Anum_pg_class_relnextvacgen		25
+#define Anum_pg_class_rellastvacgen		26
+#define Anum_pg_class_relacl			27
+#define Anum_pg_class_reloptions		28
 
 /* ----------------
  *		initial contents of pg_class
@@ -130,13 +134,13 @@ typedef FormData_pg_class *Form_pg_class;
  */
 
 /* Note: "3" in the relfrozenxid column stands for FirstNormalTransactionId */
-DATA(insert OID = 1247 (  pg_type		PGNSP 71 0 PGUID 0 0 0 0 0 0 0 f f p r 29 0 t f f f f 3 _null_ _null_ ));
+DATA(insert OID = 1247 (  pg_type		PGNSP 71 0 PGUID 0 0 0 0 0 0 0 f f p r 29 0 t f f f f 3 1 0 _null_ _null_ ));
 DESCR("");
-DATA(insert OID = 1249 (  pg_attribute	PGNSP 75 0 PGUID 0 0 0 0 0 0 0 f f p r 21 0 f f f f f 3 _null_ _null_ ));
+DATA(insert OID = 1249 (  pg_attribute	PGNSP 75 0 PGUID 0 0 0 0 0 0 0 f f p r 21 0 f f f f f 3 1 0 _null_ _null_ ));
 DESCR("");
-DATA(insert OID = 1255 (  pg_proc		PGNSP 81 0 PGUID 0 0 0 0 0 0 0 f f p r 26 0 t f f f f 3 _null_ _null_ ));
+DATA(insert OID = 1255 (  pg_proc		PGNSP 81 0 PGUID 0 0 0 0 0 0 0 f f p r 26 0 t f f f f 3 1 0 _null_ _null_ ));
 DESCR("");
-DATA(insert OID = 1259 (  pg_class		PGNSP 83 0 PGUID 0 0 0 0 0 0 0 f f p r 26 0 t f f f f 3 _null_ _null_ ));
+DATA(insert OID = 1259 (  pg_class		PGNSP 83 0 PGUID 0 0 0 0 0 0 0 f f p r 28 0 t f f f f 3 1 0 _null_ _null_ ));
 DESCR("");
 
 
diff --git a/src/include/commands/vacuum.h b/src/include/commands/vacuum.h
index cfbe0c4..4c7480d 100644
--- a/src/include/commands/vacuum.h
+++ b/src/include/commands/vacuum.h
@@ -151,6 +151,8 @@ extern void vac_update_relstats(Relation relation,
 					double num_tuples,
 					bool hasindex,
 					TransactionId frozenxid);
+extern void vac_update_lastvacgen(Relation relation, uint32 vacgen);
+extern uint32 vac_update_nextvacgen(Relation relation);
 extern void vacuum_set_xid_limits(int freeze_min_age, int freeze_table_age,
 					  bool sharedRel,
 					  TransactionId *oldestXmin,
diff --git a/src/include/storage/itemid.h b/src/include/storage/itemid.h
index 961d2c2..c0fbd69 100644
--- a/src/include/storage/itemid.h
+++ b/src/include/storage/itemid.h
@@ -19,7 +19,11 @@
  *
  * In some cases an item pointer is "in use" but does not have any associated
  * storage on the page.  By convention, lp_len == 0 in every item pointer
- * that does not have storage, independently of its lp_flags state.
+ * that does not have storage, independently of its lp_flags state. But
+ * lp_len != 0 does not imply that the line pointer has storage, at least not
+ * for heap tuples, where we use lp_len (and lp_off) to store the vacuum
+ * generation number for dead-vacuumed tuples. In such cases, lp_flags must be
+ * set to LP_DEAD though.
  */
 typedef struct ItemIdData
 {
@@ -33,11 +37,16 @@ typedef ItemIdData *ItemId;
 /*
  * lp_flags has these possible states.	An UNUSED line pointer is available
  * for immediate re-use, the other states are not.
+ *
+ * A DEAD line pointer in heap does not have any storage associated with it.
+ * But a similar pointer in an index page may still have storage associated
+ * with it since we don't defrag index pages online.
  */
 #define LP_UNUSED		0		/* unused (should always have lp_len=0) */
 #define LP_NORMAL		1		/* used (should always have lp_len>0) */
 #define LP_REDIRECT		2		/* HOT redirect (should have lp_len=0) */
-#define LP_DEAD			3		/* dead, may or may not have storage */
+#define LP_DEAD			3		/* dead or dead-vacuumed. Heap tuples don't have
+								   storage, but index tuples may have */
 
 /*
  * Item offsets and lengths are represented by these types when
@@ -107,14 +116,26 @@ typedef uint16 ItemLength;
 
 /*
  * ItemIdIsDead
- *		True iff item identifier is in state DEAD.
+ *		True iff item identifier is in state DEAD or DEAD VACUUMED
  */
 #define ItemIdIsDead(itemId) \
 	((itemId)->lp_flags == LP_DEAD)
 
 /*
+ * ItemIdIsDeadVacuumed
+ *		True iff item identifier is in state DEAD VACUUMED.
+ */
+#define ItemIdIsDeadVacuumed(itemId) \
+	(((itemId)->lp_flags == LP_DEAD) && \
+	 (((itemId)->lp_off != 0) || \
+	 ((itemId)->lp_len != 0)))
+
+/*
  * ItemIdHasStorage
- *		True iff item identifier has associated storage.
+ *		True iff item identifier has associated storage. For DEAD line
+ *		pointers, this applies only to index tuples, since a DEAD heap tuple
+ *		never has storage associated with it. In fact, the lp_off/lp_len for
+ *		DEAD heap line pointers are used to store the vacuum generation number.
  */
 #define ItemIdHasStorage(itemId) \
 	((itemId)->lp_len != 0)
@@ -168,6 +189,37 @@ typedef uint16 ItemLength;
 )
 
 /*
+ * ItemIdSetDeadVacuumed
+ *		Set the item identifier to be DEAD VACUUMED, with no storage.
+ *		Beware of multiple evaluations of itemId!
+ *
+ *	Note: we save the generation number of the vacuum creating this dead-vacuumed
+ *	line pointer. We reuse the lp_off/lp_len for this purpose since the
+ *	dead-vacuumed line pointers only exist in the heap and lp_off/lp_len is not
+ *	used for dead line pointers in the heap.
+ *
+ *	Store the 30 LSB of the vacuum generation number in lp_off/lp_len
+ */
+#define ItemIdSetDeadVacuumed(itemId, vacgen) \
+( \
+	(itemId)->lp_flags = LP_DEAD, \
+	/* high 15 of the 30 saved bits go in lp_off, low 15 in lp_len; \
+	 * this is the exact inverse of ItemIdGetVacGen */ \
+	(itemId)->lp_off = (((vacgen) >> 15) & 0x7fff), \
+	(itemId)->lp_len = ((vacgen) & 0x7fff) \
+)
+
+/*
+ * Get the generation number of the vacuum that created this dead-vacuumed line
+ * pointer.
+ *
+ * Note: must be called only for the dead-vacuumed line pointers
+ */
+#define ItemIdGetVacGen(itemId) \
+( \
+	AssertMacro(ItemIdIsDeadVacuumed(itemId)), \
+	/* reassemble the 30 saved bits; vacgen is unsigned, so widen as uint32 */ \
+	(((uint32) ((itemId)->lp_off) << 15) | (itemId)->lp_len) \
+)
+
+/*
  * ItemIdMarkDead
  *		Set the item identifier to be DEAD, keeping its existing storage.
  *
diff --git a/src/include/utils/rel.h b/src/include/utils/rel.h
index 173dc16..d602b24 100644
--- a/src/include/utils/rel.h
+++ b/src/include/utils/rel.h
@@ -359,6 +359,13 @@ typedef struct StdRdOptions
 	((relation)->rd_rel->relpersistence == RELPERSISTENCE_TEMP)
 
 /*
+ * RelationGetLastVacGen
+ * 		Get the last vacuum generation number for the relation
+ */
+#define RelationGetLastVacGen(relation) \
+	((relation)->rd_rel->rellastvacgen)
+
+/*
  * RELATION_IS_LOCAL
  *		If a rel is either temp or newly created in the current transaction,
  *		it can be assumed to be visible only to the current backend.
-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to