From 0b202c41a189ff23bb2a73cce9b5c59fa9490b38 Mon Sep 17 00:00:00 2001
From: Mark Dilger <mark.dilger@enterprisedb.com>
Date: Wed, 24 Mar 2021 18:18:56 -0700
Subject: [PATCH v12] Fix visibility and locking issues.

Fix amcheck's verify_heapam() visibility rules and refactor toast
checking code to be performed after releasing the buffer lock on the
main table page.
---
 contrib/amcheck/verify_heapam.c  | 1123 ++++++++++++++++++------------
 src/tools/pgindent/typedefs.list |    1 +
 2 files changed, 693 insertions(+), 431 deletions(-)

diff --git a/contrib/amcheck/verify_heapam.c b/contrib/amcheck/verify_heapam.c
index 6f972e630a..e3163e4bfa 100644
--- a/contrib/amcheck/verify_heapam.c
+++ b/contrib/amcheck/verify_heapam.c
@@ -46,6 +46,7 @@ typedef enum XidBoundsViolation
 typedef enum XidCommitStatus
 {
 	XID_COMMITTED,
+	XID_IS_CURRENT_XID,
 	XID_IN_PROGRESS,
 	XID_ABORTED
 } XidCommitStatus;
@@ -57,6 +58,26 @@ typedef enum SkipPages
 	SKIP_PAGES_NONE
 } SkipPages;
 
+/*
+ * Struct holding information necessary to check a toasted attribute, including
+ * the toast pointer, state about the current toast chunk being checked, and
+ * the location in the main table of the toasted attribute.  We have to track
+ * the tuple's location in the main table for reporting purposes because by the
+ * time the toast is checked our HeapCheckContext will no longer be pointing to
+ * the relevant tuple.
+ */
+typedef struct ToastCheckContext
+{
+	struct varatt_external toast_pointer;
+	BlockNumber blkno;			/* block in main table */
+	OffsetNumber offnum;		/* offset in main table */
+	AttrNumber	attnum;			/* attribute in main table */
+	int32		chunkno;		/* chunk number in toast table */
+	int32		attrsize;		/* size of toasted attribute */
+	int32		endchunk;		/* last chunk number in toast table */
+	int32		totalchunks;	/* total chunks in toast table */
+} ToastCheckContext;
+
 /*
  * Struct holding the running context information during
  * a lifetime of a verify_heapam execution.
@@ -72,6 +93,8 @@ typedef struct HeapCheckContext
 	TransactionId oldest_xid;	/* ShmemVariableCache->oldestXid */
 	FullTransactionId oldest_fxid;	/* 64-bit version of oldest_xid, computed
 									 * relative to next_fxid */
+	TransactionId safe_xmin;	/* this XID and newer ones can't become
+								 * all-visible while we're running */
 
 	/*
 	 * Cached copy of value from MultiXactState
@@ -113,11 +136,14 @@ typedef struct HeapCheckContext
 	uint32		offset;			/* offset in tuple data */
 	AttrNumber	attnum;
 
-	/* Values for iterating over toast for the attribute */
-	int32		chunkno;
-	int32		attrsize;
-	int32		endchunk;
-	int32		totalchunks;
+	/* True if toast for this tuple could be vacuumed away */
+	bool		tuple_is_volatile;
+
+	/*
+	 * List of ToastCheckContext structs for toasted attributes which are not
+	 * in danger of being vacuumed way and should be checked
+	 */
+	List	   *toasted_attributes;
 
 	/* Whether verify_heapam has yet encountered any corrupt tuples */
 	bool		is_corrupt;
@@ -130,13 +156,18 @@ typedef struct HeapCheckContext
 /* Internal implementation */
 static void sanity_check_relation(Relation rel);
 static void check_tuple(HeapCheckContext *ctx);
-static void check_toast_tuple(HeapTuple toasttup, HeapCheckContext *ctx);
+static void check_toast_tuple(HeapTuple toasttup, HeapCheckContext *ctx,
+							  ToastCheckContext *tctx);
+
+static bool check_tuple_header(HeapCheckContext *ctx);
+static bool check_tuple_visibility(HeapCheckContext *ctx);
 
 static bool check_tuple_attribute(HeapCheckContext *ctx);
-static bool check_tuple_header_and_visibilty(HeapTupleHeader tuphdr,
-											 HeapCheckContext *ctx);
+static void check_toasted_attributes(HeapCheckContext *ctx);
 
-static void report_corruption(HeapCheckContext *ctx, char *msg);
+static void report_main_corruption(HeapCheckContext *ctx, char *msg);
+static void report_toast_corruption(HeapCheckContext *ctx,
+									ToastCheckContext *tctx, char *msg);
 static TupleDesc verify_heapam_tupdesc(void);
 static FullTransactionId FullTransactionIdFromXidAndCtx(TransactionId xid,
 														const HeapCheckContext *ctx);
@@ -247,6 +278,13 @@ verify_heapam(PG_FUNCTION_ARGS)
 
 	memset(&ctx, 0, sizeof(HeapCheckContext));
 	ctx.cached_xid = InvalidTransactionId;
+	ctx.toasted_attributes = NIL;
+
+	/*
+	 * Any xmin newer than the xmin of our snapshot can't become all-visible
+	 * while we're running.
+	 */
+	ctx.safe_xmin = GetTransactionSnapshot()->xmin;
 
 	/*
 	 * If we report corruption when not examining some individual attribute,
@@ -395,25 +433,25 @@ verify_heapam(PG_FUNCTION_ARGS)
 
 				if (rdoffnum < FirstOffsetNumber)
 				{
-					report_corruption(&ctx,
-									  psprintf("line pointer redirection to item at offset %u precedes minimum offset %u",
-											   (unsigned) rdoffnum,
-											   (unsigned) FirstOffsetNumber));
+					report_main_corruption(&ctx,
+										   psprintf("line pointer redirection to item at offset %u precedes minimum offset %u",
+													(unsigned) rdoffnum,
+													(unsigned) FirstOffsetNumber));
 					continue;
 				}
 				if (rdoffnum > maxoff)
 				{
-					report_corruption(&ctx,
-									  psprintf("line pointer redirection to item at offset %u exceeds maximum offset %u",
-											   (unsigned) rdoffnum,
-											   (unsigned) maxoff));
+					report_main_corruption(&ctx,
+										   psprintf("line pointer redirection to item at offset %u exceeds maximum offset %u",
+													(unsigned) rdoffnum,
+													(unsigned) maxoff));
 					continue;
 				}
 				rditem = PageGetItemId(ctx.page, rdoffnum);
 				if (!ItemIdIsUsed(rditem))
-					report_corruption(&ctx,
-									  psprintf("line pointer redirection to unused item at offset %u",
-											   (unsigned) rdoffnum));
+					report_main_corruption(&ctx,
+										   psprintf("line pointer redirection to unused item at offset %u",
+													(unsigned) rdoffnum));
 				continue;
 			}
 
@@ -423,26 +461,26 @@ verify_heapam(PG_FUNCTION_ARGS)
 
 			if (ctx.lp_off != MAXALIGN(ctx.lp_off))
 			{
-				report_corruption(&ctx,
-								  psprintf("line pointer to page offset %u is not maximally aligned",
-										   ctx.lp_off));
+				report_main_corruption(&ctx,
+									   psprintf("line pointer to page offset %u is not maximally aligned",
+												ctx.lp_off));
 				continue;
 			}
 			if (ctx.lp_len < MAXALIGN(SizeofHeapTupleHeader))
 			{
-				report_corruption(&ctx,
-								  psprintf("line pointer length %u is less than the minimum tuple header size %u",
-										   ctx.lp_len,
-										   (unsigned) MAXALIGN(SizeofHeapTupleHeader)));
+				report_main_corruption(&ctx,
+									   psprintf("line pointer length %u is less than the minimum tuple header size %u",
+												ctx.lp_len,
+												(unsigned) MAXALIGN(SizeofHeapTupleHeader)));
 				continue;
 			}
 			if (ctx.lp_off + ctx.lp_len > BLCKSZ)
 			{
-				report_corruption(&ctx,
-								  psprintf("line pointer to page offset %u with length %u ends beyond maximum page offset %u",
-										   ctx.lp_off,
-										   ctx.lp_len,
-										   (unsigned) BLCKSZ));
+				report_main_corruption(&ctx,
+									   psprintf("line pointer to page offset %u with length %u ends beyond maximum page offset %u",
+												ctx.lp_off,
+												ctx.lp_len,
+												(unsigned) BLCKSZ));
 				continue;
 			}
 
@@ -457,6 +495,14 @@ verify_heapam(PG_FUNCTION_ARGS)
 		/* clean up */
 		UnlockReleaseBuffer(ctx.buffer);
 
