From d412782c24fb42b8c3cd626823ff9e9ff1523092 Mon Sep 17 00:00:00 2001
From: Mark Dilger <mark.dilger@enterprisedb.com>
Date: Tue, 30 Mar 2021 21:00:34 -0700
Subject: [PATCH v14 3/3] Checking toast separately from the main table.

Rather than checking toasted attributes as we find them, creating a
list of them and checking all the toast in the list after releasing
the buffer lock for each main table page.
---
 contrib/amcheck/verify_heapam.c           | 598 +++++++++++++---------
 src/bin/pg_amcheck/t/004_verify_heapam.pl |  66 ++-
 src/tools/pgindent/typedefs.list          |   1 +
 3 files changed, 430 insertions(+), 235 deletions(-)

diff --git a/contrib/amcheck/verify_heapam.c b/contrib/amcheck/verify_heapam.c
index be22b491d6..254dc9f3a5 100644
--- a/contrib/amcheck/verify_heapam.c
+++ b/contrib/amcheck/verify_heapam.c
@@ -58,6 +58,26 @@ typedef enum SkipPages
 	SKIP_PAGES_NONE
 } SkipPages;
 
+/*
+ * Struct holding information necessary to check a toasted attribute, including
+ * the toast pointer, state about the current toast chunk being checked, and
+ * the location in the main table of the toasted attribute.  We have to track
+ * the tuple's location in the main table for reporting purposes because by the
+ * time the toast is checked our HeapCheckContext will no longer be pointing to
+ * the relevant tuple.
+ */
+typedef struct ToastCheckContext
+{
+	struct varatt_external toast_pointer;
+	BlockNumber blkno;			/* block in main table */
+	OffsetNumber offnum;		/* offset in main table */
+	AttrNumber	attnum;			/* attribute in main table */
+	int32		chunkno;		/* chunk number in toast table */
+	int32		attrsize;		/* size of toasted attribute */
+	int32		endchunk;		/* last chunk number in toast table */
+	int32		totalchunks;	/* total chunks in toast table */
+} ToastCheckContext;
+
 /*
  * Struct holding the running context information during
  * a lifetime of a verify_heapam execution.
@@ -119,11 +139,11 @@ typedef struct HeapCheckContext
 	/* True if toast for this tuple could be vacuumed away */
 	bool		tuple_can_be_pruned;
 
-	/* Values for iterating over toast for the attribute */
-	int32		chunkno;
-	int32		attrsize;
-	int32		endchunk;
-	int32		totalchunks;
+	/*
+	 * List of ToastCheckContext structs for toasted attributes which are not
+	 * in danger of being vacuumed way and should be checked
+	 */
+	List	   *toasted_attributes;
 
 	/* Whether verify_heapam has yet encountered any corrupt tuples */
 	bool		is_corrupt;
@@ -136,13 +156,18 @@ typedef struct HeapCheckContext
 /* Internal implementation */
 static void sanity_check_relation(Relation rel);
 static void check_tuple(HeapCheckContext *ctx);
-static void check_toast_tuple(HeapTuple toasttup, HeapCheckContext *ctx);
+static int32 check_toast_tuple(HeapTuple toasttup, HeapCheckContext *ctx,
+							   ToastCheckContext *tctx, bool *error);
 
-static bool check_tuple_attribute(HeapCheckContext *ctx);
 static bool check_tuple_header(HeapCheckContext *ctx);
 static bool check_tuple_visibility(HeapCheckContext *ctx);
 
+static bool check_tuple_attribute(HeapCheckContext *ctx);
+static void check_toasted_attributes(HeapCheckContext *ctx);
+
 static void report_corruption(HeapCheckContext *ctx, char *msg);
+static void report_toast_corruption(HeapCheckContext *ctx,
+									ToastCheckContext *tctx, char *msg);
 static TupleDesc verify_heapam_tupdesc(void);
 static FullTransactionId FullTransactionIdFromXidAndCtx(TransactionId xid,
 														const HeapCheckContext *ctx);
@@ -253,6 +278,7 @@ verify_heapam(PG_FUNCTION_ARGS)
 
 	memset(&ctx, 0, sizeof(HeapCheckContext));
 	ctx.cached_xid = InvalidTransactionId;
+	ctx.toasted_attributes = NIL;
 
 	/*
 	 * Any xmin newer than the xmin of our snapshot can't become all-visible
@@ -469,6 +495,14 @@ verify_heapam(PG_FUNCTION_ARGS)
 		/* clean up */
 		UnlockReleaseBuffer(ctx.buffer);
 
+		/*
+		 * Check any toast pointers from the page whose lock we just released
+		 * and reset the list to NIL.
+		 */
+		if (ctx.toasted_attributes != NIL)
+			check_toasted_attributes(&ctx);
+		Assert(ctx.toasted_attributes == NIL);
+
 		if (on_error_stop && ctx.is_corrupt)
 			break;
 	}
@@ -510,14 +544,13 @@ sanity_check_relation(Relation rel)
 }
 
 /*
- * Record a single corruption found in the table.  The values in ctx should
- * reflect the location of the corruption, and the msg argument should contain
- * a human-readable description of the corruption.
- *
- * The msg argument is pfree'd by this function.
+ * Shared internal implementation for report_corruption and
+ * report_toast_corruption.
  */
 static void
