From ddb820f9471f9c5f04aee98066f7f9cd8c3278d2 Mon Sep 17 00:00:00 2001
From: Masahiko Sawada <sawada.mshk@gmail.com>
Date: Fri, 28 Sep 2018 15:38:05 +0900
Subject: [PATCH v2] Add FREEZE_ONLY option to VACUUM command.

---
 doc/src/sgml/ref/vacuum.sgml         | 19 ++++++++
 src/backend/access/heap/pruneheap.c  | 49 +++++++++++++++++----
 src/backend/commands/vacuum.c        |  9 ++--
 src/backend/commands/vacuumlazy.c    | 84 +++++++++++++++++++++++++++---------
 src/backend/parser/gram.y            |  2 +
 src/include/access/heapam.h          |  3 +-
 src/include/nodes/parsenodes.h       |  3 +-
 src/test/regress/expected/vacuum.out |  1 +
 src/test/regress/sql/vacuum.sql      |  1 +
 9 files changed, 137 insertions(+), 34 deletions(-)

diff --git a/doc/src/sgml/ref/vacuum.sgml b/doc/src/sgml/ref/vacuum.sgml
index fd911f5..f2ab450 100644
--- a/doc/src/sgml/ref/vacuum.sgml
+++ b/doc/src/sgml/ref/vacuum.sgml
@@ -28,6 +28,7 @@ VACUUM [ FULL ] [ FREEZE ] [ VERBOSE ] [ ANALYZE ] [ <replaceable class="paramet
 
     FULL
     FREEZE
+    FREEZE_ONLY
     VERBOSE
     ANALYZE
     DISABLE_PAGE_SKIPPING
@@ -124,6 +125,24 @@ VACUUM [ FULL ] [ FREEZE ] [ VERBOSE ] [ ANALYZE ] [ <replaceable class="paramet
    </varlistentry>
 
    <varlistentry>
+    <term><literal>FREEZE_ONLY</literal></term>
+    <listitem>
+     <para>
+      <command>VACUUM</command> removes dead tuples and prunes HOT-updated
+      tuples chain for live tuples. If the table has any dead tuple it removes
+      them on both table and its indexes.
+      This option is same as <literal>FREEZE</literal> except for it disables
+      removing dead tuples, so it does only freezing tuples and pruning
+      HOT-updated tuple chains. This is intended to be used when necessary to
+      prevent transaction ID wraparound as quick as possible.
+      Aggressive freezing is always performed when the
+      table is rewritten, so this option is redundant when <literal>FULL</literal>
+      is specified.
+     </para>
+    </listitem>
+   </varlistentry>
+
+   <varlistentry>
     <term><literal>VERBOSE</literal></term>
     <listitem>
      <para>
diff --git a/src/backend/access/heap/pruneheap.c b/src/backend/access/heap/pruneheap.c
index c2f5343..867d840 100644
--- a/src/backend/access/heap/pruneheap.c
+++ b/src/backend/access/heap/pruneheap.c
@@ -35,6 +35,7 @@ typedef struct
 	int			nredirected;	/* numbers of entries in arrays below */
 	int			ndead;
 	int			nunused;
+	bool		prune_root;		/* do we prune whole chain or root item? */
 	/* arrays that accumulate indexes of items to be changed */
 	OffsetNumber redirected[MaxHeapTuplesPerPage * 2];
 	OffsetNumber nowdead[MaxHeapTuplesPerPage];
@@ -53,6 +54,8 @@ static void heap_prune_record_redirect(PruneState *prstate,
 						   OffsetNumber offnum, OffsetNumber rdoffnum);
 static void heap_prune_record_dead(PruneState *prstate, OffsetNumber offnum);
 static void heap_prune_record_unused(PruneState *prstate, OffsetNumber offnum);
+static void heap_prune_advance_latest_removed_xid(PruneState *prstate, Page page,
+												  OffsetNumber offnum);
 
 
 /*
@@ -152,7 +155,8 @@ heap_page_prune_opt(Relation relation, Buffer buffer)
 															 * needed */
 
 			/* OK to prune */
-			(void) heap_page_prune(relation, buffer, OldestXmin, true, &ignore);
+			(void) heap_page_prune(relation, buffer, OldestXmin, true, &ignore,
+								   true);
 		}
 
 		/* And release buffer lock */
@@ -179,7 +183,8 @@ heap_page_prune_opt(Relation relation, Buffer buffer)
  */
 int
 heap_page_prune(Relation relation, Buffer buffer, TransactionId OldestXmin,
-				bool report_stats, TransactionId *latestRemovedXid)
+				bool report_stats, TransactionId *latestRemovedXid,
+				bool prune_root)
 {
 	int			ndeleted = 0;
 	Page		page = BufferGetPage(buffer);
@@ -201,6 +206,7 @@ heap_page_prune(Relation relation, Buffer buffer, TransactionId OldestXmin,
 	prstate.new_prune_xid = InvalidTransactionId;
 	prstate.latestRemovedXid = *latestRemovedXid;
 	prstate.nredirected = prstate.ndead = prstate.nunused = 0;
+	prstate.prune_root = prune_root;
 	memset(prstate.marked, 0, sizeof(prstate.marked));
 
 	/* Scan the page */
@@ -537,11 +543,7 @@ heap_prune_chain(Relation relation, Buffer buffer, OffsetNumber rootoffnum,
 		 * find another DEAD tuple is a fairly unusual corner case.)
 		 */
 		if (tupdead)
-		{
 			latestdead = offnum;
-			HeapTupleHeaderAdvanceLatestRemovedXid(htup,
-												   &prstate->latestRemovedXid);
-		}
 		else if (!recent_dead)
 			break;
 
@@ -580,6 +582,7 @@ heap_prune_chain(Relation relation, Buffer buffer, OffsetNumber rootoffnum,
 		 */
 		for (i = 1; (i < nchain) && (chainitems[i - 1] != latestdead); i++)
 		{
+			heap_prune_advance_latest_removed_xid(prstate, dp, chainitems[i]);
 			heap_prune_record_unused(prstate, chainitems[i]);
 			ndeleted++;
 		}
@@ -598,7 +601,13 @@ heap_prune_chain(Relation relation, Buffer buffer, OffsetNumber rootoffnum,
 		 * redirect the root to the correct chain member.
 		 */
 		if (i >= nchain)
-			heap_prune_record_dead(prstate, rootoffnum);
+		{
+			if (prstate->prune_root)
+			{
+				heap_prune_advance_latest_removed_xid(prstate, dp, rootoffnum);
+				heap_prune_record_dead(prstate, rootoffnum);
+			}
+		}
 		else
 			heap_prune_record_redirect(prstate, rootoffnum, chainitems[i]);
 	}
@@ -611,12 +620,36 @@ heap_prune_chain(Relation relation, Buffer buffer, OffsetNumber rootoffnum,
 		 * redirect item.  We can clean up by setting the redirect item to
 		 * DEAD state.
 		 */
-		heap_prune_record_dead(prstate, rootoffnum);
+		if (prstate->prune_root)
+			heap_prune_record_dead(prstate, rootoffnum);
 	}
 
 	return ndeleted;
 }
 
+/*
+ * Advance latestRemovedXid using by the given page and offset. This
+ * function is used for the deleted tuples in the HOT chain. Since the
+ * root line pointer can be redirected we skip it.
+ */
+static void
+heap_prune_advance_latest_removed_xid(PruneState *prstate, Page page,
+									  OffsetNumber offnum)
+{
+	ItemId		lp;
+	HeapTupleHeader	htup;
+
+	lp = PageGetItemId(page, offnum);
+	Assert(ItemIdIsNormal(lp) || ItemIdIsRedirected(lp));
+
+	if (ItemIdIsRedirected(lp))
+		return;
+
+	htup = (HeapTupleHeader) PageGetItem(page, lp);
+	HeapTupleHeaderAdvanceLatestRemovedXid(htup,
+										   &prstate->latestRemovedXid);
+}
+
 /* Record lowest soon-prunable XID */
 static void
 heap_prune_record_prunable(PruneState *prstate, TransactionId xid)
diff --git a/src/backend/commands/vacuum.c b/src/backend/commands/vacuum.c
index a86963f..6f36fbc 100644
--- a/src/backend/commands/vacuum.c
+++ b/src/backend/commands/vacuum.c
@@ -91,7 +91,7 @@ ExecVacuum(VacuumStmt *vacstmt, bool isTopLevel)
 	/* sanity checks on options */
 	Assert(vacstmt->options & (VACOPT_VACUUM | VACOPT_ANALYZE));
 	Assert((vacstmt->options & VACOPT_VACUUM) ||
-		   !(vacstmt->options & (VACOPT_FULL | VACOPT_FREEZE)));
+		   !(vacstmt->options & (VACOPT_FULL | VACOPT_FREEZE | VACOPT_FREEZE_ONLY)));
 	Assert(!(vacstmt->options & VACOPT_SKIPTOAST));
 
 	/*
@@ -113,10 +113,11 @@ ExecVacuum(VacuumStmt *vacstmt, bool isTopLevel)
 	}
 
 	/*
-	 * All freeze ages are zero if the FREEZE option is given; otherwise pass
-	 * them as -1 which means to use the default values.
+	 * All freeze ages are zero if either the FREEZE option or the FREEZE_ONLY
+	 * option is given; otherwise pass them as -1 which means to use the default
+	 * values.
 	 */
-	if (vacstmt->options & VACOPT_FREEZE)
+	if (vacstmt->options & (VACOPT_FREEZE | VACOPT_FREEZE_ONLY))
 	{
 		params.freeze_min_age = 0;
 		params.freeze_table_age = 0;
diff --git a/src/backend/commands/vacuumlazy.c b/src/backend/commands/vacuumlazy.c
index 8996d36..b1764ba 100644
--- a/src/backend/commands/vacuumlazy.c
+++ b/src/backend/commands/vacuumlazy.c
@@ -168,7 +168,8 @@ static bool should_attempt_truncation(LVRelStats *vacrelstats);
 static void lazy_truncate_heap(Relation onerel, LVRelStats *vacrelstats);
 static BlockNumber count_nondeletable_pages(Relation onerel,
 						 LVRelStats *vacrelstats);
-static void lazy_space_alloc(LVRelStats *vacrelstats, BlockNumber relblocks);
+static void lazy_space_alloc(LVRelStats *vacrelstats, BlockNumber relblocks,
+							 bool collect_deadtuples);
 static void lazy_record_dead_tuple(LVRelStats *vacrelstats,
 					   ItemPointer itemptr);
 static bool lazy_tid_reaped(ItemPointer itemptr, void *state);
@@ -286,7 +287,7 @@ lazy_vacuum_rel(Relation onerel, int options, VacuumParams *params,
 	/*
 	 * Optionally truncate the relation.
 	 */
-	if (should_attempt_truncation(vacrelstats))
+	if (should_attempt_truncation(vacrelstats) && (options & VACOPT_FREEZE_ONLY) == 0)
 		lazy_truncate_heap(onerel, vacrelstats);
 
 	/* Report that we are now doing final cleanup */
@@ -468,6 +469,11 @@ vacuum_log_cleanup_info(Relation rel, LVRelStats *vacrelstats)
  *		If there are no indexes then we can reclaim line pointers on the fly;
  *		dead line pointers need only be retained until all index pointers that
  *		reference them have been killed.
+ *
+ *		If FREEE_ONLY option is specified, we do only freezing live tuples,
+ *		HOT pruning and maintaining both the freespace map and the visibility
+ *		map, but don't collect dead tuple pointers and invoke both vacuuming
+ *		of indexes and heap.
  */
 static void
 lazy_scan_heap(Relation onerel, int options, LVRelStats *vacrelstats,
@@ -495,6 +501,8 @@ lazy_scan_heap(Relation onerel, int options, LVRelStats *vacrelstats,
 	bool		skipping_blocks;
 	xl_heap_freeze_tuple *frozen;
 	StringInfoData buf;
+	bool		freeze_only = (options & VACOPT_FREEZE_ONLY) != 0;
+	int			total_nfrozen = 0;
 	const int	initprog_index[] = {
 		PROGRESS_VACUUM_PHASE,
 		PROGRESS_VACUUM_TOTAL_HEAP_BLKS,
@@ -530,7 +538,7 @@ lazy_scan_heap(Relation onerel, int options, LVRelStats *vacrelstats,
 	vacrelstats->nonempty_pages = 0;
 	vacrelstats->latestRemovedXid = InvalidTransactionId;
 
-	lazy_space_alloc(vacrelstats, nblocks);
+	lazy_space_alloc(vacrelstats, nblocks, !freeze_only);
 	frozen = palloc(sizeof(xl_heap_freeze_tuple) * MaxHeapTuplesPerPage);
 
 	/* Report that we're scanning the heap, advertising total # of blocks */
@@ -723,6 +731,8 @@ lazy_scan_heap(Relation onerel, int options, LVRelStats *vacrelstats,
 			};
 			int64		hvp_val[2];
 
+			Assert(!freeze_only);
+
 			/*
 			 * Before beginning index vacuuming, we release any pin we may
 			 * hold on the visibility map page.  This isn't necessary for
@@ -946,7 +956,8 @@ lazy_scan_heap(Relation onerel, int options, LVRelStats *vacrelstats,
 		 * We count tuples removed by the pruning step as removed by VACUUM.
 		 */
 		tups_vacuumed += heap_page_prune(onerel, buf, OldestXmin, false,
-										 &vacrelstats->latestRemovedXid);
+										 &vacrelstats->latestRemovedXid,
+										 !freeze_only);
 
 		/*
 		 * Now scan the page to collect vacuumable items and check for tuples
@@ -995,7 +1006,8 @@ lazy_scan_heap(Relation onerel, int options, LVRelStats *vacrelstats,
 			 */
 			if (ItemIdIsDead(itemid))
 			{
-				lazy_record_dead_tuple(vacrelstats, &(tuple.t_self));
+				if (!freeze_only)
+					lazy_record_dead_tuple(vacrelstats, &(tuple.t_self));
 				all_visible = false;
 				continue;
 			}
@@ -1024,11 +1036,11 @@ lazy_scan_heap(Relation onerel, int options, LVRelStats *vacrelstats,
 				case HEAPTUPLE_DEAD:
 
 					/*
-					 * Ordinarily, DEAD tuples would have been removed by
-					 * heap_page_prune(), but it's possible that the tuple
-					 * state changed since heap_page_prune() looked.  In
-					 * particular an INSERT_IN_PROGRESS tuple could have
-					 * changed to DEAD if the inserter aborted.  So this
+					 * If reclaiming tuples is enabled, DEAD tuples would have
+					 * been removed by heap_page_prune(), but it's possible
+					 * that the tuple state changed since heap_page_prune()
+					 * looked. In particular an INSERT_IN_PROGRESS tuple could
+					 * have changed to DEAD if the inserter aborted.  So this
 					 * cannot be considered an error condition.
 					 *
 					 * If the tuple is HOT-updated then it must only be
@@ -1045,6 +1057,8 @@ lazy_scan_heap(Relation onerel, int options, LVRelStats *vacrelstats,
 					 * to detect that case and abort the transaction,
 					 * preventing corruption.
 					 */
+					if (freeze_only)
+						tupgone = true;
 					if (HeapTupleIsHotUpdated(&tuple) ||
 						HeapTupleIsHeapOnly(&tuple))
 						nkeep += 1;
@@ -1141,10 +1155,13 @@ lazy_scan_heap(Relation onerel, int options, LVRelStats *vacrelstats,
 
 			if (tupgone)
 			{
-				lazy_record_dead_tuple(vacrelstats, &(tuple.t_self));
-				HeapTupleHeaderAdvanceLatestRemovedXid(tuple.t_data,
-													   &vacrelstats->latestRemovedXid);
-				tups_vacuumed += 1;
+				if (!freeze_only)
+				{
+					lazy_record_dead_tuple(vacrelstats, &(tuple.t_self));
+					HeapTupleHeaderAdvanceLatestRemovedXid(tuple.t_data,
+														   &vacrelstats->latestRemovedXid);
+					tups_vacuumed += 1;
+				}
 				has_dead_tuples = true;
 			}
 			else
@@ -1204,6 +1221,9 @@ lazy_scan_heap(Relation onerel, int options, LVRelStats *vacrelstats,
 			}
 
 			END_CRIT_SECTION();
+
+			/* Remember nfrozen in total */
+			total_nfrozen += nfrozen;
 		}
 
 		/*
@@ -1213,6 +1233,8 @@ lazy_scan_heap(Relation onerel, int options, LVRelStats *vacrelstats,
 		if (nindexes == 0 &&
 			vacrelstats->num_dead_tuples > 0)
 		{
+			Assert(!freeze_only);
+
 			/* Remove tuples from heap */
 			lazy_vacuum_page(onerel, blkno, buf, 0, vacrelstats, &vmbuffer);
 			has_dead_tuples = false;
@@ -1380,6 +1402,8 @@ lazy_scan_heap(Relation onerel, int options, LVRelStats *vacrelstats,
 		};
 		int64		hvp_val[2];
 
+		Assert(!freeze_only);
+
 		/* Log cleanup info before we touch indexes */
 		vacuum_log_cleanup_info(onerel, vacrelstats);
 
@@ -1418,8 +1442,11 @@ lazy_scan_heap(Relation onerel, int options, LVRelStats *vacrelstats,
 								 PROGRESS_VACUUM_PHASE_INDEX_CLEANUP);
 
 	/* Do post-vacuum cleanup and statistics update for each index */
-	for (i = 0; i < nindexes; i++)
-		lazy_cleanup_index(Irel[i], indstats[i], vacrelstats);
+	if (!freeze_only)
+	{
+		for (i = 0; i < nindexes; i++)
+			lazy_cleanup_index(Irel[i], indstats[i], vacrelstats);
+	}
 
 	/* If no indexes, make log report that lazy_vacuum_heap would've made */
 	if (vacuumed_pages)
@@ -1433,9 +1460,16 @@ lazy_scan_heap(Relation onerel, int options, LVRelStats *vacrelstats,
 	 * individual parts of the message when not applicable.
 	 */
 	initStringInfo(&buf);
-	appendStringInfo(&buf,
-					 _("%.0f dead row versions cannot be removed yet, oldest xmin: %u\n"),
-					 nkeep, OldestXmin);
+	if (!freeze_only)
+		appendStringInfo(&buf,
+						 _("%.0f dead row versions cannot be removed yet, oldest xmin: %u\n"),
+						 nkeep, OldestXmin);
+	else
+		appendStringInfo(&buf, ngettext("freeze %d row\n",
+										"freeze %d rows\n",
+										total_nfrozen),
+						 total_nfrozen);
+
 	appendStringInfo(&buf, _("There were %.0f unused item pointers.\n"),
 					 nunused);
 	appendStringInfo(&buf, ngettext("Skipped %u page due to buffer pins, ",
@@ -2085,13 +2119,23 @@ count_nondeletable_pages(Relation onerel, LVRelStats *vacrelstats)
  * See the comments at the head of this file for rationale.
  */
 static void
-lazy_space_alloc(LVRelStats *vacrelstats, BlockNumber relblocks)
+lazy_space_alloc(LVRelStats *vacrelstats, BlockNumber relblocks,
+				 bool collect_deadtuples)
 {
 	long		maxtuples;
 	int			vac_work_mem = IsAutoVacuumWorkerProcess() &&
 	autovacuum_work_mem != -1 ?
 	autovacuum_work_mem : maintenance_work_mem;
 
+	/* Don't prepare dead tuple space if we don't collect them */
+	if (!collect_deadtuples)
+	{
+		vacrelstats->num_dead_tuples = 0;
+		vacrelstats->max_dead_tuples = 0;
+		vacrelstats->dead_tuples = NULL;
+		return;
+	}
+
 	if (vacrelstats->hasindex)
 	{
 		maxtuples = (vac_work_mem * 1024L) / sizeof(ItemPointerData);
diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y
index 6d23bfb..9866b0b 100644
--- a/src/backend/parser/gram.y
+++ b/src/backend/parser/gram.y
@@ -10540,6 +10540,8 @@ vacuum_option_elem:
 						$$ = VACOPT_DISABLE_PAGE_SKIPPING;
 					else if (strcmp($1, "skip_locked") == 0)
 						$$ = VACOPT_SKIP_LOCKED;
+					else if (strcmp($1, "freeze_only") == 0)
+						$$ = VACOPT_FREEZE_ONLY;
 					else
 						ereport(ERROR,
 								(errcode(ERRCODE_SYNTAX_ERROR),
diff --git a/src/include/access/heapam.h b/src/include/access/heapam.h
index 40e153f..423eeb1 100644
--- a/src/include/access/heapam.h
+++ b/src/include/access/heapam.h
@@ -188,7 +188,8 @@ extern void heap_update_snapshot(HeapScanDesc scan, Snapshot snapshot);
 extern void heap_page_prune_opt(Relation relation, Buffer buffer);
 extern int heap_page_prune(Relation relation, Buffer buffer,
 				TransactionId OldestXmin,
-				bool report_stats, TransactionId *latestRemovedXid);
+				bool report_stats, TransactionId *latestRemovedXid,
+				bool prune_root);
 extern void heap_page_prune_execute(Buffer buffer,
 						OffsetNumber *redirected, int nredirected,
 						OffsetNumber *nowdead, int ndead,
diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h
index aa4a0db..478c82c 100644
--- a/src/include/nodes/parsenodes.h
+++ b/src/include/nodes/parsenodes.h
@@ -3156,7 +3156,8 @@ typedef enum VacuumOption
 	VACOPT_FULL = 1 << 4,		/* FULL (non-concurrent) vacuum */
 	VACOPT_SKIP_LOCKED = 1 << 5,	/* skip if cannot get lock */
 	VACOPT_SKIPTOAST = 1 << 6,	/* don't process the TOAST table, if any */
-	VACOPT_DISABLE_PAGE_SKIPPING = 1 << 7	/* don't skip any pages */
+	VACOPT_DISABLE_PAGE_SKIPPING = 1 << 7,	/* don't skip any pages */
+	VACOPT_FREEZE_ONLY = 1 << 8	/* do only freezing */
 } VacuumOption;
 
 /*
diff --git a/src/test/regress/expected/vacuum.out b/src/test/regress/expected/vacuum.out
index fa9d663..5553a69 100644
--- a/src/test/regress/expected/vacuum.out
+++ b/src/test/regress/expected/vacuum.out
@@ -80,6 +80,7 @@ CONTEXT:  SQL function "do_analyze" statement 1
 SQL function "wrap_do_analyze" statement 1
 VACUUM FULL vactst;
 VACUUM (DISABLE_PAGE_SKIPPING) vaccluster;
+VACUUM (FREEZE_ONLY) vaccluster, vactst;
 -- partitioned table
 CREATE TABLE vacparted (a int, b char) PARTITION BY LIST (a);
 CREATE TABLE vacparted1 PARTITION OF vacparted FOR VALUES IN (1);
diff --git a/src/test/regress/sql/vacuum.sql b/src/test/regress/sql/vacuum.sql
index 9defa0d..0e8805e 100644
--- a/src/test/regress/sql/vacuum.sql
+++ b/src/test/regress/sql/vacuum.sql
@@ -61,6 +61,7 @@ VACUUM FULL vaccluster;
 VACUUM FULL vactst;
 
 VACUUM (DISABLE_PAGE_SKIPPING) vaccluster;
+VACUUM (FREEZE_ONLY) vaccluster, vactst;
 
 -- partitioned table
 CREATE TABLE vacparted (a int, b char) PARTITION BY LIST (a);
-- 
2.10.5