+		/*
+		 * Check any toast pointers from the page whose lock we just released
+		 * and reset the list to NIL.
+		 */
+		if (ctx.toasted_attributes != NIL)
+			check_toasted_attributes(&ctx);
+		Assert(ctx.toasted_attributes == NIL);
+
 		if (on_error_stop && ctx.is_corrupt)
 			break;
 	}
@@ -498,14 +544,13 @@ sanity_check_relation(Relation rel)
 }
 
 /*
- * Record a single corruption found in the table.  The values in ctx should
- * reflect the location of the corruption, and the msg argument should contain
- * a human-readable description of the corruption.
- *
- * The msg argument is pfree'd by this function.
+ * Shared internal implementation for report_main_corruption and
+ * report_toast_corruption.
  */
 static void
-report_corruption(HeapCheckContext *ctx, char *msg)
+report_corruption(Tuplestorestate *tupstore, TupleDesc tupdesc,
+				  BlockNumber blkno, OffsetNumber offnum, AttrNumber attnum,
+				  char *msg)
 {
 	Datum		values[HEAPCHECK_RELATION_COLS];
 	bool		nulls[HEAPCHECK_RELATION_COLS];
@@ -513,10 +558,10 @@ report_corruption(HeapCheckContext *ctx, char *msg)
 
 	MemSet(values, 0, sizeof(values));
 	MemSet(nulls, 0, sizeof(nulls));
-	values[0] = Int64GetDatum(ctx->blkno);
-	values[1] = Int32GetDatum(ctx->offnum);
-	values[2] = Int32GetDatum(ctx->attnum);
-	nulls[2] = (ctx->attnum < 0);
+	values[0] = Int64GetDatum(blkno);
+	values[1] = Int32GetDatum(offnum);
+	values[2] = Int32GetDatum(attnum);
+	nulls[2] = (attnum < 0);
 	values[3] = CStringGetTextDatum(msg);
 
 	/*
@@ -529,8 +574,39 @@ report_corruption(HeapCheckContext *ctx, char *msg)
 	 */
 	pfree(msg);
 
-	tuple = heap_form_tuple(ctx->tupdesc, values, nulls);
-	tuplestore_puttuple(ctx->tupstore, tuple);
+	tuple = heap_form_tuple(tupdesc, values, nulls);
+	tuplestore_puttuple(tupstore, tuple);
+}
+
+/*
+ * Record a single corruption found in the main table.  The values in ctx should
+ * indicate the location of the corruption, and the msg argument should contain
+ * a human-readable description of the corruption.
+ *
+ * The msg argument is pfree'd by this function.
+ */
+static void
+report_main_corruption(HeapCheckContext *ctx, char *msg)
+{
+	report_corruption(ctx->tupstore, ctx->tupdesc, ctx->blkno, ctx->offnum,
+					  ctx->attnum, msg);
+	ctx->is_corrupt = true;
+}
+
+/*
+ * Record corruption found in the toast table.  The values in tctx should
+ * indicate the location in the main table where the toast pointer was
+ * encountered, and the msg argument should contain a human-readable
+ * description of the toast table corruption.
+ *
+ * As above, the msg argument is pfree'd by this function.
+ */
+static void
+report_toast_corruption(HeapCheckContext *ctx, ToastCheckContext *tctx,
+						char *msg)
+{
+	report_corruption(ctx->tupstore, ctx->tupdesc, tctx->blkno, tctx->offnum,
+					  tctx->attnum, msg);
 	ctx->is_corrupt = true;
 }
 
@@ -555,16 +631,11 @@ verify_heapam_tupdesc(void)
 }
 
 /*
- * Check for tuple header corruption and tuple visibility.
- *
- * Since we do not hold a snapshot, tuple visibility is not a question of
- * whether we should be able to see the tuple relative to any particular
- * snapshot, but rather a question of whether it is safe and reasonable to
- * check the tuple attributes.
+ * Check for tuple header corruption.
  *
  * Some kinds of corruption make it unsafe to check the tuple attributes, for
  * example when the line pointer refers to a range of bytes outside the page.
- * In such cases, we return false (not visible) after recording appropriate
+ * In such cases, we return false (not checkable) after recording appropriate
  * corruption messages.
  *
  * Some other kinds of tuple header corruption confuse the question of where
@@ -576,44 +647,33 @@ verify_heapam_tupdesc(void)
  *
  * Other kinds of tuple header corruption do not bear on the question of
  * whether the tuple attributes can be checked, so we record corruption
- * messages for them but do not base our visibility determination on them.  (In
- * other words, we do not return false merely because we detected them.)
- *
- * For visibility determination not specifically related to corruption, what we
- * want to know is if a tuple is potentially visible to any running
- * transaction.  If you are tempted to replace this function's visibility logic
- * with a call to another visibility checking function, keep in mind that this
- * function does not update hint bits, as it seems imprudent to write hint bits
- * (or anything at all) to a table during a corruption check.  Nor does this
- * function bother classifying tuple visibility beyond a boolean visible vs.
- * not visible.
- *
- * The caller should already have checked that xmin and xmax are not out of
- * bounds for the relation.
+ * messages for them but we do not return false merely because we detected
+ * them.
  *
- * Returns whether the tuple is both visible and sufficiently sensible to
- * undergo attribute checks.
+ * Returns whether the tuple is sufficiently sensible to undergo visibility and
+ * attribute checks.
  */
 static bool