-report_corruption(HeapCheckContext *ctx, char *msg)
+report_corruption_internal(Tuplestorestate *tupstore, TupleDesc tupdesc,
+						   BlockNumber blkno, OffsetNumber offnum,
+						   AttrNumber attnum, char *msg)
 {
 	Datum		values[HEAPCHECK_RELATION_COLS];
 	bool		nulls[HEAPCHECK_RELATION_COLS];
@@ -525,10 +558,10 @@ report_corruption(HeapCheckContext *ctx, char *msg)
 
 	MemSet(values, 0, sizeof(values));
 	MemSet(nulls, 0, sizeof(nulls));
-	values[0] = Int64GetDatum(ctx->blkno);
-	values[1] = Int32GetDatum(ctx->offnum);
-	values[2] = Int32GetDatum(ctx->attnum);
-	nulls[2] = (ctx->attnum < 0);
+	values[0] = Int64GetDatum(blkno);
+	values[1] = Int32GetDatum(offnum);
+	values[2] = Int32GetDatum(attnum);
+	nulls[2] = (attnum < 0);
 	values[3] = CStringGetTextDatum(msg);
 
 	/*
@@ -541,8 +574,39 @@ report_corruption(HeapCheckContext *ctx, char *msg)
 	 */
 	pfree(msg);
 
-	tuple = heap_form_tuple(ctx->tupdesc, values, nulls);
-	tuplestore_puttuple(ctx->tupstore, tuple);
+	tuple = heap_form_tuple(tupdesc, values, nulls);
+	tuplestore_puttuple(tupstore, tuple);
+}
+
+/*
+ * Record a single corruption found in the main table.  The values in ctx should
+ * indicate the location of the corruption, and the msg argument should contain
+ * a human-readable description of the corruption.
+ *
+ * The msg argument is pfree'd by this function.
+ */
+static void
+report_corruption(HeapCheckContext *ctx, char *msg)
+{
+	report_corruption_internal(ctx->tupstore, ctx->tupdesc, ctx->blkno,
+							   ctx->offnum, ctx->attnum, msg);
+	ctx->is_corrupt = true;
+}
+
+/*
+ * Record corruption found in the toast table.  The values in tctx should
+ * indicate the location in the main table where the toast pointer was
+ * encountered, and the msg argument should contain a human-readable
+ * description of the toast table corruption.
+ *
+ * As above, the msg argument is pfree'd by this function.
+ */
+static void
+report_toast_corruption(HeapCheckContext *ctx, ToastCheckContext *tctx,
+						char *msg)
+{
+	report_corruption_internal(ctx->tupstore, ctx->tupdesc, tctx->blkno,
+							   tctx->offnum, tctx->attnum, msg);
 	ctx->is_corrupt = true;
 }
 
@@ -1085,7 +1149,6 @@ check_tuple_visibility(HeapCheckContext *ctx)
 	return false;				/* not reached */
 }
 
-
 /*
  * Check the current toast tuple against the state tracked in ctx, recording
  * any corruption found in ctx->tupstore.
@@ -1097,8 +1160,9 @@ check_tuple_visibility(HeapCheckContext *ctx)
  * each toast tuple being checked against where we are in the sequence, as well
  * as each toast tuple having its varlena structure sanity checked.
  */
-static void
-check_toast_tuple(HeapTuple toasttup, HeapCheckContext *ctx)
+static int32
+check_toast_tuple(HeapTuple toasttup, HeapCheckContext *ctx,
+				  ToastCheckContext *tctx, bool *error)
 {
 	int32		curchunk;
 	Pointer		chunk;
@@ -1113,17 +1177,21 @@ check_toast_tuple(HeapTuple toasttup, HeapCheckContext *ctx)
 										 ctx->toast_rel->rd_att, &isnull));
 	if (isnull)
 	{
-		report_corruption(ctx,
-						  pstrdup("toast chunk sequence number is null"));
-		return;
+		report_toast_corruption(ctx, tctx,
+								psprintf("toast value %u has toast chunk with null sequence number",
+										 tctx->toast_pointer.va_valueid));
+		*error = true;
+		return 0;
 	}
 	chunk = DatumGetPointer(fastgetattr(toasttup, 3,
 										ctx->toast_rel->rd_att, &isnull));
 	if (isnull)
 	{
-		report_corruption(ctx,
-						  pstrdup("toast chunk data is null"));
-		return;
+		report_toast_corruption(ctx, tctx,
+								psprintf("toast value %u chunk data is null",
+										 tctx->toast_pointer.va_valueid));
+		*error = true;
+		return 0;
 	}
 	if (!VARATT_IS_EXTENDED(chunk))
 		chunksize = VARSIZE(chunk) - VARHDRSZ;
@@ -1139,39 +1207,49 @@ check_toast_tuple(HeapTuple toasttup, HeapCheckContext *ctx)
 		/* should never happen */
 		uint32		header = ((varattrib_4b *) chunk)->va_4byte.va_header;
 
-		report_corruption(ctx,
-						  psprintf("corrupt extended toast chunk has invalid varlena header: %0x (sequence number %d)",
-								   header, curchunk));
-		return;
+		report_toast_corruption(ctx, tctx,
+								psprintf("toast value %u corrupt extended chunk has invalid varlena header: %0x (sequence number %d)",
+										 tctx->toast_pointer.va_valueid,
+										 header, curchunk));
+		*error = true;
+		return 0;
 	}
 
 	/*
 	 * Some checks on the data we've found
 	 */