-check_tuple_header_and_visibilty(HeapTupleHeader tuphdr, HeapCheckContext *ctx)
+check_tuple_header(HeapCheckContext *ctx)
 {
+	HeapTupleHeader tuphdr = ctx->tuphdr;
 	uint16		infomask = tuphdr->t_infomask;
 	bool		header_garbled = false;
 	unsigned	expected_hoff;
 
 	if (ctx->tuphdr->t_hoff > ctx->lp_len)
 	{
-		report_corruption(ctx,
-						  psprintf("data begins at offset %u beyond the tuple length %u",
-								   ctx->tuphdr->t_hoff, ctx->lp_len));
+		report_main_corruption(ctx,
+							   psprintf("data begins at offset %u beyond the tuple length %u",
+										ctx->tuphdr->t_hoff, ctx->lp_len));
 		header_garbled = true;
 	}
 
 	if ((ctx->tuphdr->t_infomask & HEAP_XMAX_COMMITTED) &&
 		(ctx->tuphdr->t_infomask & HEAP_XMAX_IS_MULTI))
 	{
-		report_corruption(ctx,
-						  pstrdup("multixact should not be marked committed"));
+		report_main_corruption(ctx,
+							   pstrdup("multixact should not be marked committed"));
 
 		/*
 		 * This condition is clearly wrong, but we do not consider the header
@@ -630,188 +690,464 @@ check_tuple_header_and_visibilty(HeapTupleHeader tuphdr, HeapCheckContext *ctx)
 	if (ctx->tuphdr->t_hoff != expected_hoff)
 	{
 		if ((infomask & HEAP_HASNULL) && ctx->natts == 1)
-			report_corruption(ctx,
-							  psprintf("tuple data should begin at byte %u, but actually begins at byte %u (1 attribute, has nulls)",
-									   expected_hoff, ctx->tuphdr->t_hoff));
+			report_main_corruption(ctx,
+								   psprintf("tuple data should begin at byte %u, but actually begins at byte %u (1 attribute, has nulls)",
+											expected_hoff, ctx->tuphdr->t_hoff));
 		else if ((infomask & HEAP_HASNULL))
-			report_corruption(ctx,
-							  psprintf("tuple data should begin at byte %u, but actually begins at byte %u (%u attributes, has nulls)",
-									   expected_hoff, ctx->tuphdr->t_hoff, ctx->natts));
+			report_main_corruption(ctx,
+								   psprintf("tuple data should begin at byte %u, but actually begins at byte %u (%u attributes, has nulls)",
+											expected_hoff, ctx->tuphdr->t_hoff, ctx->natts));
 		else if (ctx->natts == 1)
-			report_corruption(ctx,
-							  psprintf("tuple data should begin at byte %u, but actually begins at byte %u (1 attribute, no nulls)",
-									   expected_hoff, ctx->tuphdr->t_hoff));
+			report_main_corruption(ctx,
+								   psprintf("tuple data should begin at byte %u, but actually begins at byte %u (1 attribute, no nulls)",
+											expected_hoff, ctx->tuphdr->t_hoff));
 		else
-			report_corruption(ctx,
-							  psprintf("tuple data should begin at byte %u, but actually begins at byte %u (%u attributes, no nulls)",
-									   expected_hoff, ctx->tuphdr->t_hoff, ctx->natts));
+			report_main_corruption(ctx,
+								   psprintf("tuple data should begin at byte %u, but actually begins at byte %u (%u attributes, no nulls)",
+											expected_hoff, ctx->tuphdr->t_hoff, ctx->natts));
 		header_garbled = true;
 	}
 
 	if (header_garbled)
 		return false;			/* checking of this tuple should not continue */
 
+	return true;				/* header ok */
+}
+
+/*
+ * Checks whether a tuple is visible to our transaction for checking, which is
+ * not a question of whether we should be able to see the tuple relative to any
+ * particular snapshot, but rather a question of whether it is safe and
+ * reasonable to check the tuple attributes.  The caller should already have
+ * checked that the tuple is sufficiently sensible for us to evaluate.
+ *
+ * If a tuple could have been inserted by a transaction that also added a
+ * column to the table, but which ultimately did not commit, or which has not
+ * yet committed, then the table's current TupleDesc might differ from the one
+ * used to construct this tuple, so we must not check it.
+ *
+ * As a special case, if our own transaction inserted the tuple, even if we
+ * added a column to the table, our TupleDesc should match.  We could check the
+ * tuple, but choose not to do so.
+ *
+ * If a tuple has been updated or deleted, we can still read the old tuple for
+ * corruption checking purposes, as long as we are careful about concurrent
+ * vacuums.  The main table tuple itself cannot be vacuumed away because we
+ * hold a buffer lock on the page, but if the deleting transaction is older
+ * than our transaction snapshot's xmin, then vacuum could remove the toast at
+ * any time, so we must not check the toast.
+ *
+ * If xmin or xmax values are older than can be checked against clog, or appear
+ * to be in the future (possibly due to wrap-around), then we cannot make a
+ * determination about the visibility of the tuple, so we must not check it.
+ *
+ * Returns true if the tuple should be checked, false otherwise.  Sets
+ * ctx->toast_is_volatile true if the toast might be vacuumed away, false
+ * otherwise.
+ */
+static bool
+check_tuple_visibility(HeapCheckContext *ctx)
+{
+	TransactionId xmin;
+	TransactionId xvac;
+	TransactionId xmax;
+	XidCommitStatus xmin_status;
+	XidCommitStatus xvac_status;
+	XidCommitStatus xmax_status;
+	HeapTupleHeader tuphdr = ctx->tuphdr;
+
+	ctx->tuple_is_volatile = true;	/* have not yet proven otherwise */
+
+	/* If xmin is normal, it should be within valid range */
+	xmin = HeapTupleHeaderGetXmin(tuphdr);
+	switch (get_xid_status(xmin, ctx, &xmin_status))
+	{
+		case XID_INVALID:
+		case XID_BOUNDS_OK:
+			break;
+		case XID_IN_FUTURE:
+			report_main_corruption(ctx,
+								   psprintf("xmin %u equals or exceeds next valid transaction ID %u:%u",
+											xmin,
+											EpochFromFullTransactionId(ctx->next_fxid),
+											XidFromFullTransactionId(ctx->next_fxid)));
+			return false;		/* corrupt */
+		case XID_PRECEDES_CLUSTERMIN:
+			report_main_corruption(ctx,
+								   psprintf("xmin %u precedes oldest valid transaction ID %u:%u",
+											xmin,
+											EpochFromFullTransactionId(ctx->oldest_fxid),
+											XidFromFullTransactionId(ctx->oldest_fxid)));
+			return false;		/* corrupt */
+		case XID_PRECEDES_RELMIN:
+			report_main_corruption(ctx,
+								   psprintf("xmin %u precedes relation freeze threshold %u:%u",
+											xmin,
+											EpochFromFullTransactionId(ctx->relfrozenfxid),
+											XidFromFullTransactionId(ctx->relfrozenfxid)));
+			return false;		/* corrupt */
+	}
+
 	/*
-	 * Ok, we can examine the header for tuple visibility purposes, though we
-	 * still need to be careful about a few remaining types of header
-	 * corruption.  This logic roughly follows that of
-	 * HeapTupleSatisfiesVacuum.  Where possible the comments indicate which
-	 * HTSV_Result we think that function might return for this tuple.
+	 * Has inserting transaction committed?
 	 */
 	if (!HeapTupleHeaderXminCommitted(tuphdr))
 	{
-		TransactionId raw_xmin = HeapTupleHeaderGetRawXmin(tuphdr);
-
 		if (HeapTupleHeaderXminInvalid(tuphdr))
-			return false;		/* HEAPTUPLE_DEAD */
+
+			/*
+			 * The inserting transaction aborted.  The structure of the tuple
+			 * may not match our relation description, so we cannot check it.
+			 */
+			return false;		/* uncheckable */
 		/* Used by pre-9.0 binary upgrades */
-		else if (infomask & HEAP_MOVED_OFF ||
-				 infomask & HEAP_MOVED_IN)
+		else if (tuphdr->t_infomask & HEAP_MOVED_OFF)
 		{
-			XidCommitStatus status;
-			TransactionId xvac = HeapTupleHeaderGetXvac(tuphdr);
+			xvac = HeapTupleHeaderGetXvac(tuphdr);
 
-			switch (get_xid_status(xvac, ctx, &status))
+			switch (get_xid_status(xvac, ctx, &xvac_status))
 			{
 				case XID_INVALID:
-					report_corruption(ctx,
-									  pstrdup("old-style VACUUM FULL transaction ID is invalid"));
+					report_main_corruption(ctx,
+										   pstrdup("old-style VACUUM FULL transaction ID for moved off tuple is invalid"));
 					return false;	/* corrupt */
 				case XID_IN_FUTURE:
-					report_corruption(ctx,
-									  psprintf("old-style VACUUM FULL transaction ID %u equals or exceeds next valid transaction ID %u:%u",
-											   xvac,
-											   EpochFromFullTransactionId(ctx->next_fxid),
-											   XidFromFullTransactionId(ctx->next_fxid)));
+					report_main_corruption(ctx,
+										   psprintf("old-style VACUUM FULL transaction ID %u for moved off tuple equals or exceeds next valid transaction ID %u:%u",
+													xvac,
+													EpochFromFullTransactionId(ctx->next_fxid),
+													XidFromFullTransactionId(ctx->next_fxid)));
 					return false;	/* corrupt */
 				case XID_PRECEDES_RELMIN:
-					report_corruption(ctx,
-									  psprintf("old-style VACUUM FULL transaction ID %u precedes relation freeze threshold %u:%u",
-											   xvac,
-											   EpochFromFullTransactionId(ctx->relfrozenfxid),
-											   XidFromFullTransactionId(ctx->relfrozenfxid)));
+					report_main_corruption(ctx,
+										   psprintf("old-style VACUUM FULL transaction ID %u for moved off tuple precedes relation freeze threshold %u:%u",
+													xvac,
+													EpochFromFullTransactionId(ctx->relfrozenfxid),
+													XidFromFullTransactionId(ctx->relfrozenfxid)));
 					return false;	/* corrupt */
-					break;
 				case XID_PRECEDES_CLUSTERMIN:
-					report_corruption(ctx,
-									  psprintf("old-style VACUUM FULL transaction ID %u precedes oldest valid transaction ID %u:%u",
-											   xvac,
-											   EpochFromFullTransactionId(ctx->oldest_fxid),
-											   XidFromFullTransactionId(ctx->oldest_fxid)));
+					report_main_corruption(ctx,
+										   psprintf("old-style VACUUM FULL transaction ID %u for moved off tuple precedes oldest valid transaction ID %u:%u",
+													xvac,
+													EpochFromFullTransactionId(ctx->oldest_fxid),
+													XidFromFullTransactionId(ctx->oldest_fxid)));
 					return false;	/* corrupt */
-					break;
 				case XID_BOUNDS_OK:
-					switch (status)
-					{
-						case XID_IN_PROGRESS:
-							return true;	/* HEAPTUPLE_DELETE_IN_PROGRESS */
-						case XID_COMMITTED:
-						case XID_ABORTED:
-							return false;	/* HEAPTUPLE_DEAD */
-					}
+					break;
 			}
-		}
-		else
-		{
-			XidCommitStatus status;
 
-			switch (get_xid_status(raw_xmin, ctx, &status))
+			switch (xvac_status)
 			{
-				case XID_INVALID:
-					report_corruption(ctx,
-									  pstrdup("raw xmin is invalid"));
-					return false;
-				case XID_IN_FUTURE:
-					report_corruption(ctx,
-									  psprintf("raw xmin %u equals or exceeds next valid transaction ID %u:%u",
-											   raw_xmin,
-											   EpochFromFullTransactionId(ctx->next_fxid),
-											   XidFromFullTransactionId(ctx->next_fxid)));
+				case XID_IS_CURRENT_XID:
+					report_main_corruption(ctx,
+										   psprintf("old-style VACUUM FULL transaction ID %u for moved off tuple matches our current transaction ID",
+													xvac));
 					return false;	/* corrupt */
-				case XID_PRECEDES_RELMIN:
-					report_corruption(ctx,
-									  psprintf("raw xmin %u precedes relation freeze threshold %u:%u",
-											   raw_xmin,
-											   EpochFromFullTransactionId(ctx->relfrozenfxid),
-											   XidFromFullTransactionId(ctx->relfrozenfxid)));
-					return false;	/* corrupt */
-				case XID_PRECEDES_CLUSTERMIN:
-					report_corruption(ctx,
-									  psprintf("raw xmin %u precedes oldest valid transaction ID %u:%u",
-											   raw_xmin,
-											   EpochFromFullTransactionId(ctx->oldest_fxid),
-											   XidFromFullTransactionId(ctx->oldest_fxid)));
+				case XID_IN_PROGRESS:
+					report_main_corruption(ctx,
+										   psprintf("old-style VACUUM FULL transaction ID %u for moved off tuple appears to be in progress",
+													xvac));
 					return false;	/* corrupt */
-				case XID_BOUNDS_OK:
-					switch (status)
-					{
-						case XID_COMMITTED:
-							break;
-						case XID_IN_PROGRESS:
-							return true;	/* insert or delete in progress */
-						case XID_ABORTED:
-							return false;	/* HEAPTUPLE_DEAD */
-					}
+
+				case XID_COMMITTED:
+
+					/*
+					 * The VACUUM FULL committed, so this tuple is dead and
+					 * could be vacuumed away at any time.  It's ok to check
+					 * the tuple because we have a buffer lock for the page,
+					 * but not safe to check the toast.  We don't bother
+					 * comparing against safe_xmin because the VACUUM FULL
+					 * must have committed prior to an upgrade and can't still
+					 * be running.
+					 */
+					return true;	/* checkable */
+
+				case XID_ABORTED:
+					break;
 			}
 		}
-	}
-
-	if (!(infomask & HEAP_XMAX_INVALID) && !HEAP_XMAX_IS_LOCKED_ONLY(infomask))
-	{
-		if (infomask & HEAP_XMAX_IS_MULTI)
+		/* Used by pre-9.0 binary upgrades */
+		else if (tuphdr->t_infomask & HEAP_MOVED_IN)
 		{
-			XidCommitStatus status;
-			TransactionId xmax = HeapTupleGetUpdateXid(tuphdr);
+			xvac = HeapTupleHeaderGetXvac(tuphdr);
 
-			switch (get_xid_status(xmax, ctx, &status))
+			switch (get_xid_status(xvac, ctx, &xvac_status))
 			{
-					/* not LOCKED_ONLY, so it has to have an xmax */
 				case XID_INVALID:
-					report_corruption(ctx,
-									  pstrdup("xmax is invalid"));
+					report_main_corruption(ctx,
+										   pstrdup("old-style VACUUM FULL transaction ID for moved in tuple is invalid"));
 					return false;	/* corrupt */
 				case XID_IN_FUTURE:
-					report_corruption(ctx,
-									  psprintf("xmax %u equals or exceeds next valid transaction ID %u:%u",
-											   xmax,
-											   EpochFromFullTransactionId(ctx->next_fxid),
-											   XidFromFullTransactionId(ctx->next_fxid)));
+					report_main_corruption(ctx,
+										   psprintf("old-style VACUUM FULL transaction ID %u for moved in tuple equals or exceeds next valid transaction ID %u:%u",
+													xvac,
+													EpochFromFullTransactionId(ctx->next_fxid),
+													XidFromFullTransactionId(ctx->next_fxid)));
 					return false;	/* corrupt */
 				case XID_PRECEDES_RELMIN:
-					report_corruption(ctx,
-									  psprintf("xmax %u precedes relation freeze threshold %u:%u",
-											   xmax,
-											   EpochFromFullTransactionId(ctx->relfrozenfxid),
-											   XidFromFullTransactionId(ctx->relfrozenfxid)));
+					report_main_corruption(ctx,
+										   psprintf("old-style VACUUM FULL transaction ID %u for moved in tuple precedes relation freeze threshold %u:%u",
+													xvac,
+													EpochFromFullTransactionId(ctx->relfrozenfxid),
+													XidFromFullTransactionId(ctx->relfrozenfxid)));
 					return false;	/* corrupt */
 				case XID_PRECEDES_CLUSTERMIN:
-					report_corruption(ctx,
-									  psprintf("xmax %u precedes oldest valid transaction ID %u:%u",
-											   xmax,
-											   EpochFromFullTransactionId(ctx->oldest_fxid),
-											   XidFromFullTransactionId(ctx->oldest_fxid)));
+					report_main_corruption(ctx,
+										   psprintf("old-style VACUUM FULL transaction ID %u for moved in tuple precedes oldest valid transaction ID %u:%u",
+													xvac,
+													EpochFromFullTransactionId(ctx->oldest_fxid),
+													XidFromFullTransactionId(ctx->oldest_fxid)));
 					return false;	/* corrupt */
 				case XID_BOUNDS_OK:
-					switch (status)
-					{
-						case XID_IN_PROGRESS:
-							return true;	/* HEAPTUPLE_DELETE_IN_PROGRESS */
-						case XID_COMMITTED:
-						case XID_ABORTED:
-							return false;	/* HEAPTUPLE_RECENTLY_DEAD or
-											 * HEAPTUPLE_DEAD */
-					}
+					break;
 			}
 
-			/* Ok, the tuple is live */
+			switch (xvac_status)
+			{
+				case XID_IS_CURRENT_XID:
+					report_main_corruption(ctx,
+										   psprintf("old-style VACUUM FULL transaction ID %u for moved in tuple matches our current transaction ID",
+													xvac));
+					return false;	/* corrupt */
+				case XID_IN_PROGRESS:
+					report_main_corruption(ctx,
+										   psprintf("old-style VACUUM FULL transaction ID %u for moved in tuple appears to be in progress",
+													xvac));
+					return false;	/* corrupt */
+
+				case XID_COMMITTED:
+					break;
+
+				case XID_ABORTED:
+
+					/*
+					 * The VACUUM FULL aborted, so this tuple is dead and
+					 * could be vacuumed away at any time.  It's ok to check
+					 * the tuple because we have a buffer lock for the page,
+					 * but not safe to check the toast.
+					 */
+					return true;	/* checkable */
+			}
+		}
+		else if (xmin_status == XID_IS_CURRENT_XID)
+		{
+			/*
+			 * Don't check tuples from currently running transactions, not
+			 * even our own.
+			 */
+			return false;		/* checkable, but don't check */
+		}
+		else if (xmin_status == XID_IN_PROGRESS)
+		{
+			/* Don't check tuples from currently running transactions */
+			return false;		/* uncheckable */
+		}
+		else if (xmin_status != XID_COMMITTED)
+		{
+			/*
+			 * Inserting transaction is not in progress, and not committed, so
+			 * it either aborted or crashed. We cannot check.
+			 */
+			return false;		/* uncheckable */
 		}
-		else if (!(infomask & HEAP_XMAX_COMMITTED))
-			return true;		/* HEAPTUPLE_DELETE_IN_PROGRESS or
-								 * HEAPTUPLE_LIVE */
-		else
-			return false;		/* HEAPTUPLE_RECENTLY_DEAD or HEAPTUPLE_DEAD */
 	}
-	return true;				/* not dead */
+
+	/*
+	 * Okay, the inserter committed, so it was good at some point.  Now what
+	 * about the deleting transaction?
+	 */
+
+	if (tuphdr->t_infomask & HEAP_XMAX_IS_MULTI)
+	{
+		/*
+		 * xmax is a multixact, so it should be within valid MXID range.  We
+		 * cannot safely look up the update xid if the multixact is out of
+		 * bounds, and must stop checking this tuple.
+		 */
+		xmax = HeapTupleHeaderGetRawXmax(tuphdr);
+		switch (check_mxid_valid_in_rel(xmax, ctx))
+		{
+			case XID_INVALID:
+				report_main_corruption(ctx,
+									   pstrdup("multitransaction ID is invalid"));
+				return false;	/* corrupt */
+			case XID_PRECEDES_RELMIN:
+				report_main_corruption(ctx,
+									   psprintf("multitransaction ID %u precedes relation minimum multitransaction ID threshold %u",
+												xmax, ctx->relminmxid));
+				return false;	/* corrupt */
+			case XID_PRECEDES_CLUSTERMIN:
+				report_main_corruption(ctx,
+									   psprintf("multitransaction ID %u precedes oldest valid multitransaction ID threshold %u",
+												xmax, ctx->oldest_mxact));
+				return false;	/* corrupt */
+			case XID_IN_FUTURE:
+				report_main_corruption(ctx,
+									   psprintf("multitransaction ID %u equals or exceeds next valid multitransaction ID %u",
+												xmax,
+												ctx->next_mxact));
+				return false;	/* corrupt */
+			case XID_BOUNDS_OK:
+				break;
+		}
+	}
+
+	if (tuphdr->t_infomask & HEAP_XMAX_INVALID)
+	{
+		/*
+		 * This tuple is live.  A concurrently running transaction could
+		 * delete it before we get around to checking the toast, but any such
+		 * running transaction is surely not less than our safe_xmin, so the
+		 * toast cannot be vacuumed out from under us.
+		 */
+		ctx->tuple_is_volatile = false;
+		return true;			/* checkable */
+	}
+
+	if (HEAP_XMAX_IS_LOCKED_ONLY(tuphdr->t_infomask))
+	{
+		/*
+		 * "Deleting" xact really only locked it, so the tuple is live in any
+		 * case.  As above, a concurrently running transaction could delete
+		 * it, but it cannot be vacuumed out from under us.
+		 */
+		ctx->tuple_is_volatile = false;
+		return true;			/* checkable */
+	}
+
+	if (tuphdr->t_infomask & HEAP_XMAX_IS_MULTI)
+	{
+		/*
+		 * We already checked above that this multixact is within limits for
+		 * this table.  Now check the update xid from this multixact.
+		 */
+		xmax = HeapTupleGetUpdateXid(tuphdr);
+		switch (get_xid_status(xmax, ctx, &xmax_status))
+		{
+				/* not LOCKED_ONLY, so it has to have an xmax */
+			case XID_INVALID:
+				report_main_corruption(ctx,
+									   pstrdup("update xid is invalid"));
+				return false;	/* corrupt */
+			case XID_IN_FUTURE:
+				report_main_corruption(ctx,
+									   psprintf("update xid %u equals or exceeds next valid transaction ID %u:%u",
+												xmax,
+												EpochFromFullTransactionId(ctx->next_fxid),
+												XidFromFullTransactionId(ctx->next_fxid)));
+				return false;	/* corrupt */
+			case XID_PRECEDES_RELMIN:
+				report_main_corruption(ctx,
+									   psprintf("update xid %u precedes relation freeze threshold %u:%u",
+												xmax,
+												EpochFromFullTransactionId(ctx->relfrozenfxid),
+												XidFromFullTransactionId(ctx->relfrozenfxid)));
+				return false;	/* corrupt */
+			case XID_PRECEDES_CLUSTERMIN:
+				report_main_corruption(ctx,
+									   psprintf("update xid %u precedes oldest valid transaction ID %u:%u",
+												xmax,
+												EpochFromFullTransactionId(ctx->oldest_fxid),
+												XidFromFullTransactionId(ctx->oldest_fxid)));
+				return false;	/* corrupt */
+			case XID_BOUNDS_OK:
+				break;
+		}
+
+		switch (xmax_status)
+		{
+			case XID_IS_CURRENT_XID:
+			case XID_IN_PROGRESS:
+
+				/*
+				 * The delete is in progress, so it cannot be visible to our
+				 * snapshot.
+				 */
+				ctx->tuple_is_volatile = false;
+				return true;	/* checkable */
+			case XID_COMMITTED:
+
+				/*
+				 * The delete committed.  Whether the toast can be vacuumed
+				 * away depends on how old the deleting transaction is.
+				 */
+				ctx->tuple_is_volatile = TransactionIdPrecedes(xmax,
+															   ctx->safe_xmin);
+				return true;	/* checkable */
+			case XID_ABORTED:
+
+				/*
+				 * The delete aborted or crashed.  The tuple is still live.
+				 */
+				ctx->tuple_is_volatile = false;
+				return true;	/* checkable */
+		}
+	}
+
+	/*
+	 * The tuple is deleted.  Whether the toast can be vacuumed away depends
+	 * on how old the deleting transaction is.
+	 */
+	xmax = HeapTupleHeaderGetRawXmax(tuphdr);
+
+	switch (get_xid_status(xmax, ctx, &xmax_status))
+	{
+		case XID_IN_FUTURE:
+			report_main_corruption(ctx,
+								   psprintf("xmax %u equals or exceeds next valid transaction ID %u:%u",
+											xmax,
+											EpochFromFullTransactionId(ctx->next_fxid),
+											XidFromFullTransactionId(ctx->next_fxid)));
+			return false;		/* corrupt */
+		case XID_PRECEDES_RELMIN:
+			report_main_corruption(ctx,
+								   psprintf("xmax %u precedes relation freeze threshold %u:%u",
+											xmax,
+											EpochFromFullTransactionId(ctx->relfrozenfxid),
+											XidFromFullTransactionId(ctx->relfrozenfxid)));
+			return false;		/* corrupt */
+		case XID_PRECEDES_CLUSTERMIN:
+			report_main_corruption(ctx,
+								   psprintf("xmax %u precedes oldest valid transaction ID %u:%u",
+											xmax,
+											EpochFromFullTransactionId(ctx->oldest_fxid),
+											XidFromFullTransactionId(ctx->oldest_fxid)));
+			return false;		/* corrupt */
+		case XID_BOUNDS_OK:
+		case XID_INVALID:
+			break;
+	}
+
+	switch (xmax_status)
+	{
+		case XID_IS_CURRENT_XID:
+		case XID_IN_PROGRESS:
+
+			/*
+			 * The delete is in progress, so it cannot be visible to our
+			 * snapshot.
+			 */
+			ctx->tuple_is_volatile = false;
+			return true;		/* checkable */
+		case XID_COMMITTED:
+
+			/*
+			 * The delete committed.  Whether the toast can be vacuumed away
+			 * depends on how old the deleting transaction is.
+			 */
+			ctx->tuple_is_volatile = TransactionIdPrecedes(xmax,
+														   ctx->safe_xmin);
+			return true;		/* checkable */
+		case XID_ABORTED:
+
+			/*
+			 * The delete aborted or crashed.  The tuple is still live.
+			 */
+			ctx->tuple_is_volatile = false;
+			return true;		/* checkable */
+	}
+
+	return false;				/* not reached */
 }
 
 /*
@@ -826,7 +1162,8 @@ check_tuple_header_and_visibilty(HeapTupleHeader tuphdr, HeapCheckContext *ctx)
  * as each toast tuple having its varlena structure sanity checked.
  */
 static void
-check_toast_tuple(HeapTuple toasttup, HeapCheckContext *ctx)
+check_toast_tuple(HeapTuple toasttup, HeapCheckContext *ctx,
+				  ToastCheckContext *tctx)
 {
 	int32		curchunk;
 	Pointer		chunk;
@@ -841,16 +1178,16 @@ check_toast_tuple(HeapTuple toasttup, HeapCheckContext *ctx)
 										 ctx->toast_rel->rd_att, &isnull));
 	if (isnull)
 	{
-		report_corruption(ctx,
-						  pstrdup("toast chunk sequence number is null"));
+		report_toast_corruption(ctx, tctx,
+								pstrdup("toast chunk sequence number is null"));
 		return;
 	}
 	chunk = DatumGetPointer(fastgetattr(toasttup, 3,
 										ctx->toast_rel->rd_att, &isnull));
 	if (isnull)
 	{
-		report_corruption(ctx,
-						  pstrdup("toast chunk data is null"));
+		report_toast_corruption(ctx, tctx,
+								pstrdup("toast chunk data is null"));
 		return;
 	}
 	if (!VARATT_IS_EXTENDED(chunk))
@@ -867,37 +1204,37 @@ check_toast_tuple(HeapTuple toasttup, HeapCheckContext *ctx)
 		/* should never happen */
 		uint32		header = ((varattrib_4b *) chunk)->va_4byte.va_header;
 
-		report_corruption(ctx,
-						  psprintf("corrupt extended toast chunk has invalid varlena header: %0x (sequence number %d)",
-								   header, curchunk));
+		report_toast_corruption(ctx, tctx,
+								psprintf("corrupt extended toast chunk has invalid varlena header: %0x (sequence number %d)",
+										 header, curchunk));
 		return;
 	}
 
 	/*
 	 * Some checks on the data we've found
 	 */
-	if (curchunk != ctx->chunkno)
+	if (curchunk != tctx->chunkno)
 	{
-		report_corruption(ctx,
-						  psprintf("toast chunk sequence number %u does not match the expected sequence number %u",
-								   curchunk, ctx->chunkno));
+		report_toast_corruption(ctx, tctx,
+								psprintf("toast chunk sequence number %u does not match the expected sequence number %u",
+										 curchunk, tctx->chunkno));
 		return;
 	}
-	if (curchunk > ctx->endchunk)
+	if (curchunk > tctx->endchunk)
 	{
-		report_corruption(ctx,
-						  psprintf("toast chunk sequence number %u exceeds the end chunk sequence number %u",
-								   curchunk, ctx->endchunk));
+		report_toast_corruption(ctx, tctx,
+								psprintf("toast chunk sequence number %u exceeds the end chunk sequence number %u",
+										 curchunk, tctx->endchunk));
 		return;
 	}
 