-	if (curchunk != ctx->chunkno)
+	if (curchunk != tctx->chunkno)
 	{
-		report_corruption(ctx,
-						  psprintf("toast chunk sequence number %u does not match the expected sequence number %u",
-								   curchunk, ctx->chunkno));
-		return;
+		report_toast_corruption(ctx, tctx,
+								psprintf("toast value %u chunk sequence number %u does not match the expected sequence number %u",
+										 tctx->toast_pointer.va_valueid,
+										 curchunk, tctx->chunkno));
+		*error = true;
+		return chunksize;
 	}
-	if (curchunk > ctx->endchunk)
+	if (curchunk > tctx->endchunk)
 	{
-		report_corruption(ctx,
-						  psprintf("toast chunk sequence number %u exceeds the end chunk sequence number %u",
-								   curchunk, ctx->endchunk));
-		return;
+		report_toast_corruption(ctx, tctx,
+								psprintf("toast value %u chunk sequence number %u exceeds the end chunk sequence number %u",
+										 tctx->toast_pointer.va_valueid,
+										 curchunk, tctx->endchunk));
+		*error = true;
+		return chunksize;
 	}
 
-	expected_size = curchunk < ctx->totalchunks - 1 ? TOAST_MAX_CHUNK_SIZE
-		: ctx->attrsize - ((ctx->totalchunks - 1) * TOAST_MAX_CHUNK_SIZE);
+	expected_size = curchunk < tctx->totalchunks - 1 ? TOAST_MAX_CHUNK_SIZE
+		: tctx->attrsize - ((tctx->totalchunks - 1) * TOAST_MAX_CHUNK_SIZE);
 	if (chunksize != expected_size)
 	{
-		report_corruption(ctx,
-						  psprintf("toast chunk size %u differs from the expected size %u",
-								   chunksize, expected_size));
-		return;
+		report_toast_corruption(ctx, tctx,
+								psprintf("toast value %u chunk size %u differs from the expected size %u",
+										 tctx->toast_pointer.va_valueid,
+										 chunksize, expected_size));
+		*error = true;
+		return chunksize;
 	}
+
+	return chunksize;
 }
 
 /*
@@ -1179,17 +1257,17 @@ check_toast_tuple(HeapTuple toasttup, HeapCheckContext *ctx)
  * found in ctx->tupstore.
  *
  * This function follows the logic performed by heap_deform_tuple(), and in the
- * case of a toasted value, optionally continues along the logic of
- * detoast_external_attr(), checking for any conditions that would result in
- * either of those functions Asserting or crashing the backend.  The checks
- * performed by Asserts present in those two functions are also performed here.
- * In cases where those two functions are a bit cavalier in their assumptions
- * about data being correct, we perform additional checks not present in either
- * of those two functions.  Where some condition is checked in both of those
- * functions, we perform it here twice, as we parallel the logical flow of
- * those two functions.  The presence of duplicate checks seems a reasonable
- * price to pay for keeping this code tightly coupled with the code it
- * protects.
+ * case of a toasted value, optionally stores the toast pointer so later it can
+ * be checked following the logic of detoast_external_attr(), checking for any
+ * conditions that would result in either of those functions Asserting or
+ * crashing the backend.  The checks performed by Asserts present in those two
+ * functions are also performed here and in check_toasted_attributes.  In cases
+ * where those two functions are a bit cavalier in their assumptions about data
+ * being correct, we perform additional checks not present in either of those
+ * two functions.  Where some condition is checked in both of those functions,
+ * we perform it here twice, as we parallel the logical flow of those two
+ * functions.  The presence of duplicate checks seems a reasonable price to pay
+ * for keeping this code tightly coupled with the code it protects.
  *
  * Returns true if the tuple attribute is sane enough for processing to
  * continue on to the next attribute, false otherwise.
@@ -1197,17 +1275,12 @@ check_toast_tuple(HeapTuple toasttup, HeapCheckContext *ctx)
 static bool
 check_tuple_attribute(HeapCheckContext *ctx)
 {
-	struct varatt_external toast_pointer;
-	ScanKeyData toastkey;
-	SysScanDesc toastscan;
-	SnapshotData SnapshotToast;
-	HeapTuple	toasttup;
-	bool		found_toasttup;
 	Datum		attdatum;
 	struct varlena *attr;
 	char	   *tp;				/* pointer to the tuple data */
 	uint16		infomask;
 	Form_pg_attribute thisatt;
+	struct varatt_external toast_pointer;
 
 	infomask = ctx->tuphdr->t_infomask;
 	thisatt = TupleDescAttr(RelationGetDescr(ctx->rel), ctx->attnum);