-	expected_size = curchunk < ctx->totalchunks - 1 ? TOAST_MAX_CHUNK_SIZE
-		: ctx->attrsize - ((ctx->totalchunks - 1) * TOAST_MAX_CHUNK_SIZE);
+	expected_size = curchunk < tctx->totalchunks - 1 ? TOAST_MAX_CHUNK_SIZE
+		: tctx->attrsize - ((tctx->totalchunks - 1) * TOAST_MAX_CHUNK_SIZE);
 	if (chunksize != expected_size)
 	{
-		report_corruption(ctx,
-						  psprintf("toast chunk size %u differs from the expected size %u",
-								   chunksize, expected_size));
+		report_toast_corruption(ctx, tctx,
+								psprintf("toast chunk size %u differs from the expected size %u",
+										 chunksize, expected_size));
 		return;
 	}
 }
@@ -907,17 +1244,17 @@ check_toast_tuple(HeapTuple toasttup, HeapCheckContext *ctx)
  * found in ctx->tupstore.
  *
  * This function follows the logic performed by heap_deform_tuple(), and in the
- * case of a toasted value, optionally continues along the logic of
- * detoast_external_attr(), checking for any conditions that would result in
- * either of those functions Asserting or crashing the backend.  The checks
- * performed by Asserts present in those two functions are also performed here.
- * In cases where those two functions are a bit cavalier in their assumptions
- * about data being correct, we perform additional checks not present in either
- * of those two functions.  Where some condition is checked in both of those
- * functions, we perform it here twice, as we parallel the logical flow of
- * those two functions.  The presence of duplicate checks seems a reasonable
- * price to pay for keeping this code tightly coupled with the code it
- * protects.
+ * case of a toasted value, optionally stores the toast pointer so later it can
+ * be checked following the logic of detoast_external_attr(), checking for any
+ * conditions that would result in either of those functions Asserting or
+ * crashing the backend.  The checks performed by Asserts present in those two
+ * functions are also performed here and in check_toasted_attributes.  In cases
+ * where those two functions are a bit cavalier in their assumptions about data
+ * being correct, we perform additional checks not present in either of those
+ * two functions.  Where some condition is checked in both of those functions,
+ * we perform it here twice, as we parallel the logical flow of those two
+ * functions.  The presence of duplicate checks seems a reasonable price to pay
+ * for keeping this code tightly coupled with the code it protects.
  *
  * Returns true if the tuple attribute is sane enough for processing to
  * continue on to the next attribute, false otherwise.
@@ -925,12 +1262,6 @@ check_toast_tuple(HeapTuple toasttup, HeapCheckContext *ctx)
 static bool
 check_tuple_attribute(HeapCheckContext *ctx)
 {
-	struct varatt_external toast_pointer;
-	ScanKeyData toastkey;
-	SysScanDesc toastscan;
-	SnapshotData SnapshotToast;
-	HeapTuple	toasttup;
-	bool		found_toasttup;
 	Datum		attdatum;
 	struct varlena *attr;
 	char	   *tp;				/* pointer to the tuple data */
@@ -944,12 +1275,12 @@ check_tuple_attribute(HeapCheckContext *ctx)
 
 	if (ctx->tuphdr->t_hoff + ctx->offset > ctx->lp_len)
 	{
-		report_corruption(ctx,
-						  psprintf("attribute %u with length %u starts at offset %u beyond total tuple length %u",
-								   ctx->attnum,
-								   thisatt->attlen,
-								   ctx->tuphdr->t_hoff + ctx->offset,
-								   ctx->lp_len));
+		report_main_corruption(ctx,
+							   psprintf("attribute %u with length %u starts at offset %u beyond total tuple length %u",
+										ctx->attnum,
+										thisatt->attlen,
+										ctx->tuphdr->t_hoff + ctx->offset,
+										ctx->lp_len));
 		return false;
 	}
 
@@ -965,12 +1296,12 @@ check_tuple_attribute(HeapCheckContext *ctx)
 											tp + ctx->offset);
 		if (ctx->tuphdr->t_hoff + ctx->offset > ctx->lp_len)
 		{
-			report_corruption(ctx,
-							  psprintf("attribute %u with length %u ends at offset %u beyond total tuple length %u",
-									   ctx->attnum,
-									   thisatt->attlen,
-									   ctx->tuphdr->t_hoff + ctx->offset,
-									   ctx->lp_len));
+			report_main_corruption(ctx,
+								   psprintf("attribute %u with length %u ends at offset %u beyond total tuple length %u",
+											ctx->attnum,
+											thisatt->attlen,
+											ctx->tuphdr->t_hoff + ctx->offset,
+											ctx->lp_len));
 			return false;
 		}
 		return true;