@@ -1271,8 +1344,7 @@ check_tuple_attribute(HeapCheckContext *ctx)
 		if (va_tag != VARTAG_ONDISK)
 		{
 			report_corruption(ctx,
-							  psprintf("toasted attribute %u has unexpected TOAST tag %u",
-									   ctx->attnum,
+							  psprintf("toasted attribute has unexpected TOAST tag %u",
 									   va_tag));
 			/* We can't know where the next attribute begins */
 			return false;
@@ -1286,8 +1358,7 @@ check_tuple_attribute(HeapCheckContext *ctx)
 	if (ctx->tuphdr->t_hoff + ctx->offset > ctx->lp_len)
 	{
 		report_corruption(ctx,
-						  psprintf("attribute %u with length %u ends at offset %u beyond total tuple length %u",
-								   ctx->attnum,
+						  psprintf("attribute with length %u ends at offset %u beyond total tuple length %u",
 								   thisatt->attlen,
 								   ctx->tuphdr->t_hoff + ctx->offset,
 								   ctx->lp_len));
@@ -1314,12 +1385,17 @@ check_tuple_attribute(HeapCheckContext *ctx)
 
 	/* It is external, and we're looking at a page on disk */
 
+	/*
+	 * Must copy attr into toast_pointer for alignment considerations
+	 */
+	VARATT_EXTERNAL_GET_POINTER(toast_pointer, attr);
+
 	/* The tuple header better claim to contain toasted values */
 	if (!(infomask & HEAP_HASEXTERNAL))
 	{
 		report_corruption(ctx,
-						  psprintf("attribute %u is external but tuple header flag HEAP_HASEXTERNAL not set",
-								   ctx->attnum));
+						  psprintf("toast value %u is external but tuple header flag HEAP_HASEXTERNAL not set",
+								   toast_pointer.va_valueid));
 		return true;
 	}
 
@@ -1327,8 +1403,28 @@ check_tuple_attribute(HeapCheckContext *ctx)
 	if (!ctx->rel->rd_rel->reltoastrelid)
 	{
 		report_corruption(ctx,
-						  psprintf("attribute %u is external but relation has no toast relation",
-								   ctx->attnum));
+						  psprintf("toast value %u is external but relation has no toast relation",
+								   toast_pointer.va_valueid));
+		return true;
+	}
+
+	if (VARATT_EXTERNAL_GET_EXTSIZE(toast_pointer) > toast_pointer.va_rawsize - VARHDRSZ)
+	{
+		report_corruption(ctx,
+						  psprintf("toast value %u external size %u exceeds maximum expected for rawsize %u",
+								   toast_pointer.va_valueid,
+								   VARATT_EXTERNAL_GET_EXTSIZE(toast_pointer),
+								   toast_pointer.va_rawsize));
+		return true;
+	}
+
+	if (toast_pointer.va_toastrelid != ctx->rel->rd_rel->reltoastrelid)
+	{
+		report_corruption(ctx,
+						  psprintf("toast value %u toast relation oid %u differs from expected oid %u",
+								   toast_pointer.va_valueid,
+								   toast_pointer.va_toastrelid,
+								   ctx->rel->rd_rel->reltoastrelid));
 		return true;
 	}
 
@@ -1337,191 +1433,231 @@ check_tuple_attribute(HeapCheckContext *ctx)
 		return true;
 
 	/*
-	 * Must copy attr into toast_pointer for alignment considerations
+	 * If this tuple is at risk of being vacuumed away, we cannot check the
+	 * toast.  Otherwise, we push a copy of the toast tuple so we can check it
+	 * after releasing the main table buffer lock.
 	 */
-	VARATT_EXTERNAL_GET_POINTER(toast_pointer, attr);
-
-	ctx->attrsize = VARATT_EXTERNAL_GET_EXTSIZE(toast_pointer);
-	ctx->endchunk = (ctx->attrsize - 1) / TOAST_MAX_CHUNK_SIZE;
-	ctx->totalchunks = ctx->endchunk + 1;
+	if (!ctx->tuple_can_be_pruned)
+	{
+		ToastCheckContext *tctx;
 
-	/*
-	 * Setup a scan key to find chunks in toast table with matching va_valueid
-	 */
-	ScanKeyInit(&toastkey,
-				(AttrNumber) 1,
-				BTEqualStrategyNumber, F_OIDEQ,
-				ObjectIdGetDatum(toast_pointer.va_valueid));
+		tctx = (ToastCheckContext *) palloc0fast(sizeof(ToastCheckContext));
 
-	/*
-	 * Check if any chunks for this toasted object exist in the toast table,
-	 * accessible via the index.
-	 */
-	init_toast_snapshot(&SnapshotToast);
-	toastscan = systable_beginscan_ordered(ctx->toast_rel,
-										   ctx->valid_toast_index,
-										   &SnapshotToast, 1,
-										   &toastkey);
-	ctx->chunkno = 0;
-	found_toasttup = false;
-	while ((toasttup =
-			systable_getnext_ordered(toastscan,
-									 ForwardScanDirection)) != NULL)
-	{
-		found_toasttup = true;
-		check_toast_tuple(toasttup, ctx);
-		ctx->chunkno++;
+		VARATT_EXTERNAL_GET_POINTER(tctx->toast_pointer, attr);
+		tctx->blkno = ctx->blkno;
+		tctx->offnum = ctx->offnum;
+		tctx->attnum = ctx->attnum;
+		ctx->toasted_attributes = lappend(ctx->toasted_attributes, tctx);
 	}
-	if (!found_toasttup)
-		report_corruption(ctx,
-						  psprintf("toasted value for attribute %u missing from toast table",
-								   ctx->attnum));
-	else if (ctx->chunkno != (ctx->endchunk + 1))
-		report_corruption(ctx,
-						  psprintf("final toast chunk number %u differs from expected value %u",
-								   ctx->chunkno, (ctx->endchunk + 1)));
-	systable_endscan_ordered(toastscan);
 
 	return true;
 }
 
 /*
- * Check the current tuple as tracked in ctx, recording any corruption found in
- * ctx->tupstore.
+ * For each attribute collected in ctx->toasted_attributes, look up the value
+ * in the toast table and perform checks on it.  This function should only be
+ * called on toast pointers which cannot be vacuumed away during our
+ * processing.
  */
 static void
-check_tuple(HeapCheckContext *ctx)
+check_toasted_attributes(HeapCheckContext *ctx)
 {
-	TransactionId xmin;
-	TransactionId xmax;
-	bool		fatal = false;
-	uint16		infomask = ctx->tuphdr->t_infomask;
+	ListCell   *cell;
 
-	/* If xmin is normal, it should be within valid range */
-	xmin = HeapTupleHeaderGetXmin(ctx->tuphdr);
-	switch (get_xid_status(xmin, ctx, NULL))
+	foreach(cell, ctx->toasted_attributes)
 	{
-		case XID_INVALID:
-		case XID_BOUNDS_OK:
-			break;
-		case XID_IN_FUTURE:
-			report_corruption(ctx,
-							  psprintf("xmin %u equals or exceeds next valid transaction ID %u:%u",
-									   xmin,
-									   EpochFromFullTransactionId(ctx->next_fxid),
-									   XidFromFullTransactionId(ctx->next_fxid)));
-			fatal = true;
-			break;
-		case XID_PRECEDES_CLUSTERMIN:
-			report_corruption(ctx,
-							  psprintf("xmin %u precedes oldest valid transaction ID %u:%u",
-									   xmin,
-									   EpochFromFullTransactionId(ctx->oldest_fxid),
-									   XidFromFullTransactionId(ctx->oldest_fxid)));
-			fatal = true;
-			break;
-		case XID_PRECEDES_RELMIN:
-			report_corruption(ctx,
-							  psprintf("xmin %u precedes relation freeze threshold %u:%u",
-									   xmin,
-									   EpochFromFullTransactionId(ctx->relfrozenfxid),
-									   XidFromFullTransactionId(ctx->relfrozenfxid)));
-			fatal = true;
-			break;
-	}
+		ToastCheckContext *tctx;
+		SnapshotData SnapshotToast;
+		ScanKeyData toastkey;
+		SysScanDesc toastscan;
+		int64		toastsize;	/* corrupt toast could overflow 32 bits */
+		bool		found_toasttup;
+		bool		toast_error;
+		HeapTuple	toasttup;
+
+		tctx = lfirst(cell);
+		tctx->attrsize = VARATT_EXTERNAL_GET_EXTSIZE(tctx->toast_pointer);
+		tctx->endchunk = (tctx->attrsize - 1) / TOAST_MAX_CHUNK_SIZE;
+		tctx->totalchunks = tctx->endchunk + 1;
 
-	xmax = HeapTupleHeaderGetRawXmax(ctx->tuphdr);
+		/*
+		 * Setup a scan key to find chunks in toast table with matching
+		 * va_valueid
+		 */
+		ScanKeyInit(&toastkey,
+					(AttrNumber) 1,
+					BTEqualStrategyNumber, F_OIDEQ,
+					ObjectIdGetDatum(tctx->toast_pointer.va_valueid));
 
-	if (infomask & HEAP_XMAX_IS_MULTI)
-	{
-		/* xmax is a multixact, so it should be within valid MXID range */
-		switch (check_mxid_valid_in_rel(xmax, ctx))
-		{
-			case XID_INVALID:
-				report_corruption(ctx,
-								  pstrdup("multitransaction ID is invalid"));
-				fatal = true;
-				break;
-			case XID_PRECEDES_RELMIN:
-				report_corruption(ctx,
-								  psprintf("multitransaction ID %u precedes relation minimum multitransaction ID threshold %u",
-										   xmax, ctx->relminmxid));
-				fatal = true;
-				break;
-			case XID_PRECEDES_CLUSTERMIN:
-				report_corruption(ctx,
-								  psprintf("multitransaction ID %u precedes oldest valid multitransaction ID threshold %u",
-										   xmax, ctx->oldest_mxact));
-				fatal = true;
-				break;
-			case XID_IN_FUTURE:
-				report_corruption(ctx,
-								  psprintf("multitransaction ID %u equals or exceeds next valid multitransaction ID %u",
-										   xmax,
-										   ctx->next_mxact));
-				fatal = true;
-				break;
-			case XID_BOUNDS_OK:
-				break;
-		}
-	}
-	else
-	{
 		/*
-		 * xmax is not a multixact and is normal, so it should be within the
-		 * valid XID range.
+		 * Check if any chunks for this toasted object exist in the toast
+		 * table, accessible via the index.
 		 */
-		switch (get_xid_status(xmax, ctx, NULL))
+		init_toast_snapshot(&SnapshotToast);
+		toastscan = systable_beginscan_ordered(ctx->toast_rel,
+											   ctx->valid_toast_index,
+											   &SnapshotToast, 1,
+											   &toastkey);
+		tctx->chunkno = 0;
+		found_toasttup = false;
+		toastsize = 0;
+		while ((toasttup =
+				systable_getnext_ordered(toastscan,
+										 ForwardScanDirection)) != NULL)
 		{
-			case XID_INVALID:
-			case XID_BOUNDS_OK:
-				break;
-			case XID_IN_FUTURE:
-				report_corruption(ctx,
-								  psprintf("xmax %u equals or exceeds next valid transaction ID %u:%u",
-										   xmax,
-										   EpochFromFullTransactionId(ctx->next_fxid),
-										   XidFromFullTransactionId(ctx->next_fxid)));
-				fatal = true;
-				break;
-			case XID_PRECEDES_CLUSTERMIN:
-				report_corruption(ctx,
-								  psprintf("xmax %u precedes oldest valid transaction ID %u:%u",
-										   xmax,
-										   EpochFromFullTransactionId(ctx->oldest_fxid),
-										   XidFromFullTransactionId(ctx->oldest_fxid)));
-				fatal = true;
-				break;
-			case XID_PRECEDES_RELMIN:
-				report_corruption(ctx,
-								  psprintf("xmax %u precedes relation freeze threshold %u:%u",
-										   xmax,
-										   EpochFromFullTransactionId(ctx->relfrozenfxid),
-										   XidFromFullTransactionId(ctx->relfrozenfxid)));
-				fatal = true;
+			found_toasttup = true;
+			toastsize += check_toast_tuple(toasttup, ctx, tctx, &toast_error);
+			tctx->chunkno++;
+		}
+		systable_endscan_ordered(toastscan);
+
+		if (!found_toasttup)
+			report_toast_corruption(ctx, tctx,
+									psprintf("toast value %u not found in toast table",
+											 tctx->toast_pointer.va_valueid));
+		else if (tctx->chunkno != (tctx->endchunk + 1))
+			report_toast_corruption(ctx, tctx,
+									psprintf("toast value %u was expected to end at chunk %u, but ended at chunk %u",
+											 tctx->toast_pointer.va_valueid,
+											 (tctx->endchunk + 1), tctx->chunkno));
+		else if (toastsize != VARATT_EXTERNAL_GET_EXTSIZE(tctx->toast_pointer))
+			report_toast_corruption(ctx, tctx,
+									psprintf("toast value %u total size " INT64_FORMAT " differs from expected size %u",
+											 tctx->toast_pointer.va_valueid, toastsize,
+											 VARATT_EXTERNAL_GET_EXTSIZE(tctx->toast_pointer)));
+		else if (!toast_error)
+		{
+			if (!AllocSizeIsValid(tctx->toast_pointer.va_rawsize))
+			{
+				report_toast_corruption(ctx, tctx,
+										psprintf("toast value %u rawsize %u too large to be allocated",
+												 tctx->toast_pointer.va_valueid,
+												 tctx->toast_pointer.va_rawsize));
+				toast_error = true;
+			}
+
+			if (!AllocSizeIsValid(VARATT_EXTERNAL_GET_EXTSIZE(tctx->toast_pointer)))
+			{
+				report_toast_corruption(ctx, tctx,
+										psprintf("toast value %u extsize %u too large to be allocated",
+												 VARATT_EXTERNAL_GET_EXTSIZE(tctx->toast_pointer),
+												 tctx->toast_pointer.va_valueid));
+				toast_error = true;
+			}
+
+			if (!toast_error)
+			{
+				Size		allocsize;
+				struct varlena *attr;
+
+				/* Fetch all chunks */
+				allocsize = VARATT_EXTERNAL_GET_EXTSIZE(tctx->toast_pointer) + VARHDRSZ;
+				attr = (struct varlena *) palloc(allocsize);
+				if (VARATT_EXTERNAL_IS_COMPRESSED(tctx->toast_pointer))
+					SET_VARSIZE_COMPRESSED(attr, allocsize);
+				else
+					SET_VARSIZE(attr, allocsize);
+
+				table_relation_fetch_toast_slice(ctx->toast_rel, tctx->toast_pointer.va_valueid,
+												 toastsize, 0, toastsize, attr);
+
+				if (VARATT_IS_COMPRESSED(attr))
+				{
+#ifdef DECOMPRESSION_CORRUPTION_CHECKING
+					struct varlena *uncompressed;
+					int32		rawsize;
+#endif
+					Size		allocsize;
+					ToastCompressionId cmid;
+
+					/* allocate memory for the uncompressed data */
+					allocsize = VARDATA_COMPRESSED_GET_EXTSIZE(attr) + VARHDRSZ;
+					if (!AllocSizeIsValid(allocsize))
+						report_toast_corruption(ctx, tctx,
+												psprintf("toast value %u invalid uncompressed size %zu",
+														 tctx->toast_pointer.va_valueid,
+														 allocsize));
+					cmid = TOAST_COMPRESS_METHOD(attr);
+					switch (cmid)
+					{
+						case TOAST_PGLZ_COMPRESSION_ID:
+#ifdef DECOMPRESSION_CORRUPTION_CHECKING
+							/* decompress the data */
+							uncompressed = (struct varlena *) palloc(allocsize);
+							rawsize = pglz_decompress((char *) attr + VARHDRSZ_COMPRESSED,
+													  VARSIZE(attr) - VARHDRSZ_COMPRESSED,
+													  VARDATA(uncompressed),
+													  VARDATA_COMPRESSED_GET_EXTSIZE(attr), true);
+							if (rawsize < 0)
+								report_toast_corruption(ctx, tctx,
+														psprintf("toast value %u compressed pglz data is corrupt",
+																 tctx->toast_pointer.va_valueid));
+							pfree(uncompressed);
+#endif
+							break;
+						case TOAST_LZ4_COMPRESSION_ID:
+#ifndef USE_LZ4
+							report_toast_corruption(ctx, tctx,
+													psprintf("toast value %u unsupported LZ4 compression method",
+															 tctx->toast_pointer.va_valueid));
+#else
+#ifdef DECOMPRESSION_CORRUPTION_CHECKING
+							/* decompress the data */
+							uncompressed = (struct varlena *) palloc(allocsize);
+							rawsize = LZ4_decompress_safe((char *) attr + VARHDRSZ_COMPRESSED,
+														  VARDATA(uncompressed),
+														  VARSIZE(attr) - VARHDRSZ_COMPRESSED,
+														  VARDATA_COMPRESSED_GET_EXTSIZE(attr));
+							if (rawsize < 0)
+								report_toast_corruption(ctx, tctx,
+														psprintf("toast value %u compressed lz4 data is corrupt",
+																 tctx->toast_pointer.va_valueid));
+							pfree(uncompressed);
+#endif
+#endif
+							break;
+						default:
+							report_toast_corruption(ctx, tctx,
+													psprintf("toast value %u invalid compression method id %d",
+															 tctx->toast_pointer.va_valueid,
+															 cmid));
+					}
+				}
+				else if (VARSIZE(attr) != tctx->toast_pointer.va_rawsize)
+					report_toast_corruption(ctx, tctx,
+											psprintf("toast value %u detoasted attribute size %u differs from expected rawsize %u",
+													 tctx->toast_pointer.va_valueid,
+													 VARSIZE(attr),
+													 tctx->toast_pointer.va_rawsize));
+				pfree(attr);
+			}
 		}
+		pfree(tctx);
 	}
 
-	/*
-	 * Cannot process tuple data if tuple header was corrupt, as the offsets
-	 * within the page cannot be trusted, leaving too much risk of reading
-	 * garbage if we continue.
-	 *
-	 * We also cannot process the tuple if the xmin or xmax were invalid
-	 * relative to relfrozenxid or relminmxid, as clog entries for the xids
-	 * may already be gone.
-	 */
-	if (fatal)
-		return;
+	list_free(ctx->toasted_attributes);
+	ctx->toasted_attributes = NIL;
+}
 
+/*
+ * Check the current tuple as tracked in ctx, recording any corruption found in
+ * ctx->tupstore.
+ */
+static void
+check_tuple(HeapCheckContext *ctx)
+{
 	/*
 	 * Check various forms of tuple header corruption.  If the header is too
-	 * corrupt to continue checking, or if the tuple is not visible to anyone,
-	 * we cannot continue with other checks.
+	 * corrupt to continue checking, we cannot continue with other checks.
 	 */
 	if (!check_tuple_header(ctx))
 		return;
 
+	/*
+	 * Check tuple visibility.  If the inserting transaction aborted, we
+	 * cannot assume our relation description matches the tuple structure, and
+	 * therefore cannot check it.
+	 */
 	if (!check_tuple_visibility(ctx))
 		return;
 
@@ -1544,6 +1680,10 @@ check_tuple(HeapCheckContext *ctx)
 	 * next, at which point we abort further attribute checks for this tuple.
 	 * Note that we don't abort for all types of corruption, only for those
 	 * types where we don't know how to continue.
+	 *
+	 * While checking the tuple attributes, we build a list of toast pointers
+	 * we encounter, to be checked later.  If further attribute checking is
+	 * aborted, we still have the pointers collected prior to aborting.
 	 */
 	ctx->offset = 0;
 	for (ctx->attnum = 0; ctx->attnum < ctx->natts; ctx->attnum++)
diff --git a/src/bin/pg_amcheck/t/004_verify_heapam.pl b/src/bin/pg_amcheck/t/004_verify_heapam.pl
index 36607596b1..33e5de51bf 100644
--- a/src/bin/pg_amcheck/t/004_verify_heapam.pl
+++ b/src/bin/pg_amcheck/t/004_verify_heapam.pl
@@ -224,7 +224,7 @@ my $rel = $node->safe_psql('postgres', qq(SELECT pg_relation_filepath('public.te
 my $relpath = "$pgdata/$rel";
 
 # Insert data and freeze public.test
-use constant ROWCOUNT => 16;
+use constant ROWCOUNT => 21;
 $node->safe_psql('postgres', qq(
 	INSERT INTO public.test (a, b, c)
 		VALUES (
@@ -259,6 +259,13 @@ select lp_off from heap_page_items(get_raw_page('test', 'main', 0))
 	offset $tup limit 1)));
 }
 
+# Find our toast relation id
+my $toastrelid = $node->safe_psql('postgres', qq(
+	SELECT c.reltoastrelid
+		FROM pg_catalog.pg_class c
+		WHERE c.oid = 'public.test'::regclass
+		));
+
 # Sanity check that our 'test' table on disk layout matches expectations.  If
 # this is not so, we will have to skip the test until somebody updates the test
 # to work on this platform.
@@ -296,7 +303,7 @@ close($file)
 $node->start;
 
 # Ok, Xids and page layout look ok.  We can run corruption tests.
-plan tests => 19;
+plan tests => 24;
 
 # Check that pg_amcheck runs against the uncorrupted table without error.
 $node->command_ok(['pg_amcheck', '-p', $port, 'postgres'],
@@ -310,6 +317,7 @@ $node->stop;
 
 # Some #define constants from access/htup_details.h for use while corrupting.
 use constant HEAP_HASNULL            => 0x0001;
+use constant HEAP_HASEXTERNAL        => 0x0004;
 use constant HEAP_XMAX_LOCK_ONLY     => 0x0080;
 use constant HEAP_XMIN_COMMITTED     => 0x0100;
 use constant HEAP_XMIN_INVALID       => 0x0200;
@@ -362,7 +370,7 @@ for (my $tupidx = 0; $tupidx < ROWCOUNT; $tupidx++)
 		push @expected,
 			qr/${header}xmin $xmin precedes relation freeze threshold 0:\d+/;
 	}
-	if ($offnum == 2)
+	elsif ($offnum == 2)
 	{
 		# Corruptly set xmin < datfrozenxid
 		my $xmin = 3;
@@ -480,7 +488,7 @@ for (my $tupidx = 0; $tupidx < ROWCOUNT; $tupidx++)
 
 		$header = header(0, $offnum, 1);
 		push @expected,
-			qr/${header}attribute \d+ with length \d+ ends at offset \d+ beyond total tuple length \d+/;
+			qr/${header}attribute with length \d+ ends at offset \d+ beyond total tuple length \d+/;
 	}
 	elsif ($offnum == 13)
 	{
@@ -489,9 +497,18 @@ for (my $tupidx = 0; $tupidx < ROWCOUNT; $tupidx++)
 
 		$header = header(0, $offnum, 2);
 		push @expected,
-			qr/${header}toasted value for attribute 2 missing from toast table/;
+			qr/${header}toast value \d+ not found in toast table/;
 	}
 	elsif ($offnum == 14)
+	{
+		# Corrupt infomask to claim there are no external attributes, which conflicts
+		# with column 'c' which is toasted
+		$tup->{t_infomask} &= ~HEAP_HASEXTERNAL;
+		$header = header(0, $offnum, 2);
+		push @expected,
+			qr/${header}toast value \d+ is external but tuple header flag HEAP_HASEXTERNAL not set/;
+	}
+	elsif ($offnum == 15)
 	{
 		# Set both HEAP_XMAX_COMMITTED and HEAP_XMAX_IS_MULTI
 		$tup->{t_infomask} |= HEAP_XMAX_COMMITTED;
@@ -501,7 +518,7 @@ for (my $tupidx = 0; $tupidx < ROWCOUNT; $tupidx++)
 		push @expected,
 			qr/${header}multitransaction ID 4 equals or exceeds next valid multitransaction ID 1/;
 	}
-	elsif ($offnum == 15)	# Last offnum must equal ROWCOUNT
+	elsif ($offnum == 16)
 	{
 		# Set both HEAP_XMAX_COMMITTED and HEAP_XMAX_IS_MULTI
 		$tup->{t_infomask} |= HEAP_XMAX_COMMITTED;
@@ -511,6 +528,43 @@ for (my $tupidx = 0; $tupidx < ROWCOUNT; $tupidx++)
 		push @expected,
 			qr/${header}multitransaction ID 4000000000 precedes relation minimum multitransaction ID threshold 1/;
 	}
+	elsif ($offnum == 17)
+	{
+		# Corrupt column c's toast pointer va_vartag field
+		$tup->{c_va_vartag} = 42;
+		$header = header(0, $offnum, 2);
+		push @expected,
+			qr/${header}toasted attribute has unexpected TOAST tag 42/;
+	}
+	elsif ($offnum == 18)
+	{
+		# Corrupt column c's toast pointer va_extinfo field
+		$tup->{c_va_extinfo} = 7654321;
+		$header = header(0, $offnum, 2);
+		push @expected,
+			qr/${header}toast value \d+ external size 7654321 exceeds maximum expected for rawsize 10004/;
+	}
+	elsif ($offnum == 19)
+	{
+		# Corrupt column c's toast pointer va_valueid field.  We have not
+		# consumed enough oids for any valueid in the toast table to be large.
+		# Use a large oid for the corruption to avoid colliding with an
+		# existent entry in the toast.
+		my $corrupt = $tup->{c_va_valueid} + 100000000;
+		$tup->{c_va_valueid} = $corrupt;
+		$header = header(0, $offnum, 2);
+		push @expected,
+			qr/${header}toast value \d+ not found in toast table/;
+	}
+	elsif ($offnum == 20)	# Last offnum must less than or equal to ROWCOUNT-1
+	{
+		# Corrupt column c's toast pointer va_toastrelid field
+		my $otherid = $toastrelid + 1;
+		$tup->{c_va_toastrelid} = $otherid;
+		$header = header(0, $offnum, 2);
+		push @expected,
+			qr/${header}toast value \d+ toast relation oid $otherid differs from expected oid $toastrelid/;
+	}
 	write_tuple($file, $offset, $tup);
 }
 close($file)
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index 9e6777e9d0..0ce261e2a2 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -2557,6 +2557,7 @@ TimestampTz
 TmFromChar
 TmToChar
 ToastAttrInfo
+ToastCheckContext
 ToastTupleContext
 TocEntry
 TokenAuxData
-- 
2.21.1 (Apple Git-122.3)