@@ -998,10 +1329,10 @@ check_tuple_attribute(HeapCheckContext *ctx)
 
 		if (va_tag != VARTAG_ONDISK)
 		{
-			report_corruption(ctx,
-							  psprintf("toasted attribute %u has unexpected TOAST tag %u",
-									   ctx->attnum,
-									   va_tag));
+			report_main_corruption(ctx,
+								   psprintf("toasted attribute %u has unexpected TOAST tag %u",
+											ctx->attnum,
+											va_tag));
 			/* We can't know where the next attribute begins */
 			return false;
 		}
@@ -1013,12 +1344,12 @@ check_tuple_attribute(HeapCheckContext *ctx)
 
 	if (ctx->tuphdr->t_hoff + ctx->offset > ctx->lp_len)
 	{
-		report_corruption(ctx,
-						  psprintf("attribute %u with length %u ends at offset %u beyond total tuple length %u",
-								   ctx->attnum,
-								   thisatt->attlen,
-								   ctx->tuphdr->t_hoff + ctx->offset,
-								   ctx->lp_len));
+		report_main_corruption(ctx,
+							   psprintf("attribute %u with length %u ends at offset %u beyond total tuple length %u",
+										ctx->attnum,
+										thisatt->attlen,
+										ctx->tuphdr->t_hoff + ctx->offset,
+										ctx->lp_len));
 
 		return false;
 	}
@@ -1045,18 +1376,18 @@ check_tuple_attribute(HeapCheckContext *ctx)
 	/* The tuple header better claim to contain toasted values */
 	if (!(infomask & HEAP_HASEXTERNAL))
 	{
-		report_corruption(ctx,
-						  psprintf("attribute %u is external but tuple header flag HEAP_HASEXTERNAL not set",
-								   ctx->attnum));
+		report_main_corruption(ctx,
+							   psprintf("attribute %u is external but tuple header flag HEAP_HASEXTERNAL not set",
+										ctx->attnum));
 		return true;
 	}
 
 	/* The relation better have a toast table */
 	if (!ctx->rel->rd_rel->reltoastrelid)
 	{
-		report_corruption(ctx,
-						  psprintf("attribute %u is external but relation has no toast relation",
-								   ctx->attnum));
+		report_main_corruption(ctx,
+							   psprintf("attribute %u is external but relation has no toast relation",
+										ctx->attnum));
 		return true;
 	}
 
@@ -1065,189 +1396,115 @@ check_tuple_attribute(HeapCheckContext *ctx)
 		return true;
 
 	/*
-	 * Must copy attr into toast_pointer for alignment considerations
+	 * If this tuple is at risk of being vacuumed away, we cannot check the
+	 * toast.  Otherwise, we push a copy of the toast tuple so we can check it
+	 * after releasing the main table buffer lock.
 	 */
-	VARATT_EXTERNAL_GET_POINTER(toast_pointer, attr);
-
-	ctx->attrsize = VARATT_EXTERNAL_GET_EXTSIZE(toast_pointer);
-	ctx->endchunk = (ctx->attrsize - 1) / TOAST_MAX_CHUNK_SIZE;
-	ctx->totalchunks = ctx->endchunk + 1;
+	if (!ctx->tuple_is_volatile)
+	{
+		ToastCheckContext *tctx;
 
-	/*
-	 * Setup a scan key to find chunks in toast table with matching va_valueid
-	 */
-	ScanKeyInit(&toastkey,
-				(AttrNumber) 1,
-				BTEqualStrategyNumber, F_OIDEQ,
-				ObjectIdGetDatum(toast_pointer.va_valueid));
+		tctx = (ToastCheckContext *) palloc0fast(sizeof(ToastCheckContext));
 
-	/*
-	 * Check if any chunks for this toasted object exist in the toast table,
-	 * accessible via the index.
-	 */
-	init_toast_snapshot(&SnapshotToast);
-	toastscan = systable_beginscan_ordered(ctx->toast_rel,
-										   ctx->valid_toast_index,
-										   &SnapshotToast, 1,
-										   &toastkey);
-	ctx->chunkno = 0;
-	found_toasttup = false;
-	while ((toasttup =
-			systable_getnext_ordered(toastscan,
-									 ForwardScanDirection)) != NULL)
-	{
-		found_toasttup = true;
-		check_toast_tuple(toasttup, ctx);
-		ctx->chunkno++;
+		VARATT_EXTERNAL_GET_POINTER(tctx->toast_pointer, attr);
+		tctx->blkno = ctx->blkno;
+		tctx->offnum = ctx->offnum;
+		tctx->attnum = ctx->attnum;
+		ctx->toasted_attributes = lappend(ctx->toasted_attributes, tctx);
 	}
-	if (!found_toasttup)
-		report_corruption(ctx,
-						  psprintf("toasted value for attribute %u missing from toast table",
-								   ctx->attnum));
-	else if (ctx->chunkno != (ctx->endchunk + 1))
-		report_corruption(ctx,
-						  psprintf("final toast chunk number %u differs from expected value %u",
-								   ctx->chunkno, (ctx->endchunk + 1)));
-	systable_endscan_ordered(toastscan);
 
 	return true;
 }
 
 /*
- * Check the current tuple as tracked in ctx, recording any corruption found in
- * ctx->tupstore.
+ * For each attribute collected in ctx->toasted_attributes, look up the value
+ * in the toast table and perform checks on it.  This function should only be
+ * called on toast pointers which cannot be vacuumed away during our
+ * processing.
  */
 static void
-check_tuple(HeapCheckContext *ctx)
+check_toasted_attributes(HeapCheckContext *ctx)
 {
-	TransactionId xmin;
-	TransactionId xmax;
-	bool		fatal = false;
-	uint16		infomask = ctx->tuphdr->t_infomask;
+	ListCell   *cell;
 
-	/* If xmin is normal, it should be within valid range */
-	xmin = HeapTupleHeaderGetXmin(ctx->tuphdr);
-	switch (get_xid_status(xmin, ctx, NULL))
+	foreach(cell, ctx->toasted_attributes)
 	{
-		case XID_INVALID:
-		case XID_BOUNDS_OK:
-			break;
-		case XID_IN_FUTURE:
-			report_corruption(ctx,
-							  psprintf("xmin %u equals or exceeds next valid transaction ID %u:%u",
-									   xmin,
-									   EpochFromFullTransactionId(ctx->next_fxid),
-									   XidFromFullTransactionId(ctx->next_fxid)));
-			fatal = true;
-			break;
-		case XID_PRECEDES_CLUSTERMIN:
-			report_corruption(ctx,
-							  psprintf("xmin %u precedes oldest valid transaction ID %u:%u",
-									   xmin,
-									   EpochFromFullTransactionId(ctx->oldest_fxid),
-									   XidFromFullTransactionId(ctx->oldest_fxid)));
-			fatal = true;
-			break;
-		case XID_PRECEDES_RELMIN:
-			report_corruption(ctx,
-							  psprintf("xmin %u precedes relation freeze threshold %u:%u",
-									   xmin,
-									   EpochFromFullTransactionId(ctx->relfrozenfxid),
-									   XidFromFullTransactionId(ctx->relfrozenfxid)));
-			fatal = true;
-			break;
-	}
+		ToastCheckContext *tctx;
+		SnapshotData SnapshotToast;
+		ScanKeyData toastkey;
+		SysScanDesc toastscan;
+		bool		found_toasttup;
+		HeapTuple	toasttup;
+
+		tctx = lfirst(cell);
+		tctx->attrsize = VARATT_EXTERNAL_GET_EXTSIZE(tctx->toast_pointer);
+		tctx->endchunk = (tctx->attrsize - 1) / TOAST_MAX_CHUNK_SIZE;
+		tctx->totalchunks = tctx->endchunk + 1;
 
-	xmax = HeapTupleHeaderGetRawXmax(ctx->tuphdr);
+		/*
+		 * Setup a scan key to find chunks in toast table with matching
+		 * va_valueid
+		 */
+		ScanKeyInit(&toastkey,
+					(AttrNumber) 1,
+					BTEqualStrategyNumber, F_OIDEQ,
+					ObjectIdGetDatum(tctx->toast_pointer.va_valueid));
 
-	if (infomask & HEAP_XMAX_IS_MULTI)
-	{
-		/* xmax is a multixact, so it should be within valid MXID range */
-		switch (check_mxid_valid_in_rel(xmax, ctx))
-		{
-			case XID_INVALID:
-				report_corruption(ctx,
-								  pstrdup("multitransaction ID is invalid"));
-				fatal = true;
-				break;
-			case XID_PRECEDES_RELMIN:
-				report_corruption(ctx,
-								  psprintf("multitransaction ID %u precedes relation minimum multitransaction ID threshold %u",
-										   xmax, ctx->relminmxid));
-				fatal = true;
-				break;
-			case XID_PRECEDES_CLUSTERMIN:
-				report_corruption(ctx,
-								  psprintf("multitransaction ID %u precedes oldest valid multitransaction ID threshold %u",
-										   xmax, ctx->oldest_mxact));
-				fatal = true;
-				break;
-			case XID_IN_FUTURE:
-				report_corruption(ctx,
-								  psprintf("multitransaction ID %u equals or exceeds next valid multitransaction ID %u",
-										   xmax,
-										   ctx->next_mxact));
-				fatal = true;
-				break;
-			case XID_BOUNDS_OK:
-				break;
-		}
-	}
-	else
-	{
 		/*
-		 * xmax is not a multixact and is normal, so it should be within the
-		 * valid XID range.
+		 * Check if any chunks for this toasted object exist in the toast
+		 * table, accessible via the index.
 		 */
-		switch (get_xid_status(xmax, ctx, NULL))
+		init_toast_snapshot(&SnapshotToast);
+		toastscan = systable_beginscan_ordered(ctx->toast_rel,
+											   ctx->valid_toast_index,
+											   &SnapshotToast, 1,
+											   &toastkey);
+		tctx->chunkno = 0;
+		found_toasttup = false;
+		while ((toasttup =
+				systable_getnext_ordered(toastscan,
+										 ForwardScanDirection)) != NULL)
 		{
-			case XID_INVALID:
-			case XID_BOUNDS_OK:
-				break;
-			case XID_IN_FUTURE:
-				report_corruption(ctx,
-								  psprintf("xmax %u equals or exceeds next valid transaction ID %u:%u",
-										   xmax,
-										   EpochFromFullTransactionId(ctx->next_fxid),
-										   XidFromFullTransactionId(ctx->next_fxid)));
-				fatal = true;
-				break;
-			case XID_PRECEDES_CLUSTERMIN:
-				report_corruption(ctx,
-								  psprintf("xmax %u precedes oldest valid transaction ID %u:%u",
-										   xmax,
-										   EpochFromFullTransactionId(ctx->oldest_fxid),
-										   XidFromFullTransactionId(ctx->oldest_fxid)));
-				fatal = true;
-				break;
-			case XID_PRECEDES_RELMIN:
-				report_corruption(ctx,
-								  psprintf("xmax %u precedes relation freeze threshold %u:%u",
-										   xmax,
-										   EpochFromFullTransactionId(ctx->relfrozenfxid),
-										   XidFromFullTransactionId(ctx->relfrozenfxid)));
-				fatal = true;
+			found_toasttup = true;
+			check_toast_tuple(toasttup, ctx, tctx);
+			tctx->chunkno++;
 		}
+		if (!found_toasttup)
+			report_toast_corruption(ctx, tctx,
+									psprintf("toasted value for attribute %u missing from toast table",
+											 tctx->attnum));
+		else if (tctx->chunkno != (tctx->endchunk + 1))
+			report_toast_corruption(ctx, tctx,
+									psprintf("final toast chunk number %u differs from expected value %u",
+											 tctx->chunkno, (tctx->endchunk + 1)));
+		systable_endscan_ordered(toastscan);
+
+		pfree(tctx);
 	}
+	list_free(ctx->toasted_attributes);
+	ctx->toasted_attributes = NIL;
+}
 
+/*
+ * Check the current tuple as tracked in ctx, recording any corruption found in
+ * ctx->tupstore.
+ */
+static void
+check_tuple(HeapCheckContext *ctx)
+{
 	/*
-	 * Cannot process tuple data if tuple header was corrupt, as the offsets
-	 * within the page cannot be trusted, leaving too much risk of reading
-	 * garbage if we continue.
-	 *
-	 * We also cannot process the tuple if the xmin or xmax were invalid
-	 * relative to relfrozenxid or relminmxid, as clog entries for the xids
-	 * may already be gone.
+	 * Check various forms of tuple header corruption.  If the header is too
+	 * corrupt to continue checking, we cannot continue with other checks.
 	 */
-	if (fatal)
+	if (!check_tuple_header(ctx))
 		return;
 
 	/*
-	 * Check various forms of tuple header corruption.  If the header is too
-	 * corrupt to continue checking, or if the tuple is not visible to anyone,
-	 * we cannot continue with other checks.
+	 * Check tuple visibility.  If the inserting transaction aborted, we
+	 * cannot assume our relation description matches the tuple structure, and
+	 * therefore cannot check it.
 	 */
-	if (!check_tuple_header_and_visibilty(ctx->tuphdr, ctx))
+	if (!check_tuple_visibility(ctx))
 		return;
 
 	/*
@@ -1257,10 +1514,10 @@ check_tuple(HeapCheckContext *ctx)
 	 */
 	if (RelationGetDescr(ctx->rel)->natts < ctx->natts)
 	{
-		report_corruption(ctx,
-						  psprintf("number of attributes %u exceeds maximum expected for table %u",
-								   ctx->natts,
-								   RelationGetDescr(ctx->rel)->natts));
+		report_main_corruption(ctx,
+							   psprintf("number of attributes %u exceeds maximum expected for table %u",
+										ctx->natts,
+										RelationGetDescr(ctx->rel)->natts));
 		return;
 	}
 
@@ -1269,6 +1526,10 @@ check_tuple(HeapCheckContext *ctx)
 	 * next, at which point we abort further attribute checks for this tuple.
 	 * Note that we don't abort for all types of corruption, only for those
 	 * types where we don't know how to continue.
+	 *
+	 * While checking the tuple attributes, we build a list of toast pointers
+	 * we encounter, to be checked later.  If further attribute checking is
+	 * aborted, we still have the pointers collected prior to aborting.
 	 */
 	ctx->offset = 0;
 	for (ctx->attnum = 0; ctx->attnum < ctx->natts; ctx->attnum++)
@@ -1448,7 +1709,7 @@ get_xid_status(TransactionId xid, HeapCheckContext *ctx,
 	if (FullTransactionIdPrecedesOrEquals(clog_horizon, fxid))
 	{
 		if (TransactionIdIsCurrentTransactionId(xid))
-			*status = XID_IN_PROGRESS;
+			*status = XID_IS_CURRENT_XID;
 		else if (TransactionIdDidCommit(xid))
 			*status = XID_COMMITTED;
 		else if (TransactionIdDidAbort(xid))
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index 9e6777e9d0..0ce261e2a2 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -2557,6 +2557,7 @@ TimestampTz
 TmFromChar
 TmToChar
 ToastAttrInfo
+ToastCheckContext
 ToastTupleContext
 TocEntry
 TokenAuxData
-- 
2.21.1 (Apple Git-122.3)

